/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.extensions.barrage.util;

import com.google.common.io.LittleEndianDataInputStream;
import com.google.flatbuffers.Table;
import com.google.protobuf.CodedInputStream;
import com.google.rpc.Code;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.FlatBufferIteratorAdapter;
import io.deephaven.io.streams.ByteBufferInputStream;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.PrimitiveIterator;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.flatbuf.Schema;

public class ArrowToTableConverter {
    protected long totalRowsRead = 0L;
    protected BarrageTable resultTable;
    private ChunkType[] columnChunkTypes;
    private int[] columnConversionFactors;
    private Class<?>[] columnTypes;
    private Class<?>[] componentTypes;
    protected BarrageSubscriptionOptions options = BarrageProtoUtil.DEFAULT_SER_OPTIONS;
    private volatile boolean completed = false;

    private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(ByteBuffer bb) throws IOException {
        BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo();
        bb.order(ByteOrder.LITTLE_ENDIAN);
        int continuation = bb.getInt();
        int metadata_size = bb.getInt();
        mi.header = Message.getRootAsMessage((ByteBuffer)bb);
        if (mi.header.headerType() == 3) {
            bb.position(metadata_size + 8);
            ByteBuffer bodyBB = bb.slice();
            ByteBufferInputStream bbis = new ByteBufferInputStream(bodyBB);
            CodedInputStream decoder = CodedInputStream.newInstance((InputStream)bbis);
            mi.inputStream = new LittleEndianDataInputStream((InputStream)new BarrageProtoUtil.ObjectInputStreamAdapter(decoder, bodyBB.remaining()));
        }
        return mi;
    }

    @ScriptApi
    public synchronized void setSchema(ByteBuffer ipcMessage) {
        if (this.completed) {
            throw new IllegalStateException("Conversion is complete; cannot process additional messages");
        }
        BarrageProtoUtil.MessageInfo mi = this.getMessageInfo(ipcMessage);
        if (mi.header.headerType() != 1) {
            throw new IllegalArgumentException("The input is not a valid Arrow Schema IPC message");
        }
        this.parseSchema((Schema)mi.header.header((Table)new Schema()));
    }

    @ScriptApi
    public synchronized void addRecordBatches(ByteBuffer ... ipcMessages) {
        for (ByteBuffer ipcMessage : ipcMessages) {
            this.addRecordBatch(ipcMessage);
        }
    }

    @ScriptApi
    public synchronized void addRecordBatch(ByteBuffer ipcMessage) {
        if (this.completed) {
            throw new IllegalStateException("Conversion is complete; cannot process additional messages");
        }
        if (this.resultTable == null) {
            throw new IllegalStateException("Arrow schema must be provided before record batches can be added");
        }
        BarrageProtoUtil.MessageInfo mi = this.getMessageInfo(ipcMessage);
        if (mi.header.headerType() != 3) {
            throw new IllegalArgumentException("The input is not a valid Arrow RecordBatch IPC message");
        }
        int numColumns = this.resultTable.getColumnSources().size();
        BarrageMessage msg = this.createBarrageMessage(mi, numColumns);
        msg.rowsAdded = RowSetFactory.fromRange((long)this.totalRowsRead, (long)(this.totalRowsRead + msg.length - 1L));
        msg.rowsIncluded = msg.rowsAdded.copy();
        msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS;
        this.totalRowsRead += msg.length;
        this.resultTable.handleBarrageMessage(msg);
    }

    @ScriptApi
    public synchronized BarrageTable getResultTable() {
        if (!this.completed) {
            throw new IllegalStateException("Conversion must be completed prior to requesting the result");
        }
        return this.resultTable;
    }

    @ScriptApi
    public synchronized void onCompleted() throws InterruptedException {
        if (this.completed) {
            throw new IllegalStateException("Conversion cannot be completed twice");
        }
        this.completed = true;
    }

    protected void parseSchema(Schema header) {
        if (this.resultTable != null) {
            throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)"Schema evolution not supported");
        }
        BarrageUtil.ConvertedArrowSchema result = BarrageUtil.convertArrowSchema(header);
        this.resultTable = BarrageTable.make(null, result.tableDef, result.attributes, null);
        this.resultTable.setFlat();
        this.columnConversionFactors = result.conversionFactors;
        this.columnChunkTypes = result.computeWireChunkTypes();
        this.columnTypes = result.computeWireTypes();
        this.componentTypes = result.computeWireComponentTypes();
        this.resultTable.retainReference();
    }

    protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, int numColumns) {
        BarrageMessage msg = new BarrageMessage();
        RecordBatch batch = (RecordBatch)mi.header.header((Table)new RecordBatch());
        FlatBufferIteratorAdapter<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter = new FlatBufferIteratorAdapter<ChunkInputStreamGenerator.FieldNodeInfo>(batch.nodesLength(), i -> new ChunkInputStreamGenerator.FieldNodeInfo(batch.nodes(i)));
        long[] bufferInfo = new long[batch.buffersLength()];
        for (int bi = 0; bi < batch.buffersLength(); ++bi) {
            int offset = LongSizedDataStructure.intSize((String)"BufferInfo", (long)batch.buffers(bi).offset());
            int length = LongSizedDataStructure.intSize((String)"BufferInfo", (long)batch.buffers(bi).length());
            if (bi < batch.buffersLength() - 1) {
                int nextOffset = LongSizedDataStructure.intSize((String)"BufferInfo", (long)batch.buffers(bi + 1).offset());
                length += Math.max(0, nextOffset - offset - length);
            }
            bufferInfo[bi] = length;
        }
        PrimitiveIterator.OfLong bufferInfoIter = Arrays.stream(bufferInfo).iterator();
        msg.rowsRemoved = RowSetFactory.empty();
        msg.shifted = RowSetShiftData.EMPTY;
        int numRowsAdded = LongSizedDataStructure.intSize((String)"RecordBatch.length()", (long)batch.length());
        msg.addColumnData = new BarrageMessage.AddColumnData[numColumns];
        for (int ci = 0; ci < numColumns; ++ci) {
            BarrageMessage.AddColumnData acd;
            msg.addColumnData[ci] = acd = new BarrageMessage.AddColumnData();
            msg.addColumnData[ci].data = new ArrayList();
            int factor = this.columnConversionFactors == null ? 1 : this.columnConversionFactors[ci];
            try {
                acd.data.add(ChunkInputStreamGenerator.extractChunkFromInputStream(this.options, factor, this.columnChunkTypes[ci], this.columnTypes[ci], this.componentTypes[ci], fieldNodeIter, bufferInfoIter, (DataInput)mi.inputStream, null, 0, 0));
            }
            catch (IOException unexpected) {
                throw new UncheckedDeephavenException((Throwable)unexpected);
            }
            if (((Chunk)acd.data.get(0)).size() != numRowsAdded) {
                throw Exceptions.statusRuntimeException((Code)Code.INVALID_ARGUMENT, (String)("Inconsistent num records per column: " + numRowsAdded + " != " + ((Chunk)acd.data.get(0)).size()));
            }
            acd.type = this.columnTypes[ci];
            acd.componentType = this.componentTypes[ci];
        }
        msg.length = numRowsAdded;
        return msg;
    }

    private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) {
        BarrageProtoUtil.MessageInfo mi;
        try {
            mi = ArrowToTableConverter.parseArrowIpcMessage(ipcMessage);
        }
        catch (IOException unexpected) {
            throw new UncheckedDeephavenException((Throwable)unexpected);
        }
        return mi;
    }
}

