/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.format.avro;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.avro.AbstractAvroBulkFormat;
import org.apache.paimon.format.avro.AvroBulkWriter;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.format.avro.AvroToRowDataConverters;
import org.apache.paimon.format.avro.AvroWriterFactory;
import org.apache.paimon.format.avro.RowDataToAvroConverters;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.shade.org.apache.avro.Schema;
import org.apache.paimon.shade.org.apache.avro.file.CodecFactory;
import org.apache.paimon.shade.org.apache.avro.file.DataFileWriter;
import org.apache.paimon.shade.org.apache.avro.generic.GenericData;
import org.apache.paimon.shade.org.apache.avro.generic.GenericDatumWriter;
import org.apache.paimon.shade.org.apache.avro.generic.GenericRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Projection;

/**
 * Avro implementation of {@link FileFormat}.
 *
 * <p>Builds reader factories that turn Avro {@link GenericRecord}s into {@link InternalRow}s and
 * writer factories that turn {@link InternalRow}s into Avro {@link GenericRecord}s.
 */
public class AvroFileFormat
extends FileFormat {
    /** Identifier passed to the {@link FileFormat} constructor. */
    public static final String IDENTIFIER = "avro";

    /** Name of the Avro compression codec used by created writers; defaults to {@code "snappy"}. */
    public static final ConfigOption<String> AVRO_OUTPUT_CODEC = ConfigOptions.key("codec").stringType().defaultValue("snappy").withDescription("The compression codec for avro");

    /** Options from which {@link #AVRO_OUTPUT_CODEC} is read when a writer factory is created. */
    private final Options formatOptions;

    /**
     * Creates the format.
     *
     * @param formatOptions options consulted for {@link #AVRO_OUTPUT_CODEC}
     */
    public AvroFileFormat(Options formatOptions) {
        super(IDENTIFIER);
        this.formatOptions = formatOptions;
    }

    /**
     * Creates a reader factory producing rows of the projected type.
     *
     * @param type full row type of the data
     * @param projection projection applied to {@code type} to obtain the produced row type
     * @param filters not used by this implementation
     * @return a reader factory for the projected row type
     */
    @Override
    public FormatReaderFactory createReaderFactory(RowType type, int[][] projection, @Nullable List<Predicate> filters) {
        RowType producedType = Projection.of(projection).project(type);
        // NOTE(review): copy(false) presumably yields a non-nullable copy of the row type;
        // confirm against DataType#copy.
        return new AvroGenericRecordBulkFormat((RowType) producedType.copy(false));
    }

    /**
     * Creates a writer factory for the given row type, using the codec configured under
     * {@link #AVRO_OUTPUT_CODEC}.
     *
     * @param type row type of the rows to write
     * @return a writer factory for {@code type}
     */
    @Override
    public FormatWriterFactory createWriterFactory(RowType type) {
        return new RowDataAvroWriterFactory(type, this.formatOptions.get(AVRO_OUTPUT_CODEC));
    }

    /**
     * Validates the fields of {@code rowType} by converting each field type to an Avro schema.
     *
     * <p>The conversion results are discarded; presumably the converter throws for types it
     * cannot map (confirm against {@link AvroSchemaConverter}).
     *
     * @param rowType row type whose field types are checked
     */
    @Override
    public void validateDataFields(RowType rowType) {
        for (DataType fieldType : rowType.getFieldTypes()) {
            AvroSchemaConverter.convertToSchema(fieldType);
        }
    }

    /** {@link FormatWriterFactory} that writes {@link InternalRow}s as Avro generic records. */
    private static class RowDataAvroWriterFactory
    implements FormatWriterFactory {
        private final AvroWriterFactory<GenericRecord> factory;
        private final RowType rowType;

        /**
         * @param rowType row type of the rows to write
         * @param codec Avro codec name passed to {@link CodecFactory#fromString}; when
         *     {@code null} no codec is set on the underlying writer
         */
        private RowDataAvroWriterFactory(RowType rowType, String codec) {
            this.rowType = rowType;
            // The lambda captures the constructor parameters (not this), and derives the
            // schema each time a writer is opened.
            this.factory = new AvroWriterFactory<>(out -> {
                Schema schema = AvroSchemaConverter.convertToSchema(rowType);
                GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
                DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
                if (codec != null) {
                    dataFileWriter.setCodec(CodecFactory.fromString(codec));
                }
                dataFileWriter.create(schema, out);
                return dataFileWriter;
            });
        }

        /**
         * Opens a writer on {@code out}.
         *
         * @param out stream the Avro data is written to
         * @param compression not used here; the codec is the one given to the constructor
         * @return a writer converting each row to a {@link GenericRecord} before writing it
         * @throws IOException if the underlying Avro writer cannot be created
         */
        @Override
        public FormatWriter create(PositionOutputStream out, String compression) throws IOException {
            final AvroBulkWriter<GenericRecord> writer = this.factory.create(out);
            final RowDataToAvroConverters.RowDataToAvroConverter converter = RowDataToAvroConverters.createConverter(this.rowType);
            final Schema schema = AvroSchemaConverter.convertToSchema(this.rowType);
            return new FormatWriter(){

                @Override
                public void addElement(InternalRow element) throws IOException {
                    GenericRecord record = (GenericRecord) converter.convert(schema, element);
                    writer.addElement(record);
                }

                @Override
                public void flush() throws IOException {
                    writer.flush();
                }

                @Override
                public void finish() throws IOException {
                    writer.finish();
                }
            };
        }
    }

    /** Bulk format reading Avro {@link GenericRecord}s and converting them to rows. */
    private static class AvroGenericRecordBulkFormat
    extends AbstractAvroBulkFormat<GenericRecord> {
        private static final long serialVersionUID = 1L;
        private final RowType producedRowType;

        /**
         * @param producedRowType row type of the produced rows; also converted to the Avro schema
         *     handed to the superclass
         */
        public AvroGenericRecordBulkFormat(RowType producedRowType) {
            super(AvroSchemaConverter.convertToSchema(producedRowType));
            this.producedRowType = producedRowType;
        }

        /** Creates a new, empty generic record for the reader schema held by the superclass. */
        @Override
        protected GenericRecord createReusedAvroRecord() {
            return new GenericData.Record(this.readerSchema);
        }

        /**
         * Creates the record-to-row conversion function.
         *
         * @return a function mapping {@code null} to {@code null} and any other record to the
         *     {@link GenericRow} produced by the row converter
         */
        @Override
        protected Function<GenericRecord, InternalRow> createConverter() {
            AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createRowConverter(this.producedRowType);
            return record -> record == null ? null : (GenericRow) converter.convert(record);
        }
    }
}

