/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.internal.apache.parquet.hadoop.BdecParquetReader;
import net.snowflake.ingest.internal.apache.parquet.hadoop.BdecParquetWriter;
import net.snowflake.ingest.internal.apache.parquet.schema.MessageType;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChannelMetadata;
import net.snowflake.ingest.streaming.internal.Flusher;
import net.snowflake.ingest.streaming.internal.ParquetChunkData;
import net.snowflake.ingest.streaming.internal.RowBufferStats;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;

public class ParquetFlusher
implements Flusher<ParquetChunkData> {
    private static final Logging logger = new Logging(ParquetFlusher.class);
    private final MessageType schema;
    private final boolean enableParquetInternalBuffering;
    private final long maxChunkSizeInBytes;
    private final Constants.BdecParquetCompression bdecParquetCompression;

    public ParquetFlusher(MessageType schema, boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression) {
        this.schema = schema;
        this.enableParquetInternalBuffering = enableParquetInternalBuffering;
        this.maxChunkSizeInBytes = maxChunkSizeInBytes;
        this.bdecParquetCompression = bdecParquetCompression;
    }

    @Override
    public Flusher.SerializationResult serialize(List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath) throws IOException {
        if (this.enableParquetInternalBuffering) {
            return this.serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
        }
        return this.serializeFromJavaObjects(channelsDataPerTable, filePath);
    }

    private Flusher.SerializationResult serializeFromParquetWriteBuffers(List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath) throws IOException {
        ArrayList<ChannelMetadata> channelsMetadataList = new ArrayList<ChannelMetadata>();
        long rowCount = 0L;
        float chunkEstimatedUncompressedSize = 0.0f;
        String firstChannelFullyQualifiedTableName = null;
        Map<String, RowBufferStats> columnEpStatsMapCombined = null;
        BdecParquetWriter mergedChannelWriter = null;
        ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream();
        Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
        for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
            ChannelMetadata channelMetadata = ChannelMetadata.builder().setOwningChannelFromContext(data.getChannelContext()).setRowSequencer(data.getRowSequencer()).setOffsetToken(data.getEndOffsetToken()).setStartOffsetToken(data.getStartOffsetToken()).build();
            channelsMetadataList.add(channelMetadata);
            logger.logDebug("Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), Float.valueOf(data.getBufferSize()), filePath);
            if (mergedChannelWriter == null) {
                columnEpStatsMapCombined = data.getColumnEps();
                mergedChannelWriter = data.getVectors().parquetWriter;
                mergedChunkData = data.getVectors().output;
                firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
                chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
            } else {
                if (!data.getChannelContext().getFullyQualifiedTableName().equals(firstChannelFullyQualifiedTableName)) {
                    throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK, new Object[0]);
                }
                columnEpStatsMapCombined = ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
                data.getVectors().parquetWriter.close();
                BdecParquetReader.readFileIntoWriter(data.getVectors().output.toByteArray(), mergedChannelWriter);
                chunkMinMaxInsertTimeInMs = ChannelData.getCombinedMinMaxInsertTimeInMs(chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
            }
            rowCount += (long)data.getRowCount();
            chunkEstimatedUncompressedSize += data.getBufferSize();
            logger.logDebug("Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), Float.valueOf(data.getBufferSize()), filePath);
        }
        if (mergedChannelWriter != null) {
            mergedChannelWriter.close();
        }
        return new Flusher.SerializationResult(channelsMetadataList, columnEpStatsMapCombined, rowCount, chunkEstimatedUncompressedSize, mergedChunkData, chunkMinMaxInsertTimeInMs);
    }

    private Flusher.SerializationResult serializeFromJavaObjects(List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath) throws IOException {
        ArrayList<ChannelMetadata> channelsMetadataList = new ArrayList<ChannelMetadata>();
        long rowCount = 0L;
        float chunkEstimatedUncompressedSize = 0.0f;
        String firstChannelFullyQualifiedTableName = null;
        Map<String, RowBufferStats> columnEpStatsMapCombined = null;
        ArrayList<List<Object>> rows = null;
        ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
        Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
        for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
            ChannelMetadata channelMetadata = ChannelMetadata.builder().setOwningChannelFromContext(data.getChannelContext()).setRowSequencer(data.getRowSequencer()).setOffsetToken(data.getEndOffsetToken()).setStartOffsetToken(data.getStartOffsetToken()).build();
            channelsMetadataList.add(channelMetadata);
            logger.logDebug("Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}, enableParquetMemoryOptimization={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), Float.valueOf(data.getBufferSize()), filePath, this.enableParquetInternalBuffering);
            if (rows == null) {
                columnEpStatsMapCombined = data.getColumnEps();
                rows = new ArrayList<List<Object>>();
                firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
                chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
            } else {
                if (!data.getChannelContext().getFullyQualifiedTableName().equals(firstChannelFullyQualifiedTableName)) {
                    throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK, new Object[0]);
                }
                columnEpStatsMapCombined = ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
                chunkMinMaxInsertTimeInMs = ChannelData.getCombinedMinMaxInsertTimeInMs(chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
            }
            rows.addAll(data.getVectors().rows);
            rowCount += (long)data.getRowCount();
            chunkEstimatedUncompressedSize += data.getBufferSize();
            logger.logDebug("Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}, enableParquetMemoryOptimization={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), Float.valueOf(data.getBufferSize()), filePath, this.enableParquetInternalBuffering);
        }
        Map<String, String> metadata = channelsDataPerTable.get((int)0).getVectors().metadata;
        BdecParquetWriter parquetWriter = new BdecParquetWriter(mergedData, this.schema, metadata, firstChannelFullyQualifiedTableName, this.maxChunkSizeInBytes, this.bdecParquetCompression);
        rows.forEach(parquetWriter::writeRow);
        parquetWriter.close();
        return new Flusher.SerializationResult(channelsMetadataList, columnEpStatsMapCombined, rowCount, chunkEstimatedUncompressedSize, mergedData, chunkMinMaxInsertTimeInMs);
    }
}

