/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.extensions.barrage.util;

import com.google.common.io.LittleEndianDataInputStream;
import com.google.flatbuffers.Table;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Message;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.util.BarrageMarshallingException;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.FlatBufferIteratorAdapter;
import io.deephaven.extensions.barrage.util.GrpcMarshallingException;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import java.io.DataInput;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.PrimitiveIterator;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;

public class BarrageChunkAppendingMarshaller
implements MethodDescriptor.Marshaller<Integer> {
    private static final BarrageSnapshotOptions BARRAGE_OPTIONS = BarrageSnapshotOptions.builder().build();
    private static final Logger log = LoggerFactory.getLogger(BarrageChunkAppendingMarshaller.class);
    private final BarrageSnapshotOptions options;
    private final ChunkType[] columnChunkTypes;
    private final Class<?>[] columnTypes;
    private final Class<?>[] componentTypes;
    private final WritableChunk<Values>[] destChunks;
    private long numRowsRead = 0L;

    public static MethodDescriptor<Flight.Ticket, Integer> getClientDoGetDescriptor(ChunkType[] columnChunkTypes, Class<?>[] columnTypes, Class<?>[] componentTypes, WritableChunk<Values>[] destChunks) {
        MethodDescriptor.Marshaller requestMarshaller = ProtoUtils.marshaller((Message)Flight.Ticket.getDefaultInstance());
        MethodDescriptor descriptor = FlightServiceGrpc.getDoGetMethod();
        return MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.SERVER_STREAMING).setFullMethodName(descriptor.getFullMethodName()).setSampledToLocalTracing(false).setRequestMarshaller(requestMarshaller).setResponseMarshaller((MethodDescriptor.Marshaller)new BarrageChunkAppendingMarshaller(BARRAGE_OPTIONS, columnChunkTypes, columnTypes, componentTypes, destChunks)).setSchemaDescriptor(descriptor.getSchemaDescriptor()).build();
    }

    public BarrageChunkAppendingMarshaller(BarrageSnapshotOptions options, ChunkType[] columnChunkTypes, Class<?>[] columnTypes, Class<?>[] componentTypes, WritableChunk<Values>[] destChunks) {
        this.options = options;
        this.columnChunkTypes = columnChunkTypes;
        this.columnTypes = columnTypes;
        this.componentTypes = componentTypes;
        this.destChunks = destChunks;
    }

    public InputStream stream(Integer value) {
        throw new UnsupportedOperationException("BarrageDataMarshaller unexpectedly used to directly convert BarrageMessage to InputStream");
    }

    public Integer parse(InputStream stream) {
        org.apache.arrow.flatbuf.Message header = null;
        try {
            boolean bodyParsed = false;
            CodedInputStream decoder = CodedInputStream.newInstance((InputStream)stream);
            int tag = decoder.readTag();
            while (tag != 0) {
                int size;
                if (tag == 18) {
                    size = decoder.readRawVarint32();
                    header = org.apache.arrow.flatbuf.Message.getRootAsMessage((ByteBuffer)ByteBuffer.wrap(decoder.readRawBytes(size)));
                } else if (tag != 8002) {
                    decoder.skipField(tag);
                } else {
                    if (bodyParsed) {
                        throw new IllegalStateException("Unexpected duplicate body tag");
                    }
                    if (header == null) {
                        throw new IllegalStateException("Missing metadata header; cannot decode body");
                    }
                    if (header.headerType() != 3) {
                        throw new IllegalStateException("Only know how to decode Schema/BarrageRecordBatch messages");
                    }
                    bodyParsed = true;
                    size = decoder.readRawVarint32();
                    RecordBatch batch = (RecordBatch)header.header((Table)new RecordBatch());
                    try (LittleEndianDataInputStream ois = new LittleEndianDataInputStream((InputStream)new BarrageProtoUtil.ObjectInputStreamAdapter(decoder, size));){
                        FlatBufferIteratorAdapter<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter = new FlatBufferIteratorAdapter<ChunkInputStreamGenerator.FieldNodeInfo>(batch.nodesLength(), i -> new ChunkInputStreamGenerator.FieldNodeInfo(batch.nodes(i)));
                        long[] bufferInfo = new long[batch.buffersLength()];
                        for (int bi = 0; bi < batch.buffersLength(); ++bi) {
                            int offset = LongSizedDataStructure.intSize((String)"BufferInfo", (long)batch.buffers(bi).offset());
                            int length = LongSizedDataStructure.intSize((String)"BufferInfo", (long)batch.buffers(bi).length());
                            if (bi < batch.buffersLength() - 1) {
                                int nextOffset = LongSizedDataStructure.intSize((String)"BufferInfo", (long)batch.buffers(bi + 1).offset());
                                length += Math.max(0, nextOffset - offset - length);
                            }
                            bufferInfo[bi] = length;
                        }
                        PrimitiveIterator.OfLong bufferInfoIter = Arrays.stream(bufferInfo).iterator();
                        for (int ci = 0; ci < this.destChunks.length; ++ci) {
                            WritableChunk<Values> dest = this.destChunks[ci];
                            long remaining = dest.capacity() - dest.size();
                            if (batch.length() > remaining) {
                                throw new BarrageMarshallingException(String.format("Received RecordBatch length (%d) exceeds the remaining capacity (%d) of the destination Chunk.", batch.length(), remaining));
                            }
                            WritableChunk<Values> retChunk = ChunkInputStreamGenerator.extractChunkFromInputStream(this.options, this.columnChunkTypes[ci], this.columnTypes[ci], this.componentTypes[ci], fieldNodeIter, bufferInfoIter, (DataInput)ois, dest, dest.size(), (int)batch.length());
                            if (retChunk != dest) {
                                throw new BarrageMarshallingException("Unexpected chunk returned from ChunkInputStreamGenerator.extractChunkFromInputStream");
                            }
                            dest.setSize(dest.size() + (int)batch.length());
                        }
                        this.numRowsRead += batch.length();
                    }
                }
                tag = decoder.readTag();
            }
            if (header != null && header.headerType() == 1) {
                return 0;
            }
            if (!bodyParsed) {
                throw new IllegalStateException("Missing body tag");
            }
            return (int)this.numRowsRead;
        }
        catch (Exception e) {
            log.error().append((CharSequence)"Unable to parse a received DoGet: ").append((Throwable)e).endl();
            if (e instanceof BarrageMarshallingException) {
                throw (BarrageMarshallingException)((Object)e);
            }
            throw new GrpcMarshallingException("Unable to parse DoGet", e);
        }
    }
}

