/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.CachingTableSupplier;
import org.apache.iceberg.flink.sink.EqualityFieldKeySelector;
import org.apache.iceberg.flink.sink.FlinkWriteResult;
import org.apache.iceberg.flink.sink.IcebergFilesCommitter;
import org.apache.iceberg.flink.sink.IcebergSinkBuilder;
import org.apache.iceberg.flink.sink.IcebergStreamWriter;
import org.apache.iceberg.flink.sink.PartitionKeySelector;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecordTypeInformation;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSink {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();

    private FlinkSink() {
    }

    public static <T> Builder builderFor(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
        return new Builder().forMapperOutputType(input, mapper, outputType);
    }

    public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
        RowType rowType = (RowType)tableSchema.toRowDataType().getLogicalType();
        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
        return FlinkSink.builderFor(input, arg_0 -> ((DataFormatConverters.RowConverter)rowConverter).toInternal(arg_0), FlinkCompatibilityUtil.toTypeInfo(rowType)).tableSchema(tableSchema);
    }

    public static Builder forRowData(DataStream<RowData> input) {
        return new Builder().forRowData(input);
    }

    static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
        if (requestedSchema != null) {
            Schema writeSchema = TypeUtil.reassignIds((Schema)FlinkSchemaUtil.convert(requestedSchema), (Schema)schema);
            TypeUtil.validateWriteSchema((Schema)schema, (Schema)writeSchema, (Boolean)true, (Boolean)true);
            return (RowType)requestedSchema.toRowDataType().getLogicalType();
        }
        return FlinkSchemaUtil.convert(schema);
    }

    static IcebergStreamWriter<RowData> createStreamWriter(SerializableSupplier<Table> tableSupplier, FlinkWriteConf flinkWriteConf, RowType flinkRowType, List<Integer> equalityFieldIds) {
        Preconditions.checkArgument((tableSupplier != null ? 1 : 0) != 0, (Object)"Iceberg table supplier shouldn't be null");
        Table initTable = (Table)tableSupplier.get();
        FileFormat format = flinkWriteConf.dataFileFormat();
        RowDataTaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(tableSupplier, flinkRowType, flinkWriteConf.targetDataFileSize(), format, FlinkSink.writeProperties(initTable, format, flinkWriteConf), equalityFieldIds, flinkWriteConf.upsertMode());
        return new IcebergStreamWriter<RowData>(initTable.name(), taskWriterFactory);
    }

    private static Map<String, String> writeProperties(Table table, FileFormat format, FlinkWriteConf conf) {
        HashMap writeProperties = Maps.newHashMap((Map)table.properties());
        switch (format) {
            case PARQUET: {
                writeProperties.put("write.parquet.compression-codec", conf.parquetCompressionCodec());
                String parquetCompressionLevel = conf.parquetCompressionLevel();
                if (parquetCompressionLevel == null) break;
                writeProperties.put("write.parquet.compression-level", parquetCompressionLevel);
                break;
            }
            case AVRO: {
                writeProperties.put("write.avro.compression-codec", conf.avroCompressionCodec());
                String avroCompressionLevel = conf.avroCompressionLevel();
                if (avroCompressionLevel == null) break;
                writeProperties.put("write.avro.compression-level", conf.avroCompressionLevel());
                break;
            }
            case ORC: {
                writeProperties.put("write.orc.compression-codec", conf.orcCompressionCodec());
                writeProperties.put("write.orc.compression-strategy", conf.orcCompressionStrategy());
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unknown file format %s", format));
            }
        }
        return writeProperties;
    }

    public static class Builder
    implements IcebergSinkBuilder<Builder> {
        private Function<String, DataStream<RowData>> inputCreator = null;
        private TableLoader tableLoader;
        private Table table;
        private TableSchema tableSchema;
        private List<String> equalityFieldColumns = null;
        private String uidPrefix = null;
        private final Map<String, String> snapshotProperties = Maps.newHashMap();
        private ReadableConfig readableConfig = new Configuration();
        private final Map<String, String> writeOptions = Maps.newHashMap();
        private FlinkWriteConf flinkWriteConf = null;

        private Builder() {
        }

        private Builder forRowData(DataStream<RowData> newRowDataInput) {
            this.inputCreator = ignored -> newRowDataInput;
            return this;
        }

        private <T> Builder forMapperOutputType(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
            this.inputCreator = newUidPrefix -> {
                SingleOutputStreamOperator inputStream = input.map(mapper, outputType).setParallelism(input.getParallelism());
                if (newUidPrefix != null) {
                    inputStream.name(this.operatorName((String)newUidPrefix)).uid(newUidPrefix + "-mapper");
                }
                return inputStream;
            };
            return this;
        }

        @Override
        public Builder table(Table newTable) {
            this.table = newTable;
            return this;
        }

        @Override
        public Builder tableLoader(TableLoader newTableLoader) {
            this.tableLoader = newTableLoader;
            return this;
        }

        public Builder set(String property, String value) {
            this.writeOptions.put(property, value);
            return this;
        }

        @Override
        public Builder setAll(Map<String, String> properties) {
            this.writeOptions.putAll(properties);
            return this;
        }

        @Override
        public Builder tableSchema(TableSchema newTableSchema) {
            this.tableSchema = newTableSchema;
            return this;
        }

        @Override
        public Builder overwrite(boolean newOverwrite) {
            this.writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
            return this;
        }

        @Override
        public Builder flinkConf(ReadableConfig config) {
            this.readableConfig = config;
            return this;
        }

        @Override
        public Builder distributionMode(DistributionMode mode) {
            if (mode != null) {
                this.writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
            }
            return this;
        }

        public Builder rangeDistributionStatisticsType(StatisticsType type) {
            if (type != null) {
                this.writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
            }
            return this;
        }

        public Builder rangeDistributionSortKeyBaseWeight(double weight) {
            this.writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
            return this;
        }

        @Override
        public Builder writeParallelism(int newWriteParallelism) {
            this.writeOptions.put(FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
            return this;
        }

        @Override
        public Builder upsert(boolean enabled) {
            this.writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
            return this;
        }

        @Override
        public Builder equalityFieldColumns(List<String> columns) {
            this.equalityFieldColumns = columns;
            return this;
        }

        public Builder uidPrefix(String newPrefix) {
            this.uidPrefix = newPrefix;
            return this;
        }

        public Builder setSnapshotProperties(Map<String, String> properties) {
            this.snapshotProperties.putAll(properties);
            return this;
        }

        public Builder setSnapshotProperty(String property, String value) {
            this.snapshotProperties.put(property, value);
            return this;
        }

        @Override
        public Builder toBranch(String branch) {
            this.writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
            return this;
        }

        private DataStreamSink<Void> chainIcebergOperators() {
            Preconditions.checkArgument((this.inputCreator != null ? 1 : 0) != 0, (Object)"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
            Preconditions.checkNotNull((Object)this.tableLoader, (Object)"Table loader shouldn't be null");
            DataStream<RowData> rowDataInput = this.inputCreator.apply(this.uidPrefix);
            if (this.table == null) {
                if (!this.tableLoader.isOpen()) {
                    this.tableLoader.open();
                }
                try (TableLoader loader = this.tableLoader;){
                    this.table = loader.loadTable();
                }
                catch (IOException e) {
                    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
                }
            }
            this.flinkWriteConf = new FlinkWriteConf(this.table, this.writeOptions, this.readableConfig);
            List<Integer> equalityFieldIds = SinkUtil.checkAndGetEqualityFieldIds(this.table, this.equalityFieldColumns);
            RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
            int writerParallelism = this.flinkWriteConf.writeParallelism() == null ? rowDataInput.getParallelism() : this.flinkWriteConf.writeParallelism().intValue();
            DataStream<RowData> distributeStream = this.distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism);
            SingleOutputStreamOperator<FlinkWriteResult> writerStream = this.appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism);
            SingleOutputStreamOperator<Void> committerStream = this.appendCommitter(writerStream);
            return this.appendDummySink(committerStream);
        }

        @Override
        public DataStreamSink<Void> append() {
            return this.chainIcebergOperators();
        }

        private String operatorName(String suffix) {
            return this.uidPrefix != null ? this.uidPrefix + "-" + suffix : suffix;
        }

        @VisibleForTesting
        List<Integer> checkAndGetEqualityFieldIds() {
            ArrayList equalityFieldIds = Lists.newArrayList((Iterable)this.table.schema().identifierFieldIds());
            if (this.equalityFieldColumns != null && !this.equalityFieldColumns.isEmpty()) {
                HashSet equalityFieldSet = Sets.newHashSetWithExpectedSize((int)this.equalityFieldColumns.size());
                for (String column : this.equalityFieldColumns) {
                    Types.NestedField field = this.table.schema().findField(column);
                    Preconditions.checkNotNull((Object)field, (String)"Missing required equality field column '%s' in table schema %s", (Object)column, (Object)this.table.schema());
                    equalityFieldSet.add(field.fieldId());
                }
                if (!equalityFieldSet.equals(this.table.schema().identifierFieldIds())) {
                    LOG.warn("The configured equality field column IDs {} are not matched with the schema identifier field IDs {}, use job specified equality field columns as the equality fields by default.", (Object)equalityFieldSet, (Object)this.table.schema().identifierFieldIds());
                }
                equalityFieldIds = Lists.newArrayList((Iterable)equalityFieldSet);
            }
            return equalityFieldIds;
        }

        private DataStreamSink<Void> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
            DataStreamSink resultStream = committerStream.sinkTo((Sink)new DiscardingSink()).name(this.operatorName(String.format("IcebergSink %s", this.table.name()))).setParallelism(1);
            if (this.uidPrefix != null) {
                resultStream = resultStream.uid(this.uidPrefix + "-dummysink");
            }
            return resultStream;
        }

        private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<FlinkWriteResult> writerStream) {
            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.flinkWriteConf.overwriteMode(), this.snapshotProperties, this.flinkWriteConf.workerPoolSize(), this.flinkWriteConf.branch(), this.table.spec());
            SingleOutputStreamOperator committerStream = writerStream.transform(this.operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, (OneInputStreamOperator)filesCommitter).setParallelism(1).setMaxParallelism(1);
            if (this.uidPrefix != null) {
                committerStream = committerStream.uid(this.uidPrefix + "-committer");
            }
            return committerStream;
        }

        private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds, int writerParallelism) {
            if (this.flinkWriteConf.upsertMode()) {
                Preconditions.checkState((!this.flinkWriteConf.overwriteMode() ? 1 : 0) != 0, (Object)"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
                Preconditions.checkState((!equalityFieldIds.isEmpty() ? 1 : 0) != 0, (Object)"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
                if (!this.table.spec().isUnpartitioned()) {
                    for (PartitionField partitionField : this.table.spec().fields()) {
                        Preconditions.checkState((boolean)equalityFieldIds.contains(partitionField.sourceId()), (String)"In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", (Object)partitionField, this.equalityFieldColumns);
                    }
                }
            }
            SerializableTable serializableTable = (SerializableTable)SerializableTable.copyOf((Table)this.table);
            Duration tableRefreshInterval = this.flinkWriteConf.tableRefreshInterval();
            CachingTableSupplier tableSupplier = tableRefreshInterval != null ? new CachingTableSupplier(serializableTable, this.tableLoader, tableRefreshInterval) : () -> serializableTable;
            IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(tableSupplier, this.flinkWriteConf, flinkRowType, equalityFieldIds);
            SingleOutputStreamOperator writerStream = input.transform(this.operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(FlinkWriteResult.class), streamWriter).setParallelism(writerParallelism);
            if (this.uidPrefix != null) {
                writerStream = writerStream.uid(this.uidPrefix + "-writer");
            }
            return writerStream;
        }

        private DataStream<RowData> distributeDataStream(DataStream<RowData> input, List<Integer> equalityFieldIds, RowType flinkRowType, int writerParallelism) {
            DistributionMode writeMode = this.flinkWriteConf.distributionMode();
            LOG.info("Write distribution mode is '{}'", (Object)writeMode.modeName());
            Schema iSchema = this.table.schema();
            PartitionSpec partitionSpec = this.table.spec();
            SortOrder sortOrder = this.table.sortOrder();
            switch (writeMode) {
                case NONE: {
                    if (equalityFieldIds.isEmpty()) {
                        return input;
                    }
                    LOG.info("Distribute rows by equality fields, because there are equality fields set");
                    return input.keyBy((KeySelector)new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
                }
                case HASH: {
                    if (equalityFieldIds.isEmpty()) {
                        if (partitionSpec.isUnpartitioned()) {
                            LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set and table is unpartitioned");
                            return input;
                        }
                        return input.keyBy((KeySelector)new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
                    }
                    if (partitionSpec.isUnpartitioned()) {
                        LOG.info("Distribute rows by equality fields, because there are equality fields set and table is unpartitioned");
                        return input.keyBy((KeySelector)new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
                    }
                    for (PartitionField partitionField : partitionSpec.fields()) {
                        Preconditions.checkState((boolean)equalityFieldIds.contains(partitionField.sourceId()), (String)"In 'hash' distribution mode with equality fields set, partition field '%s' should be included in equality fields: '%s'", (Object)partitionField, this.equalityFieldColumns);
                    }
                    return input.keyBy((KeySelector)new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
                }
                case RANGE: {
                    if (!equalityFieldIds.isEmpty()) {
                        LOG.warn("Hash distribute rows by equality fields, even though {}=range is set. Range distribution for primary keys are not always safe in Flink streaming writer.", (Object)"write.distribution-mode");
                        return input.keyBy((KeySelector)new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
                    }
                    Preconditions.checkState((sortOrder.isSorted() || partitionSpec.isPartitioned() ? 1 : 0) != 0, (Object)"Invalid write distribution mode: range. Need to define sort order or partition spec.");
                    if (sortOrder.isUnsorted()) {
                        sortOrder = Partitioning.sortOrderFor((PartitionSpec)partitionSpec);
                        LOG.info("Construct sort order from partition spec");
                    }
                    LOG.info("Range distribute rows by sort order: {}", (Object)sortOrder);
                    StatisticsOrRecordTypeInformation statisticsOrRecordTypeInformation = new StatisticsOrRecordTypeInformation(flinkRowType, iSchema, sortOrder);
                    StatisticsType statisticsType = this.flinkWriteConf.rangeDistributionStatisticsType();
                    SingleOutputStreamOperator shuffleStream = input.transform(this.operatorName("range-shuffle"), (TypeInformation)statisticsOrRecordTypeInformation, (OneInputStreamOperatorFactory)new DataStatisticsOperatorFactory(iSchema, sortOrder, writerParallelism, statisticsType, this.flinkWriteConf.rangeDistributionSortKeyBaseWeight())).setParallelism(input.getParallelism());
                    if (this.uidPrefix != null) {
                        shuffleStream = shuffleStream.uid(this.uidPrefix + "-shuffle");
                    }
                    return shuffleStream.partitionCustom((Partitioner)new RangePartitioner(iSchema, sortOrder), (KeySelector & Serializable)r -> r).flatMap((FlatMapFunction & Serializable)(statisticsOrRecord, out) -> {
                        if (statisticsOrRecord.hasRecord()) {
                            out.collect((Object)statisticsOrRecord.record());
                        }
                    }).setParallelism(writerParallelism).returns(RowData.class);
                }
            }
            throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
        }
    }
}

