/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.FlinkMaintenanceConfig;
import org.apache.iceberg.flink.maintenance.api.LockConfig;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.flink.maintenance.operator.LockFactoryBuilder;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.sink.CachingTableSupplier;
import org.apache.iceberg.flink.sink.CommittableToTableChangeConverter;
import org.apache.iceberg.flink.sink.EqualityFieldKeySelector;
import org.apache.iceberg.flink.sink.IcebergCommittable;
import org.apache.iceberg.flink.sink.IcebergCommittableSerializer;
import org.apache.iceberg.flink.sink.IcebergCommitter;
import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
import org.apache.iceberg.flink.sink.IcebergSinkBuilder;
import org.apache.iceberg.flink.sink.IcebergSinkWriter;
import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics;
import org.apache.iceberg.flink.sink.IcebergWriteAggregator;
import org.apache.iceberg.flink.sink.PartitionKeySelector;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.flink.sink.WriteResultSerializer;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecordTypeInformation;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class IcebergSink
implements Sink<RowData>,
SupportsPreWriteTopology<RowData>,
SupportsCommitter<IcebergCommittable>,
SupportsPreCommitTopology<WriteResult, IcebergCommittable>,
SupportsPostCommitTopology<IcebergCommittable>,
SupportsConcurrentExecutionAttempts {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class);
    private final TableLoader tableLoader;
    private final Map<String, String> snapshotProperties;
    private final String uidSuffix;
    private final String sinkId;
    private final Map<String, String> writeProperties;
    private final RowType flinkRowType;
    private final SerializableSupplier<Table> tableSupplier;
    private final transient FlinkWriteConf flinkWriteConf;
    private final Set<Integer> equalityFieldIds;
    private final boolean upsertMode;
    private final FileFormat dataFileFormat;
    private final long targetDataFileSize;
    private final String branch;
    private final boolean overwriteMode;
    private final int workerPoolSize;
    private final boolean compactMode;
    private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
    private final Table table;
    private final Set<String> equalityFieldColumns = null;

    private IcebergSink(TableLoader tableLoader, Table table, Map<String, String> snapshotProperties, String uidSuffix, Map<String, String> writeProperties, RowType flinkRowType, SerializableSupplier<Table> tableSupplier, FlinkWriteConf flinkWriteConf, Set<Integer> equalityFieldIds, String branch, boolean overwriteMode, FlinkMaintenanceConfig flinkMaintenanceConfig) {
        this.tableLoader = tableLoader;
        this.snapshotProperties = snapshotProperties;
        this.uidSuffix = uidSuffix;
        this.writeProperties = writeProperties;
        this.flinkRowType = flinkRowType;
        this.tableSupplier = tableSupplier;
        this.flinkWriteConf = flinkWriteConf;
        this.equalityFieldIds = equalityFieldIds;
        this.branch = branch;
        this.overwriteMode = overwriteMode;
        this.table = table;
        this.upsertMode = flinkWriteConf.upsertMode();
        this.dataFileFormat = flinkWriteConf.dataFileFormat();
        this.targetDataFileSize = flinkWriteConf.targetDataFileSize();
        this.workerPoolSize = flinkWriteConf.workerPoolSize();
        this.sinkId = UUID.randomUUID().toString();
        this.compactMode = flinkWriteConf.compactMode();
        this.flinkMaintenanceConfig = flinkMaintenanceConfig;
    }

    public SinkWriter<RowData> createWriter(Sink.InitContext context) {
        RowDataTaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(this.tableSupplier, this.flinkRowType, this.targetDataFileSize, this.dataFileFormat, this.writeProperties, this.equalityFieldIds, this.upsertMode);
        IcebergStreamWriterMetrics metrics = new IcebergStreamWriterMetrics((MetricGroup)context.metricGroup(), this.table.name());
        return new IcebergSinkWriter(((Table)this.tableSupplier.get()).name(), taskWriterFactory, metrics, context.getSubtaskId(), context.getAttemptNumber());
    }

    public Committer<IcebergCommittable> createCommitter(CommitterInitContext context) {
        IcebergFilesCommitterMetrics metrics = new IcebergFilesCommitterMetrics((MetricGroup)context.metricGroup(), this.table.name());
        return new IcebergCommitter(this.tableLoader, this.branch, this.snapshotProperties, this.overwriteMode, this.workerPoolSize, this.sinkId, metrics, this.compactMode);
    }

    public SimpleVersionedSerializer<IcebergCommittable> getCommittableSerializer() {
        return new IcebergCommittableSerializer();
    }

    public void addPostCommitTopology(DataStream<CommittableMessage<IcebergCommittable>> committables) {
        if (!this.compactMode) {
            return;
        }
        String suffix = IcebergSink.defaultSuffix(this.uidSuffix, this.table.name());
        String postCommitUid = String.format("Sink post-commit : %s", suffix);
        SingleOutputStreamOperator tableChangeStream = committables.global().process((ProcessFunction)new CommittableToTableChangeConverter(this.table.io(), this.table.name(), this.table.specs())).uid(postCommitUid).forceNonParallel();
        try {
            RewriteDataFilesConfig rewriteDataFilesConfig = this.flinkMaintenanceConfig.createRewriteDataFilesConfig();
            RewriteDataFiles.Builder rewriteBuilder = RewriteDataFiles.builder().config(rewriteDataFilesConfig);
            LockConfig lockConfig = this.flinkMaintenanceConfig.createLockConfig();
            TriggerLockFactory triggerLockFactory = LockFactoryBuilder.build(lockConfig, this.table.name());
            String tableMaintenanceUid = String.format("TableMaintenance : %s", suffix);
            TableMaintenance.Builder builder = TableMaintenance.forChangeStream((DataStream<TableChange>)tableChangeStream, this.tableLoader, triggerLockFactory).uidSuffix(tableMaintenanceUid).add(rewriteBuilder);
            builder.rateLimit(Duration.ofSeconds(this.flinkMaintenanceConfig.rateLimit())).lockCheckDelay(Duration.ofSeconds(this.flinkMaintenanceConfig.lockCheckDelay())).slotSharingGroup(this.flinkMaintenanceConfig.slotSharingGroup()).parallelism(this.flinkMaintenanceConfig.parallelism()).append();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to create tableMaintenance ", e);
        }
    }

    public DataStream<RowData> addPreWriteTopology(DataStream<RowData> inputDataStream) {
        return this.distributeDataStream(inputDataStream);
    }

    public DataStream<CommittableMessage<IcebergCommittable>> addPreCommitTopology(DataStream<CommittableMessage<WriteResult>> writeResults) {
        TypeInformation typeInformation = CommittableMessageTypeInfo.of(this::getCommittableSerializer);
        String suffix = IcebergSink.defaultSuffix(this.uidSuffix, this.table.name());
        String preCommitAggregatorUid = String.format("Sink pre-commit aggregator: %s", suffix);
        return writeResults.global().transform(preCommitAggregatorUid, typeInformation, (OneInputStreamOperator)new IcebergWriteAggregator(this.tableLoader)).uid(preCommitAggregatorUid).setParallelism(1).setMaxParallelism(1).global();
    }

    public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
        return new WriteResultSerializer();
    }

    private String operatorName(String suffix) {
        return this.uidSuffix != null ? suffix + "-" + this.uidSuffix : suffix;
    }

    private static String defaultSuffix(String uidSuffix, String defaultSuffix) {
        if (uidSuffix == null || uidSuffix.isEmpty()) {
            return defaultSuffix;
        }
        return uidSuffix;
    }

    private static SerializableTable checkAndGetTable(TableLoader tableLoader, Table table) {
        if (table == null) {
            SerializableTable serializableTable;
            block10: {
                if (!tableLoader.isOpen()) {
                    tableLoader.open();
                }
                TableLoader loader = tableLoader;
                try {
                    serializableTable = (SerializableTable)SerializableTable.copyOf((Table)loader.loadTable());
                    if (loader == null) break block10;
                }
                catch (Throwable throwable) {
                    try {
                        if (loader != null) {
                            try {
                                loader.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException("Failed to load iceberg table from table loader: " + String.valueOf(tableLoader), e);
                    }
                }
                loader.close();
            }
            return serializableTable;
        }
        return (SerializableTable)SerializableTable.copyOf((Table)table);
    }

    @Deprecated
    private static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
        if (requestedSchema != null) {
            Schema writeSchema = TypeUtil.reassignIds((Schema)FlinkSchemaUtil.convert(requestedSchema), (Schema)schema);
            TypeUtil.validateWriteSchema((Schema)schema, (Schema)writeSchema, (Boolean)true, (Boolean)true);
            return (RowType)requestedSchema.toRowDataType().getLogicalType();
        }
        return FlinkSchemaUtil.convert(schema);
    }

    private static RowType toFlinkRowType(Schema schema, ResolvedSchema requestedSchema) {
        if (requestedSchema != null) {
            Schema writeSchema = TypeUtil.reassignIds((Schema)FlinkSchemaUtil.convert(requestedSchema), (Schema)schema);
            TypeUtil.validateWriteSchema((Schema)schema, (Schema)writeSchema, (Boolean)true, (Boolean)true);
            return (RowType)requestedSchema.toSinkRowDataType().getLogicalType();
        }
        return FlinkSchemaUtil.convert(schema);
    }

    private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
        DistributionMode mode = this.flinkWriteConf.distributionMode();
        Schema schema = this.table.schema();
        PartitionSpec spec = this.table.spec();
        SortOrder sortOrder = this.table.sortOrder();
        LOG.info("Write distribution mode is '{}'", (Object)mode.modeName());
        switch (mode) {
            case NONE: {
                return this.distributeDataStreamByNoneDistributionMode(input, schema);
            }
            case HASH: {
                return this.distributeDataStreamByHashDistributionMode(input, schema, spec);
            }
            case RANGE: {
                return this.distributeDataStreamByRangeDistributionMode(input, schema, spec, sortOrder);
            }
        }
        throw new RuntimeException("Unrecognized write.distribution-mode: " + String.valueOf(mode));
    }

    private DataStream<RowData> distributeDataStreamByNoneDistributionMode(DataStream<RowData> input, Schema iSchema) {
        if (this.equalityFieldIds.isEmpty()) {
            return input;
        }
        LOG.info("Distribute rows by equality fields, because there are equality fields set");
        return input.keyBy((KeySelector)new EqualityFieldKeySelector(iSchema, this.flinkRowType, this.equalityFieldIds));
    }

    private DataStream<RowData> distributeDataStreamByHashDistributionMode(DataStream<RowData> input, Schema iSchema, PartitionSpec partitionSpec) {
        if (this.equalityFieldIds.isEmpty()) {
            if (partitionSpec.isUnpartitioned()) {
                LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set and table is unpartitioned");
                return input;
            }
            return input.keyBy((KeySelector)new PartitionKeySelector(partitionSpec, iSchema, this.flinkRowType));
        }
        if (partitionSpec.isUnpartitioned()) {
            LOG.info("Distribute rows by equality fields, because there are equality fields set and table is unpartitioned");
            return input.keyBy((KeySelector)new EqualityFieldKeySelector(iSchema, this.flinkRowType, this.equalityFieldIds));
        }
        for (PartitionField partitionField : partitionSpec.fields()) {
            Preconditions.checkState((boolean)this.equalityFieldIds.contains(partitionField.sourceId()), (String)"In 'hash' distribution mode with equality fields set, source column '%s' of partition field '%s' should be included in equality fields: '%s'", (Object)this.table.schema().findColumnName(partitionField.sourceId()), (Object)partitionField, this.equalityFieldColumns);
        }
        return input.keyBy((KeySelector)new PartitionKeySelector(partitionSpec, iSchema, this.flinkRowType));
    }

    private int resolveWriterParallelism(DataStream<RowData> input) {
        return Optional.ofNullable(this.flinkWriteConf.writeParallelism()).orElseGet(() -> input.getParallelism());
    }

    private DataStream<RowData> distributeDataStreamByRangeDistributionMode(DataStream<RowData> input, Schema iSchema, PartitionSpec partitionSpec, SortOrder sortOrderParam) {
        int writerParallelism = this.resolveWriterParallelism(input);
        SortOrder sortOrder = sortOrderParam;
        if (!this.equalityFieldIds.isEmpty()) {
            LOG.warn("Hash distribute rows by equality fields, even though {}=range is set. Range distribution for primary keys are not always safe in Flink streaming writer.", (Object)"write.distribution-mode");
            return input.keyBy((KeySelector)new EqualityFieldKeySelector(iSchema, this.flinkRowType, this.equalityFieldIds));
        }
        Preconditions.checkState((sortOrder.isSorted() || partitionSpec.isPartitioned() ? 1 : 0) != 0, (Object)"Invalid write distribution mode: range. Need to define sort order or partition spec.");
        if (sortOrder.isUnsorted()) {
            sortOrder = Partitioning.sortOrderFor((PartitionSpec)partitionSpec);
            LOG.info("Construct sort order from partition spec");
        }
        LOG.info("Range distribute rows by sort order: {}", (Object)sortOrder);
        StatisticsOrRecordTypeInformation statisticsOrRecordTypeInformation = new StatisticsOrRecordTypeInformation(this.flinkRowType, iSchema, sortOrder);
        StatisticsType statisticsType = this.flinkWriteConf.rangeDistributionStatisticsType();
        SingleOutputStreamOperator shuffleStream = input.transform(this.operatorName("range-shuffle"), (TypeInformation)statisticsOrRecordTypeInformation, (OneInputStreamOperatorFactory)new DataStatisticsOperatorFactory(iSchema, sortOrder, writerParallelism, statisticsType, this.flinkWriteConf.rangeDistributionSortKeyBaseWeight())).setParallelism(input.getParallelism());
        if (this.uidSuffix != null) {
            shuffleStream = shuffleStream.uid("shuffle-" + this.uidSuffix);
        }
        return shuffleStream.partitionCustom((Partitioner)new RangePartitioner(iSchema, sortOrder), (KeySelector & Serializable)r -> r).flatMap((FlatMapFunction & Serializable)(statisticsOrRecord, out) -> {
            if (statisticsOrRecord.hasRecord()) {
                out.collect((Object)statisticsOrRecord.record());
            }
        }).slotSharingGroup("shuffle-partition-custom-group").setParallelism(writerParallelism).returns(RowData.class);
    }

    public static <T> Builder builderFor(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
        return new Builder().forMapperOutputType(input, mapper, outputType);
    }

    @Deprecated
    public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
        return new Builder().forRow(input, tableSchema);
    }

    public static Builder forRow(DataStream<Row> input, ResolvedSchema resolvedSchema) {
        return new Builder().forRow(input, resolvedSchema);
    }

    public static Builder forRowData(DataStream<RowData> input) {
        return new Builder().forRowData(input);
    }

    public static class Builder
    implements IcebergSinkBuilder<Builder> {
        private TableLoader tableLoader;
        private String uidSuffix = "";
        private Function<String, DataStream<RowData>> inputCreator = null;
        @Deprecated
        private TableSchema tableSchema;
        private ResolvedSchema resolvedSchema;
        private SerializableTable table;
        private final Map<String, String> writeOptions = Maps.newHashMap();
        private final Map<String, String> snapshotSummary = Maps.newHashMap();
        private ReadableConfig readableConfig = new Configuration();
        private List<String> equalityFieldColumns = null;

        private Builder() {
        }

        private Builder forRowData(DataStream<RowData> newRowDataInput) {
            this.inputCreator = ignored -> newRowDataInput;
            return this;
        }

        @Deprecated
        private Builder forRow(DataStream<Row> input, TableSchema inputTableSchema) {
            RowType rowType = (RowType)inputTableSchema.toRowDataType().getLogicalType();
            DataType[] fieldDataTypes = inputTableSchema.getFieldDataTypes();
            DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
            return this.forMapperOutputType(input, arg_0 -> ((DataFormatConverters.RowConverter)rowConverter).toInternal(arg_0), FlinkCompatibilityUtil.toTypeInfo(rowType)).tableSchema(inputTableSchema);
        }

        private Builder forRow(DataStream<Row> input, ResolvedSchema inputResolvedSchema) {
            RowType rowType = (RowType)inputResolvedSchema.toSinkRowDataType().getLogicalType();
            DataType[] fieldDataTypes = (DataType[])inputResolvedSchema.getColumnDataTypes().toArray(DataType[]::new);
            DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
            return this.forMapperOutputType(input, arg_0 -> ((DataFormatConverters.RowConverter)rowConverter).toInternal(arg_0), FlinkCompatibilityUtil.toTypeInfo(rowType)).resolvedSchema(inputResolvedSchema);
        }

        private <T> Builder forMapperOutputType(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
            this.inputCreator = newUidSuffix -> {
                SingleOutputStreamOperator inputStream = input.map(mapper, outputType).setParallelism(input.getParallelism());
                if (newUidSuffix != null) {
                    String uid = String.format("Sink pre-writer mapper: %s", newUidSuffix);
                    inputStream.name(uid).uid(uid);
                }
                return inputStream;
            };
            return this;
        }

        @Override
        public Builder table(Table newTable) {
            this.table = (SerializableTable)SerializableTable.copyOf((Table)newTable);
            return this;
        }

        @Override
        public Builder tableLoader(TableLoader newTableLoader) {
            this.tableLoader = newTableLoader;
            return this;
        }

        TableLoader tableLoader() {
            return this.tableLoader;
        }

        public Builder set(String property, String value) {
            this.writeOptions.put(property, value);
            return this;
        }

        @Override
        public Builder setAll(Map<String, String> properties) {
            this.writeOptions.putAll(properties);
            return this;
        }

        @Override
        public Builder tableSchema(TableSchema newTableSchema) {
            this.tableSchema = newTableSchema;
            return this;
        }

        @Override
        public Builder resolvedSchema(ResolvedSchema newResolvedSchema) {
            this.resolvedSchema = newResolvedSchema;
            return this;
        }

        @Override
        public Builder overwrite(boolean newOverwrite) {
            this.writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
            return this;
        }

        @Override
        public Builder flinkConf(ReadableConfig config) {
            this.readableConfig = config;
            return this;
        }

        @Override
        public Builder distributionMode(DistributionMode mode) {
            if (mode != null) {
                this.writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
            }
            return this;
        }

        public Builder rangeDistributionStatisticsType(StatisticsType type) {
            if (type != null) {
                this.writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
            }
            return this;
        }

        public Builder rangeDistributionSortKeyBaseWeight(double weight) {
            this.writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
            return this;
        }

        @Override
        public Builder writeParallelism(int newWriteParallelism) {
            this.writeOptions.put(FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
            return this;
        }

        @Override
        public Builder upsert(boolean enabled) {
            this.writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
            return this;
        }

        @Override
        public Builder equalityFieldColumns(List<String> columns) {
            this.equalityFieldColumns = columns;
            return this;
        }

        public Builder uidSuffix(String newSuffix) {
            this.uidSuffix = newSuffix;
            return this;
        }

        public Builder snapshotProperties(Map<String, String> properties) {
            this.snapshotSummary.putAll(properties);
            return this;
        }

        public Builder setSnapshotProperty(String property, String value) {
            this.snapshotSummary.put(property, value);
            return this;
        }

        @Override
        public Builder toBranch(String branch) {
            this.writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
            return this;
        }

        IcebergSink build() {
            SerializableTable serializableTable;
            Preconditions.checkArgument((this.inputCreator != null ? 1 : 0) != 0, (Object)"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
            Preconditions.checkNotNull((Object)this.tableLoader(), (Object)"Table loader shouldn't be null");
            this.table = serializableTable = IcebergSink.checkAndGetTable(this.tableLoader(), (Table)this.table);
            FlinkWriteConf flinkWriteConf = new FlinkWriteConf((Table)this.table, this.writeOptions, this.readableConfig);
            Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval();
            CachingTableSupplier tableSupplier = tableRefreshInterval != null ? new CachingTableSupplier(this.table, this.tableLoader(), tableRefreshInterval) : () -> serializableTable;
            boolean overwriteMode = flinkWriteConf.overwriteMode();
            Set<Integer> equalityFieldIds = SinkUtil.checkAndGetEqualityFieldIds((Table)this.table, this.equalityFieldColumns);
            if (flinkWriteConf.upsertMode()) {
                Preconditions.checkState((!overwriteMode ? 1 : 0) != 0, (Object)"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
                Preconditions.checkState((!equalityFieldIds.isEmpty() ? 1 : 0) != 0, (Object)"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
                if (!this.table.spec().isUnpartitioned()) {
                    for (PartitionField partitionField : this.table.spec().fields()) {
                        Preconditions.checkState((boolean)equalityFieldIds.contains(partitionField.sourceId()), (String)"In 'hash' distribution mode with equality fields set, source column '%s' of partition field '%s' should be included in equality fields: '%s'", (Object)this.table.schema().findColumnName(partitionField.sourceId()), (Object)partitionField, this.equalityFieldColumns);
                    }
                }
            }
            FlinkMaintenanceConfig flinkMaintenanceConfig = new FlinkMaintenanceConfig((Table)this.table, this.writeOptions, this.readableConfig);
            return new IcebergSink(this.tableLoader, (Table)this.table, this.snapshotSummary, this.uidSuffix, SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, (Table)this.table), this.resolvedSchema != null ? IcebergSink.toFlinkRowType(this.table.schema(), this.resolvedSchema) : IcebergSink.toFlinkRowType(this.table.schema(), this.tableSchema), tableSupplier, flinkWriteConf, equalityFieldIds, flinkWriteConf.branch(), overwriteMode, flinkMaintenanceConfig);
        }

        @Override
        public DataStreamSink<RowData> append() {
            IcebergSink sink = this.build();
            String suffix = IcebergSink.defaultSuffix(this.uidSuffix, this.table.name());
            DataStream<RowData> rowDataInput = this.inputCreator.apply(suffix);
            DataStreamSink rowDataDataStreamSink = rowDataInput.sinkTo((Sink)sink).uid(suffix).name(suffix);
            rowDataDataStreamSink.setParallelism(sink.resolveWriterParallelism(rowDataInput));
            return rowDataDataStreamSink;
        }
    }
}

