/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.NonNull;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.Conversions;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.Schema;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.data.TimeConversions;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.generic.GenericData;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.generic.GenericRecord;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.generic.GenericRecordBuilder;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.avro.AvroParquetWriter;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.column.ParquetProperties;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.ParquetWriter;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.ConversionPatterns;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.MessageType;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.OriginalType;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.PrimitiveType;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.Type;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.Types;

public class ParquetWriteStrategy
extends AbstractWriteStrategy {
    private final LinkedHashMap<String, ParquetWriter<GenericRecord>> beingWrittenWriter = new LinkedHashMap();
    private AvroSchemaConverter schemaConverter;
    private Schema schema;
    public static final int[] PRECISION_TO_BYTE_COUNT = new int[38];

    public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) {
        super(fileSinkConfig);
    }

    @Override
    public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIndex) {
        super.init(conf, jobId, uuidPrefix, subTaskIndex);
        this.schemaConverter = new AvroSchemaConverter(this.getConfiguration(this.hadoopConf));
    }

    @Override
    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
        if (seaTunnelRow == null) {
            throw new NullPointerException("seaTunnelRow is marked non-null but is null");
        }
        super.write(seaTunnelRow);
        String filePath = this.getOrCreateFilePathBeingWritten(seaTunnelRow);
        ParquetWriter<GenericRecord> writer = this.getOrCreateWriter(filePath);
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
        for (Integer integer : this.sinkColumnsIndexInRow) {
            String fieldName = this.seaTunnelRowType.getFieldName(integer.intValue());
            Object field = seaTunnelRow.getField(integer.intValue());
            recordBuilder.set(fieldName.toLowerCase(), this.resolveObject(field, this.seaTunnelRowType.getFieldType(integer.intValue())));
        }
        GenericData.Record record = recordBuilder.build();
        try {
            writer.write(record);
        }
        catch (IOException e) {
            String errorMsg = String.format("Write data to file [%s] error", filePath);
            throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
        }
    }

    @Override
    public void finishAndCloseFile() {
        this.beingWrittenWriter.forEach((k, v) -> {
            try {
                v.close();
            }
            catch (IOException e) {
                String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
                throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
            }
            this.needMoveFiles.put(k, this.getTargetLocation((String)k));
        });
        this.beingWrittenWriter.clear();
    }

    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
        if (filePath == null) {
            throw new NullPointerException("filePath is marked non-null but is null");
        }
        if (this.schema == null) {
            this.schema = this.buildAvroSchemaWithRowType(this.seaTunnelRowType, this.sinkColumnsIndexInRow);
        }
        ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
        GenericData dataModel = new GenericData();
        dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
        dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
        dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
        if (writer == null) {
            Path path = new Path(filePath);
            try {
                HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, this.getConfiguration(this.hadoopConf));
                ParquetWriter<GenericRecord> newWriter = ((AvroParquetWriter.Builder)((AvroParquetWriter.Builder)((AvroParquetWriter.Builder)AvroParquetWriter.builder(outputFile).withWriteMode(ParquetFileWriter.Mode.OVERWRITE)).withDataModel(dataModel).withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)).withCompressionCodec(this.compressFormat.getParquetCompression())).withSchema(this.schema).build();
                this.beingWrittenWriter.put(filePath, newWriter);
                return newWriter;
            }
            catch (IOException e) {
                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
                throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
            }
        }
        return writer;
    }

    private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType) {
        if (data == null) {
            return null;
        }
        switch (seaTunnelDataType.getSqlType()) {
            case ARRAY: {
                BasicType elementType = ((ArrayType)seaTunnelDataType).getElementType();
                ArrayList<Object> records = new ArrayList<Object>(((Object[])data).length);
                for (Object object : (Object[])data) {
                    Object resolvedObject = this.resolveObject(object, (SeaTunnelDataType<?>)elementType);
                    records.add(resolvedObject);
                }
                return records;
            }
            case MAP: 
            case STRING: 
            case BOOLEAN: 
            case TINYINT: 
            case SMALLINT: 
            case INT: 
            case BIGINT: 
            case FLOAT: 
            case DOUBLE: 
            case NULL: 
            case DECIMAL: 
            case DATE: {
                return data;
            }
            case TIMESTAMP: {
                return ((LocalDateTime)data).toInstant(ZoneOffset.of("+8")).toEpochMilli();
            }
            case BYTES: {
                return ByteBuffer.wrap((byte[])data);
            }
            case ROW: {
                SeaTunnelRow seaTunnelRow = (SeaTunnelRow)data;
                SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType)seaTunnelDataType).getFieldTypes();
                String[] fieldNames = ((SeaTunnelRowType)seaTunnelDataType).getFieldNames();
                List<Integer> sinkColumnsIndex = IntStream.rangeClosed(0, fieldNames.length - 1).boxed().collect(Collectors.toList());
                Schema recordSchema = this.buildAvroSchemaWithRowType((SeaTunnelRowType)seaTunnelDataType, sinkColumnsIndex);
                GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema);
                for (int i = 0; i < fieldNames.length; ++i) {
                    recordBuilder.set(fieldNames[i].toLowerCase(), this.resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
                }
                return recordBuilder.build();
            }
        }
        String errorMsg = String.format("SeaTunnel file connector is not supported for this data type [%s]", seaTunnelDataType.getSqlType());
        throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
    }

    public static Type seaTunnelDataType2ParquetDataType(String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
        switch (seaTunnelDataType.getSqlType()) {
            case ARRAY: {
                BasicType elementType = ((ArrayType)seaTunnelDataType).getElementType();
                return (Type)((Types.GroupBuilder)((Types.GroupBuilder)Types.optionalGroup().as(OriginalType.LIST)).addField((Type)((Types.GroupBuilder)Types.repeatedGroup().addField(ParquetWriteStrategy.seaTunnelDataType2ParquetDataType("array_element", elementType))).named("bag"))).named(fieldName);
            }
            case MAP: {
                SeaTunnelDataType keyType = ((MapType)seaTunnelDataType).getKeyType();
                SeaTunnelDataType valueType = ((MapType)seaTunnelDataType).getValueType();
                return ConversionPatterns.mapType(Type.Repetition.OPTIONAL, fieldName, ParquetWriteStrategy.seaTunnelDataType2ParquetDataType("key", keyType), ParquetWriteStrategy.seaTunnelDataType2ParquetDataType("value", valueType));
            }
            case STRING: {
                return (Type)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.stringType())).named(fieldName);
            }
            case BOOLEAN: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case TINYINT: {
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.intType(8, true))).as(OriginalType.INT_8)).named(fieldName);
            }
            case SMALLINT: {
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.intType(16, true))).as(OriginalType.INT_16)).named(fieldName);
            }
            case INT: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case DATE: {
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.dateType())).as(OriginalType.DATE)).named(fieldName);
            }
            case BIGINT: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case TIMESTAMP: {
                return (Type)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).as(OriginalType.TIMESTAMP_MILLIS)).named(fieldName);
            }
            case FLOAT: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case DOUBLE: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case DECIMAL: {
                int precision = ((DecimalType)seaTunnelDataType).getPrecision();
                int scale = ((DecimalType)seaTunnelDataType).getScale();
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(PRECISION_TO_BYTE_COUNT[precision - 1])).as(OriginalType.DECIMAL)).precision(precision)).scale(scale)).named(fieldName);
            }
            case BYTES: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case ROW: {
                SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType)seaTunnelDataType).getFieldTypes();
                String[] fieldNames = ((SeaTunnelRowType)seaTunnelDataType).getFieldNames();
                Type[] types = new Type[fieldTypes.length];
                for (int i = 0; i < fieldNames.length; ++i) {
                    Type type;
                    types[i] = type = ParquetWriteStrategy.seaTunnelDataType2ParquetDataType(fieldNames[i], fieldTypes[i]);
                }
                return (Type)((Types.GroupBuilder)Types.optionalGroup().addFields(types)).named(fieldName);
            }
        }
        String errorMsg = String.format("SeaTunnel file connector is not supported for this data type [%s]", seaTunnelDataType.getSqlType());
        throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
    }

    private Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
        ArrayList types = new ArrayList();
        SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes();
        String[] fieldNames = seaTunnelRowType.getFieldNames();
        sinkColumnsIndex.forEach(index -> {
            Type type = ParquetWriteStrategy.seaTunnelDataType2ParquetDataType(fieldNames[index].toLowerCase(), fieldTypes[index]);
            types.add(type);
        });
        MessageType seaTunnelRow = (MessageType)((Types.GroupBuilder)Types.buildMessage().addFields(types.toArray(new Type[0]))).named("SeaTunnelRecord");
        return this.schemaConverter.convert(seaTunnelRow);
    }

    static {
        for (int prec = 1; prec <= 38; ++prec) {
            ParquetWriteStrategy.PRECISION_TO_BYTE_COUNT[prec - 1] = (int)Math.ceil((Math.log(Math.pow(10.0, prec) - 1.0) / Math.log(2.0) + 1.0) / 8.0);
        }
    }
}

