/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChannelMetadata;
import net.snowflake.ingest.streaming.internal.Flusher;
import net.snowflake.ingest.streaming.internal.ParquetChunkData;
import net.snowflake.ingest.streaming.internal.RowBufferStats;
import net.snowflake.ingest.streaming.internal.StreamingIngestUtils;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;

/**
 * {@link Flusher} implementation that merges the rows buffered by one or more channels into a
 * single Parquet chunk written to an in-memory stream.
 *
 * <p>All channels passed to one {@link #serialize} call must belong to the same fully qualified
 * table; their per-column statistics and min/max insert times are combined into chunk-level
 * values.
 */
public class ParquetFlusher
implements Flusher<ParquetChunkData> {
    private static final Logging logger = new Logging(ParquetFlusher.class);

    /** Parquet schema handed to the writer for every chunk. */
    private final MessageType schema;

    /** Chunk size limit forwarded unchanged to the {@link BdecParquetWriter}. */
    private final long maxChunkSizeInBytes;

    /** Optional row-group limit forwarded unchanged to the {@link BdecParquetWriter}. */
    private final Optional<Integer> maxRowGroups;

    /** Compression setting forwarded unchanged to the {@link BdecParquetWriter}. */
    private final Constants.BdecParquetCompression bdecParquetCompression;

    /**
     * Creates a flusher whose settings are passed to every Parquet writer it opens.
     *
     * @param schema Parquet schema of the rows to serialize
     * @param maxChunkSizeInBytes chunk size limit given to the writer
     * @param maxRowGroups optional row-group limit given to the writer
     * @param bdecParquetCompression compression setting given to the writer
     */
    public ParquetFlusher(MessageType schema, long maxChunkSizeInBytes, Optional<Integer> maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression) {
        this.schema = schema;
        this.maxChunkSizeInBytes = maxChunkSizeInBytes;
        this.maxRowGroups = maxRowGroups;
        this.bdecParquetCompression = bdecParquetCompression;
    }

    /**
     * Serializes the data of all given channels into one Parquet chunk.
     *
     * @param channelsDataPerTable data of the channels to merge; must be non-empty and all
     *     channels must share the same fully qualified table name
     * @param filePath path of the target blob; its short name is stored in the chunk metadata
     * @return the merged chunk bytes together with channel metadata, combined column statistics,
     *     row count, estimated uncompressed size and min/max insert time
     * @throws IOException declared by the interface; presumably propagated from the Parquet
     *     writer calls
     * @throws SFException if the input is empty, if channels of different tables are mixed, or if
     *     the row counts reported by the writer do not match the channel metadata
     */
    @Override
    public Flusher.SerializationResult serialize(List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath) throws IOException {
        return this.serializeFromJavaObjects(channelsDataPerTable, filePath);
    }

    /**
     * Collects the rows of every channel, writes them through a {@link BdecParquetWriter} and
     * verifies the written row counts before building the result.
     */
    private Flusher.SerializationResult serializeFromJavaObjects(List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath) throws IOException {
        // Without at least one channel there is no metadata map and no table name to write with;
        // fail with a descriptive error instead of an IndexOutOfBoundsException further down.
        if (channelsDataPerTable == null || channelsDataPerTable.isEmpty()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Parquet Flusher: cannot serialize an empty list of channels for blob=" + filePath);
        }

        List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
        List<List<Object>> rows = new ArrayList<>();
        long rowCount = 0L;
        float chunkEstimatedUncompressedSize = 0.0f;
        String firstChannelFullyQualifiedTableName = null;
        Map<String, RowBufferStats> columnEpStatsMapCombined = null;
        Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
        boolean isFirstChannel = true;

        for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
            ChannelMetadata channelMetadata = ChannelMetadata.builder()
                    .setOwningChannelFromContext(data.getChannelContext())
                    .setRowSequencer(data.getRowSequencer())
                    .setOffsetToken(data.getEndOffsetToken())
                    .setStartOffsetToken(data.getStartOffsetToken())
                    .build();
            channelsMetadataList.add(channelMetadata);
            logger.logDebug("Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), Float.valueOf(data.getBufferSize()), filePath);

            if (isFirstChannel) {
                // The first channel seeds the chunk-level statistics and fixes the table name
                // every later channel is compared against.
                columnEpStatsMapCombined = data.getColumnEps();
                firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
                chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
                isFirstChannel = false;
            } else {
                // A chunk may only hold rows of a single table.
                if (!data.getChannelContext().getFullyQualifiedTableName().equals(firstChannelFullyQualifiedTableName)) {
                    throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK);
                }
                columnEpStatsMapCombined = ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
                chunkMinMaxInsertTimeInMs = ChannelData.getCombinedMinMaxInsertTimeInMs(chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
            }

            rows.addAll(data.getVectors().rows);
            rowCount += (long)data.getRowCount();
            chunkEstimatedUncompressedSize += data.getBufferSize();
            logger.logDebug("Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}", data.getChannelContext().getFullyQualifiedName(), data.getRowCount(), Float.valueOf(data.getBufferSize()), filePath);
        }

        // The metadata map of the first channel is reused for the whole chunk. Note that the put
        // below mutates that map in place rather than a copy.
        Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
        metadata.put("primaryFileId", StreamingIngestUtils.getShortname(filePath));

        ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
        // NOTE(review): the writer is not closed if writeRow throws. Whether BdecParquetWriter is
        // AutoCloseable / tolerates a second close() is not visible here, so this is left as is.
        BdecParquetWriter parquetWriter = new BdecParquetWriter(mergedData, this.schema, metadata, firstChannelFullyQualifiedTableName, this.maxChunkSizeInBytes, this.maxRowGroups, this.bdecParquetCompression);
        rows.forEach(parquetWriter::writeRow);
        parquetWriter.close();

        // The counters are read from the writer after close().
        this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size());
        return new Flusher.SerializationResult(channelsMetadataList, columnEpStatsMapCombined, rowCount, chunkEstimatedUncompressedSize, mergedData, chunkMinMaxInsertTimeInMs);
    }

    /**
     * Checks that both the number of rows the writer reports as written and the sum of the
     * per-block row counts from its footer equal the row count taken from the channel metadata.
     *
     * @param writer writer that produced the chunk
     * @param totalMetadataRowCount sum of the row counts reported by the channels
     * @param channelsDataPerTable channels that were serialized; used only for the error message
     * @param javaSerializationTotalRowCount number of row objects handed to the writer; used only
     *     for the error message
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if either count differs
     */
    private void verifyRowCounts(BdecParquetWriter writer, long totalMetadataRowCount, List<ChannelData<ParquetChunkData>> channelsDataPerTable, long javaSerializationTotalRowCount) {
        long parquetTotalRowsWritten = writer.getRowsWritten();
        List<Long> parquetFooterRowsPerBlock = writer.getRowCountsFromFooter();
        long parquetTotalRowsInFooter = 0L;
        for (long perBlockCount : parquetFooterRowsPerBlock) {
            parquetTotalRowsInFooter += perBlockCount;
        }

        if (parquetTotalRowsInFooter == totalMetadataRowCount && parquetTotalRowsWritten == totalMetadataRowCount) {
            return;
        }

        // Mismatch: gather per-channel and per-block details for diagnosis.
        String perChannelRowCountsInMetadata = channelsDataPerTable.stream().map(x -> String.valueOf(x.getRowCount())).collect(Collectors.joining(","));
        String channelNames = channelsDataPerTable.stream().map(x -> String.valueOf(x.getChannelContext().getName())).collect(Collectors.joining(","));
        String perBlockRowCountsInFooter = parquetFooterRowsPerBlock.stream().map(String::valueOf).collect(Collectors.joining(","));
        long channelsCountInMetadata = channelsDataPerTable.size();
        throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("The number of rows in Parquet does not match the number of rows in metadata. parquetTotalRowsInFooter=%d totalMetadataRowCount=%d parquetTotalRowsWritten=%d perChannelRowCountsInMetadata=%s perBlockRowCountsInFooter=%s channelsCountInMetadata=%d countOfSerializedJavaObjects=%d channelNames=%s", parquetTotalRowsInFooter, totalMetadataRowCount, parquetTotalRowsWritten, perChannelRowCountsInMetadata, perBlockRowCountsInFooter, channelsCountInMetadata, javaSerializationTotalRowCount, channelNames));
    }
}

