/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.extensions.barrage.util;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.common.io.LittleEndianDataInputStream;
import com.google.flatbuffers.Table;
import com.google.protobuf.CodedInputStream;
import io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.barrage.flatbuf.BarrageModColumnMetadata;
import io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.FlatBufferIteratorAdapter;
import io.deephaven.extensions.barrage.util.GrpcMarshallingException;
import io.deephaven.extensions.barrage.util.StreamReader;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.PrimitiveIterator;
import java.util.function.LongConsumer;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.RecordBatch;

/**
 * Incrementally decodes protobuf-framed messages into {@link BarrageMessage}s.
 *
 * <p>Each call to {@link #safelyParseFrom} consumes one framed message from the supplied stream. A message that
 * carries Barrage update metadata starts a new {@link BarrageMessage}; every record-batch body that follows
 * appends rows to it. The assembled message is returned only once the number of rows announced by the metadata
 * has been read, so a single logical update may span several calls.
 *
 * <p>Instances hold the partially-assembled message between calls and perform no synchronization of their own.
 */
public class BarrageStreamReader implements StreamReader {
    private static final Logger log = LoggerFactory.getLogger(BarrageStreamReader.class);

    /** Largest capacity requested for any single chunk ({@code Integer.MAX_VALUE - 8}). */
    private static final int MAX_CHUNK_SIZE = 0x7FFFFFF7;

    // Protobuf tags are (fieldNumber << 3) | wireType; all three below are length-delimited (wire type 2).
    /** Tag of the field holding the flatbuffer {@link Message} header (field 2). */
    private static final int DATA_HEADER_TAG = 18;
    /** Tag of the field holding the application metadata (field 3). */
    private static final int APP_METADATA_TAG = 26;
    /** Tag of the field holding the record batch body (field 1000). */
    private static final int BODY_TAG = 8002;

    /** Value {@link BarrageMessageWrapper#magic()} must have for the app metadata to be interpreted. */
    private static final long BARRAGE_MAGIC = 1852338276L;
    /** Wrapper message type whose payload is parsed as {@link BarrageUpdateMetadata}. */
    private static final int UPDATE_METADATA_MSG_TYPE = 6;
    /** Header type for which no message is produced (presumably the Arrow Schema header; confirm). */
    private static final int SCHEMA_HEADER_TYPE = 1;
    /** Header type whose header table is read as a {@link RecordBatch}. */
    private static final int RECORD_BATCH_HEADER_TYPE = 3;

    /** Receives the elapsed nanoseconds of every call that parsed a body. */
    private final LongConsumer deserializeTmConsumer;

    // State of the message currently being assembled; msg is null when no message is pending.
    private long numAddRowsRead = 0L;
    private long numAddRowsTotal = 0L;
    private long numModRowsRead = 0L;
    private long numModRowsTotal = 0L;
    private BarrageMessage msg = null;

    /**
     * @param deserializeTmConsumer callback given the time, in nanoseconds, spent in each call to
     *        {@link #safelyParseFrom} that successfully parsed a body
     */
    public BarrageStreamReader(LongConsumer deserializeTmConsumer) {
        this.deserializeTmConsumer = deserializeTmConsumer;
    }

    /**
     * Parses one framed message from {@code stream} and folds it into the message being assembled.
     *
     * @param options options forwarded to the chunk deserializer
     * @param expectedColumns not consulted by this implementation
     * @param columnChunkTypes chunk type of each column, indexed by column
     * @param columnTypes data type of each column, indexed by column
     * @param componentTypes component type of each column, indexed by column
     * @param stream the source of the framed message
     * @return the completed message once all announced add and mod rows have been read; {@code null} if the
     *         header was of the schema type or more batches are still expected
     * @throws GrpcMarshallingException wrapping any failure encountered while parsing
     */
    @Override
    public BarrageMessage safelyParseFrom(StreamReaderOptions options, BitSet expectedColumns, ChunkType[] columnChunkTypes, Class<?>[] columnTypes, Class<?>[] componentTypes, InputStream stream) {
        final long startDeserTm = System.nanoTime();
        Message header = null;
        try {
            boolean bodyParsed = false;
            final CodedInputStream decoder = CodedInputStream.newInstance(stream);
            // a tag of zero marks the end of the input
            for (int tag = decoder.readTag(); tag != 0; tag = decoder.readTag()) {
                switch (tag) {
                    case DATA_HEADER_TAG: {
                        final int size = decoder.readRawVarint32();
                        header = Message.getRootAsMessage(ByteBuffer.wrap(decoder.readRawBytes(size)));
                        break;
                    }
                    case APP_METADATA_TAG: {
                        final int size = decoder.readRawVarint32();
                        this.parseAppMetadata(ByteBuffer.wrap(decoder.readRawBytes(size)), columnChunkTypes, columnTypes, componentTypes);
                        break;
                    }
                    case BODY_TAG: {
                        if (bodyParsed) {
                            throw new IllegalStateException("Unexpected duplicate body tag");
                        }
                        if (header == null) {
                            throw new IllegalStateException("Missing metadata header; cannot decode body");
                        }
                        if (header.headerType() != RECORD_BATCH_HEADER_TYPE) {
                            throw new IllegalStateException("Only know how to decode Schema/BarrageRecordBatch messages");
                        }
                        if (this.msg == null) {
                            throw new IllegalStateException("Missing app metadata tag; cannot decode using BarrageStreamReader");
                        }
                        bodyParsed = true;
                        final int size = decoder.readRawVarint32();
                        final RecordBatch batch = (RecordBatch) header.header(new RecordBatch());
                        this.readRecordBatch(options, columnChunkTypes, columnTypes, componentTypes, decoder, size, batch);
                        break;
                    }
                    default:
                        decoder.skipField(tag);
                        break;
                }
            }
            if (header != null && header.headerType() == SCHEMA_HEADER_TYPE) {
                return null;
            }
            if (!bodyParsed) {
                throw new IllegalStateException("Missing body tag");
            }
            this.deserializeTmConsumer.accept(System.nanoTime() - startDeserTm);
            if (this.numAddRowsRead == this.numAddRowsTotal && this.numModRowsRead == this.numModRowsTotal) {
                // every announced row has arrived; hand the message over and reset for the next one
                final BarrageMessage retval = this.msg;
                this.msg = null;
                return retval;
            }
            return null;
        }
        catch (Exception e) {
            log.error().append("Unable to parse a received BarrageMessage: ").append(e).endl();
            throw new GrpcMarshallingException("Unable to parse BarrageMessage object", e);
        }
    }

    /**
     * Interprets the application metadata; if it is Barrage update metadata, starts a new pending message and
     * pre-allocates the first chunk of every add and mod column.
     */
    private void parseAppMetadata(ByteBuffer msgAsBB, ChunkType[] columnChunkTypes, Class<?>[] columnTypes, Class<?>[] componentTypes) throws IOException {
        final BarrageMessageWrapper wrapper = BarrageMessageWrapper.getRootAsBarrageMessageWrapper(msgAsBB);
        if (wrapper.magic() != BARRAGE_MAGIC) {
            log.warn().append("BarrageStreamReader: skipping app_metadata that does not look like BarrageMessageWrapper").endl();
            return;
        }
        if (wrapper.msgType() != UPDATE_METADATA_MSG_TYPE) {
            return;
        }
        if (this.msg != null) {
            throw new IllegalStateException("Previous message was not complete; pending " + (this.numAddRowsTotal - this.numAddRowsRead) + " add rows and " + (this.numModRowsTotal - this.numModRowsRead) + " mod rows");
        }

        final BarrageUpdateMetadata metadata = BarrageUpdateMetadata.getRootAsBarrageUpdateMetadata(wrapper.msgPayloadAsByteBuffer());
        this.msg = new BarrageMessage();
        this.msg.isSnapshot = metadata.isSnapshot();
        this.msg.snapshotRowSetIsReversed = metadata.effectiveReverseViewport();
        this.numAddRowsRead = 0L;
        this.numModRowsRead = 0L;

        if (this.msg.isSnapshot) {
            final ByteBuffer effectiveViewport = metadata.effectiveViewportAsByteBuffer();
            if (effectiveViewport != null) {
                this.msg.snapshotRowSet = BarrageStreamReader.extractIndex(effectiveViewport);
            }
            final ByteBuffer effectiveSnapshotColumns = metadata.effectiveColumnSetAsByteBuffer();
            if (effectiveSnapshotColumns != null) {
                this.msg.snapshotColumns = BarrageStreamReader.extractBitSet(effectiveSnapshotColumns);
            }
        }

        this.msg.firstSeq = metadata.firstSeq();
        this.msg.lastSeq = metadata.lastSeq();
        this.msg.rowsAdded = BarrageStreamReader.extractIndex(metadata.addedRowsAsByteBuffer());
        this.msg.rowsRemoved = BarrageStreamReader.extractIndex(metadata.removedRowsAsByteBuffer());
        this.msg.shifted = BarrageStreamReader.extractIndexShiftData(metadata.shiftDataAsByteBuffer());

        // when no explicit "included" set is sent, the included rows are exactly the added rows
        final ByteBuffer rowsIncluded = metadata.addedRowsIncludedAsByteBuffer();
        this.msg.rowsIncluded = rowsIncluded != null ? BarrageStreamReader.extractIndex(rowsIncluded) : this.msg.rowsAdded.copy();

        this.msg.addColumnData = new BarrageMessage.AddColumnData[columnTypes.length];
        final int addChunkSize = (int) Math.min(this.msg.rowsIncluded.size(), (long) MAX_CHUNK_SIZE);
        for (int ci = 0; ci < this.msg.addColumnData.length; ++ci) {
            final BarrageMessage.AddColumnData acd = new BarrageMessage.AddColumnData();
            this.msg.addColumnData[ci] = acd;
            acd.type = columnTypes[ci];
            acd.componentType = componentTypes[ci];
            acd.data = new ArrayList<>();
            final WritableChunk<Values> chunk = columnChunkTypes[ci].makeWritableChunk(addChunkSize);
            chunk.setSize(0);
            acd.data.add(chunk);
        }
        this.numAddRowsTotal = this.msg.rowsIncluded.size();

        // mod columns may each modify a different number of rows; the total is the largest of them
        this.numModRowsTotal = 0L;
        this.msg.modColumnData = new BarrageMessage.ModColumnData[metadata.modColumnNodesLength()];
        for (int ci = 0; ci < this.msg.modColumnData.length; ++ci) {
            final BarrageMessage.ModColumnData mcd = new BarrageMessage.ModColumnData();
            this.msg.modColumnData[ci] = mcd;
            mcd.type = columnTypes[ci];
            mcd.componentType = componentTypes[ci];
            mcd.data = new ArrayList<>();
            final BarrageModColumnMetadata node = metadata.modColumnNodes(ci);
            mcd.rowsModified = BarrageStreamReader.extractIndex(node.modifiedRowsAsByteBuffer());
            final int modChunkSize = (int) Math.min(mcd.rowsModified.size(), (long) MAX_CHUNK_SIZE);
            final WritableChunk<Values> chunk = columnChunkTypes[ci].makeWritableChunk(modChunkSize);
            chunk.setSize(0);
            mcd.data.add(chunk);
            this.numModRowsTotal = Math.max(this.numModRowsTotal, mcd.rowsModified.size());
        }
    }

    /**
     * Reads the body of one record batch into the pending message.
     *
     * <p>A batch is treated as carrying either add rows or mod rows, never both: while add rows are still
     * outstanding the batch is read into the add columns, otherwise into the mod columns. Reading both from the
     * same batch would pull mod data from field-node/buffer iterators the add columns have already advanced, and
     * would count add rows towards the mod total so that the message could never be recognized as complete.
     */
    private void readRecordBatch(StreamReaderOptions options, ChunkType[] columnChunkTypes, Class<?>[] columnTypes, Class<?>[] componentTypes, CodedInputStream decoder, int size, RecordBatch batch) throws IOException {
        final long batchLength = batch.length();
        this.msg.length = batchLength;
        try (LittleEndianDataInputStream ois = new LittleEndianDataInputStream(new BarrageProtoUtil.ObjectInputStreamAdapter(decoder, size))) {
            final FlatBufferIteratorAdapter<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter = new FlatBufferIteratorAdapter<ChunkInputStreamGenerator.FieldNodeInfo>(batch.nodesLength(), i -> new ChunkInputStreamGenerator.FieldNodeInfo(batch.nodes(i)));

            final int numBuffers = batch.buffersLength();
            final long[] bufferInfo = new long[numBuffers];
            for (int bi = 0; bi < numBuffers; ++bi) {
                final int offset = LongSizedDataStructure.intSize("BufferInfo", batch.buffers(bi).offset());
                int length = LongSizedDataStructure.intSize("BufferInfo", batch.buffers(bi).length());
                if (bi < numBuffers - 1) {
                    // extend the length over any gap before the next buffer's offset
                    final int nextOffset = LongSizedDataStructure.intSize("BufferInfo", batch.buffers(bi + 1).offset());
                    length += Math.max(0, nextOffset - offset - length);
                }
                bufferInfo[bi] = length;
            }
            final PrimitiveIterator.OfLong bufferInfoIter = Arrays.stream(bufferInfo).iterator();

            if (this.numAddRowsRead < this.numAddRowsTotal) {
                final long remaining = this.numAddRowsTotal - this.numAddRowsRead;
                for (int ci = 0; ci < this.msg.addColumnData.length; ++ci) {
                    final BarrageMessage.AddColumnData acd = this.msg.addColumnData[ci];
                    if (batchLength > remaining) {
                        throw new IllegalStateException("Batch length exceeded the expected number of rows from app metadata");
                    }
                    int lastChunkIndex = acd.data.size() - 1;
                    WritableChunk<Values> chunk = (WritableChunk<Values>) acd.data.get(lastChunkIndex);
                    if (batchLength > (long) (chunk.capacity() - chunk.size())) {
                        // this batch would overflow the current chunk; continue in a fresh one
                        final int chunkSize = (int) Math.min(remaining, (long) MAX_CHUNK_SIZE);
                        chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
                        acd.data.add(chunk);
                        chunk.setSize(0);
                        ++lastChunkIndex;
                    }
                    // store whatever chunk the deserializer hands back into the list slot
                    acd.data.set(lastChunkIndex, ChunkInputStreamGenerator.extractChunkFromInputStream(options, columnChunkTypes[ci], columnTypes[ci], componentTypes[ci], fieldNodeIter, bufferInfoIter, (DataInput) ois, chunk, chunk.size(), (int) batchLength));
                    chunk.setSize(chunk.size() + (int) batchLength);
                }
                this.numAddRowsRead += batchLength;
            } else {
                for (int ci = 0; ci < this.msg.modColumnData.length; ++ci) {
                    final BarrageMessage.ModColumnData mcd = this.msg.modColumnData[ci];
                    // columns with fewer modified rows than the largest column run out early
                    final long remaining = Math.max(0L, mcd.rowsModified.size() - this.numModRowsRead);
                    int lastChunkIndex = mcd.data.size() - 1;
                    WritableChunk<Values> chunk = (WritableChunk<Values>) mcd.data.get(lastChunkIndex);
                    final int numRowsToRead = LongSizedDataStructure.intSize("BarrageStreamReader", Math.min(remaining, batchLength));
                    if (numRowsToRead > chunk.capacity() - chunk.size()) {
                        // this batch would overflow the current chunk; continue in a fresh one
                        final int chunkSize = (int) Math.min(remaining, (long) MAX_CHUNK_SIZE);
                        chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
                        mcd.data.add(chunk);
                        chunk.setSize(0);
                        ++lastChunkIndex;
                    }
                    mcd.data.set(lastChunkIndex, ChunkInputStreamGenerator.extractChunkFromInputStream(options, columnChunkTypes[ci], columnTypes[ci], componentTypes[ci], fieldNodeIter, bufferInfoIter, (DataInput) ois, chunk, chunk.size(), numRowsToRead));
                    chunk.setSize(chunk.size() + numRowsToRead);
                }
                this.numModRowsRead += batchLength;
            }
        }
    }

    /**
     * Decodes a row set from its compressed-delta encoding.
     *
     * @param bb the encoded bytes, or {@code null}
     * @return the decoded row set; an empty row set when {@code bb} is {@code null}
     */
    private static RowSet extractIndex(ByteBuffer bb) throws IOException {
        if (bb == null) {
            return RowSetFactory.empty();
        }
        try (LittleEndianDataInputStream is = new LittleEndianDataInputStream(new ByteBufferBackedInputStream(bb))) {
            return ExternalizableRowSetUtils.readExternalCompressedDelta((DataInput) is);
        }
    }

    /** Builds a {@link BitSet} from the bytes of {@code bb} via {@link BitSet#valueOf(ByteBuffer)}. */
    private static BitSet extractBitSet(ByteBuffer bb) {
        return BitSet.valueOf(bb);
    }

    /**
     * Decodes shift data encoded as three consecutive row sets: range starts, range ends, and destination starts.
     *
     * @param bb the encoded bytes, or {@code null}
     * @return the decoded shift data; shift data with no shifts when {@code bb} is {@code null}
     * @throws IllegalStateException if the end or destination row set is shorter than the start row set
     */
    private static RowSetShiftData extractIndexShiftData(ByteBuffer bb) throws IOException {
        final RowSetShiftData.Builder builder = new RowSetShiftData.Builder();
        if (bb == null) {
            // mirror extractIndex: an absent buffer means "nothing shifted"
            return builder.build();
        }

        final RowSet sRowSet;
        final RowSet eRowSet;
        final RowSet dRowSet;
        try (LittleEndianDataInputStream is = new LittleEndianDataInputStream(new ByteBufferBackedInputStream(bb))) {
            sRowSet = ExternalizableRowSetUtils.readExternalCompressedDelta((DataInput) is);
            eRowSet = ExternalizableRowSetUtils.readExternalCompressedDelta((DataInput) is);
            dRowSet = ExternalizableRowSetUtils.readExternalCompressedDelta((DataInput) is);
        }

        try (RowSet.Iterator sit = sRowSet.iterator();
             RowSet.Iterator eit = eRowSet.iterator();
             RowSet.Iterator dit = dRowSet.iterator()) {
            while (sit.hasNext()) {
                if (!eit.hasNext() || !dit.hasNext()) {
                    throw new IllegalStateException("RowSetShiftData is inconsistent");
                }
                final long next = sit.nextLong();
                // the builder takes a delta, the wire carries the destination start
                builder.shiftRange(next, eit.nextLong(), dit.nextLong() - next);
            }
        }
        return builder.build();
    }
}

