/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChannelMetadata;
import net.snowflake.ingest.streaming.internal.Flusher;
import net.snowflake.ingest.streaming.internal.RowBufferStats;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

/**
 * {@link Flusher} implementation that serializes the Arrow vectors of several channels into a
 * single Arrow stream held in memory.
 */
public class ArrowFlusher
implements Flusher<VectorSchemaRoot> {
    private static final Logging logger = new Logging(ArrowFlusher.class);

    /**
     * Writes the vectors of every channel in {@code channelsDataPerTable} as consecutive record
     * batches of one Arrow stream and collects the per-channel metadata.
     *
     * <p>The vectors of the first channel become the root of the stream writer. The vectors of
     * each following channel are unloaded into a record batch, loaded into that root, written as
     * the next batch, and then closed. The root and the writer are closed before this method
     * returns, whether it completes normally or with an exception.
     *
     * <p>For an empty input list the result carries an empty byte stream, a row count of zero,
     * and {@code null} for both the combined column statistics and the min/max insert time.
     *
     * @param channelsDataPerTable channel data to serialize; every entry must report the same
     *     fully qualified table name as the first entry
     * @param filePath blob path, used only in the debug log messages
     * @return the channel metadata list, combined column statistics, total row count, serialized
     *     bytes, and combined min/max insert time
     * @throws IOException if the Arrow writer fails while starting or writing a batch
     * @throws SFException with {@link ErrorCode#INVALID_DATA_IN_CHUNK} if a channel's fully
     *     qualified table name differs from that of the first channel
     */
    @Override
    public Flusher.SerializationResult serialize(List<ChannelData<VectorSchemaRoot>> channelsDataPerTable, String filePath) throws IOException {
        ByteArrayOutputStream chunkData = new ByteArrayOutputStream();
        List<ChannelMetadata> channelsMetadataList = new ArrayList<ChannelMetadata>();
        long rowCount = 0L;
        VectorSchemaRoot root = null;
        ArrowStreamWriter arrowWriter = null;
        VectorLoader loader = null;
        String firstChannelFullyQualifiedTableName = null;
        Map<String, RowBufferStats> columnEpStatsMapCombined = null;
        Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
        try {
            for (ChannelData<VectorSchemaRoot> data : channelsDataPerTable) {
                ChannelMetadata channelMetadata = ChannelMetadata.builder()
                        .setOwningChannelFromContext(data.getChannelContext())
                        .setRowSequencer(data.getRowSequencer())
                        .setOffsetToken(data.getOffsetToken())
                        .build();
                channelsMetadataList.add(channelMetadata);
                logger.logDebug("Start building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), data.getBufferSize(), filePath);
                if (root == null) {
                    // First channel: its vectors are written directly and serve as the target
                    // that all later channels are loaded into.
                    columnEpStatsMapCombined = data.getColumnEps();
                    root = data.getVectors();
                    arrowWriter = new ArrowStreamWriter(root, null, chunkData);
                    loader = new VectorLoader(root);
                    firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
                    arrowWriter.start();
                    chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
                } else {
                    if (!data.getChannelContext().getFullyQualifiedTableName().equals(firstChannelFullyQualifiedTableName)) {
                        throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK, new Object[0]);
                    }
                    columnEpStatsMapCombined = ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
                    chunkMinMaxInsertTimeInMs = ChannelData.getCombinedMinMaxInsertTimeInMs(chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
                    VectorSchemaRoot channelVectors = data.getVectors();
                    VectorUnloader unloader = new VectorUnloader(channelVectors);
                    // try-with-resources so the batch is closed even when load() fails.
                    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
                        loader.load(recordBatch);
                    }
                    channelVectors.close();
                }
                arrowWriter.writeBatch();
                rowCount += data.getRowCount();
                logger.logDebug("Finish building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), data.getBufferSize(), filePath);
            }
        }
        finally {
            // Close the writer first, but make sure the root is closed even if closing the
            // writer throws or the writer was never successfully constructed.
            try {
                if (arrowWriter != null) {
                    arrowWriter.close();
                }
            }
            finally {
                if (root != null) {
                    root.close();
                }
            }
        }
        return new Flusher.SerializationResult(channelsMetadataList, columnEpStatsMapCombined, rowCount, chunkData, chunkMinMaxInsertTimeInMs);
    }
}

