/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.internal.AbstractRowBuffer;
import net.snowflake.ingest.streaming.internal.ChannelRuntimeState;
import net.snowflake.ingest.streaming.internal.ColumnMetadata;
import net.snowflake.ingest.streaming.internal.Flusher;
import net.snowflake.ingest.streaming.internal.LiteralQuoteUtils;
import net.snowflake.ingest.streaming.internal.ParquetChunkData;
import net.snowflake.ingest.streaming.internal.ParquetFlusher;
import net.snowflake.ingest.streaming.internal.ParquetTypeGenerator;
import net.snowflake.ingest.streaming.internal.ParquetValueParser;
import net.snowflake.ingest.streaming.internal.RowBufferStats;
import net.snowflake.ingest.streaming.internal.StreamingIngestUtils;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ParquetRowBuffer
extends AbstractRowBuffer<ParquetChunkData> {
    private static final Logging logger = new Logging(ParquetRowBuffer.class);
    private static final String PARQUET_MESSAGE_TYPE_NAME = "bdec";
    private final Map<String, Pair<ColumnMetadata, Integer>> fieldIndex = new HashMap<String, Pair<ColumnMetadata, Integer>>();
    private final Map<String, String> metadata = new HashMap<String, String>();
    private final List<List<Object>> data = new ArrayList<List<Object>>();
    private BdecParquetWriter bdecParquetWriter;
    private ByteArrayOutputStream fileOutput;
    private final List<List<Object>> tempData = new ArrayList<List<Object>>();
    private final String channelName;
    private MessageType schema;
    private final boolean bufferForTests;
    private final boolean enableParquetInternalBuffering;

    ParquetRowBuffer(OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, BufferAllocator allocator, String fullyQualifiedChannelName, Consumer<Float> rowSizeMetric, ChannelRuntimeState channelRuntimeState, boolean bufferForTests, boolean enableParquetInternalBuffering) {
        super(onErrorOption, defaultTimezone, allocator, fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState);
        this.channelName = fullyQualifiedChannelName;
        this.bufferForTests = bufferForTests;
        this.enableParquetInternalBuffering = enableParquetInternalBuffering;
    }

    @Override
    public void setupSchema(List<ColumnMetadata> columns) {
        this.fieldIndex.clear();
        this.metadata.clear();
        this.metadata.put("sfVer", "1,1");
        ArrayList<Type> parquetTypes = new ArrayList<Type>();
        int id = 1;
        for (ColumnMetadata column : columns) {
            this.validateColumnCollation(column);
            ParquetTypeGenerator.ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id);
            parquetTypes.add(typeInfo.getParquetType());
            this.metadata.putAll(typeInfo.getMetadata());
            this.fieldIndex.put(column.getInternalName(), new Pair<ColumnMetadata, Integer>(column, parquetTypes.size() - 1));
            if (!column.getNullable()) {
                this.addNonNullableFieldName(column.getInternalName());
            }
            this.statsMap.put(column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation()));
            if (this.onErrorOption == OpenChannelRequest.OnErrorOption.ABORT) {
                this.tempStatsMap.put(column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation()));
            }
            ++id;
        }
        this.schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes);
        this.createFileWriter();
        this.tempData.clear();
        this.data.clear();
    }

    private void createFileWriter() {
        this.fileOutput = new ByteArrayOutputStream();
        try {
            this.bdecParquetWriter = this.enableParquetInternalBuffering ? new BdecParquetWriter(this.fileOutput, this.schema, this.metadata, this.channelName) : null;
            this.data.clear();
        }
        catch (IOException e) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "cannot create parquet writer", e);
        }
    }

    @Override
    boolean hasColumn(String name) {
        return this.fieldIndex.containsKey(name);
    }

    @Override
    float addRow(Map<String, Object> row, int curRowIndex, Map<String, RowBufferStats> statsMap, Set<String> formattedInputColumnNames) {
        return this.addRow(row, this::writeRow, statsMap, formattedInputColumnNames);
    }

    void writeRow(List<Object> row) {
        if (this.enableParquetInternalBuffering) {
            this.bdecParquetWriter.writeRow(row);
        } else {
            this.data.add(row);
        }
    }

    @Override
    float addTempRow(Map<String, Object> row, int curRowIndex, Map<String, RowBufferStats> statsMap, Set<String> formattedInputColumnNames) {
        return this.addRow(row, this::writeRow, statsMap, formattedInputColumnNames);
    }

    private float addRow(Map<String, Object> row, Consumer<List<Object>> out, Map<String, RowBufferStats> statsMap, Set<String> inputColumnNames) {
        Object[] indexedRow = new Object[this.fieldIndex.size()];
        float size = 0.0f;
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            String columnName = LiteralQuoteUtils.unquoteColumnName(key);
            int colIndex = this.fieldIndex.get(columnName).getSecond();
            RowBufferStats stats = statsMap.get(columnName);
            ColumnMetadata column = this.fieldIndex.get(columnName).getFirst();
            ColumnDescriptor columnDescriptor = (ColumnDescriptor)this.schema.getColumns().get(colIndex);
            PrimitiveType.PrimitiveTypeName typeName = columnDescriptor.getPrimitiveType().getPrimitiveTypeName();
            ParquetValueParser.ParquetBufferValue valueWithSize = ParquetValueParser.parseColumnValueToParquet(value, column, typeName, stats, this.defaultTimezone);
            indexedRow[colIndex] = valueWithSize.getValue();
            size += valueWithSize.getSize();
        }
        out.accept(Arrays.asList(indexedRow));
        for (String columnName : Sets.difference(this.fieldIndex.keySet(), inputColumnNames)) {
            statsMap.get(columnName).incCurrentNullCount();
        }
        return size;
    }

    @Override
    void moveTempRowsToActualBuffer(int tempRowCount) {
        this.tempData.forEach(this::writeRow);
    }

    @Override
    void clearTempRows() {
        this.tempData.clear();
    }

    @Override
    boolean hasColumns() {
        return !this.fieldIndex.isEmpty();
    }

    @Override
    Optional<ParquetChunkData> getSnapshot(String filePath) {
        this.metadata.put("primaryFileId", StreamingIngestUtils.getShortname(filePath));
        ArrayList<List<Object>> oldData = new ArrayList<List<Object>>();
        if (!this.enableParquetInternalBuffering) {
            this.data.forEach(r -> oldData.add(new ArrayList(r)));
        }
        return this.rowCount <= 0 ? Optional.empty() : Optional.of(new ParquetChunkData(oldData, this.bdecParquetWriter, this.fileOutput, this.metadata));
    }

    @Override
    Object getVectorValueAt(String column, int index) {
        if (this.data == null) {
            return null;
        }
        int colIndex = this.fieldIndex.get(column).getSecond();
        Object value = this.data.get(index).get(colIndex);
        ColumnMetadata columnMetadata = this.fieldIndex.get(column).getFirst();
        String physicalTypeStr = columnMetadata.getPhysicalType();
        AbstractRowBuffer.ColumnPhysicalType physicalType = AbstractRowBuffer.ColumnPhysicalType.valueOf(physicalTypeStr);
        String logicalTypeStr = columnMetadata.getLogicalType();
        AbstractRowBuffer.ColumnLogicalType logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(logicalTypeStr);
        if (logicalType == AbstractRowBuffer.ColumnLogicalType.FIXED) {
            if (physicalType == AbstractRowBuffer.ColumnPhysicalType.SB1) {
                value = ((Integer)value).byteValue();
            }
            if (physicalType == AbstractRowBuffer.ColumnPhysicalType.SB2) {
                value = ((Integer)value).shortValue();
            }
            if (physicalType == AbstractRowBuffer.ColumnPhysicalType.SB16) {
                value = new BigDecimal(new BigInteger((byte[])value), columnMetadata.getScale());
            }
        }
        if (logicalType == AbstractRowBuffer.ColumnLogicalType.BINARY && value != null) {
            value = value instanceof String ? (Object)((String)value).getBytes(StandardCharsets.UTF_8) : value;
        }
        return value;
    }

    @Override
    int getTempRowCount() {
        return this.tempData.size();
    }

    @Override
    void reset() {
        super.reset();
        this.createFileWriter();
        this.data.clear();
    }

    @Override
    void closeInternal() {
        this.fieldIndex.clear();
        if (this.bdecParquetWriter != null) {
            try {
                this.bdecParquetWriter.close();
            }
            catch (IOException e) {
                throw new SFException(ErrorCode.INTERNAL_ERROR, "Failed to close parquet writer", e);
            }
        }
    }

    @Override
    public Flusher<ParquetChunkData> createFlusher() {
        return new ParquetFlusher(this.schema, this.enableParquetInternalBuffering);
    }
}

