/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommittable;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommittableSerializer;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommitter;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommitterMetrics;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordInternal;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordInternalType;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordProcessor;
import org.apache.iceberg.flink.sink.dynamic.DynamicTableUpdateOperator;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriteResult;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriteResultAggregator;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriteResultSerializer;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriter;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriterMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
 * Flink sink (Sink V2 API) for {@link DynamicRecordInternal} records.
 *
 * <p>What this class wires together, as visible in this file:
 *
 * <ul>
 *   <li>pre-write topology: the input is keyed by {@link DynamicRecordInternal#writerKey()};
 *   <li>writer: a {@link DynamicWriter} per subtask;
 *   <li>pre-commit topology: write results are keyed by table name (summaries share one fixed key)
 *       and passed through a {@link DynamicWriteResultAggregator};
 *   <li>committer: a {@link DynamicCommitter};
 *   <li>post-commit topology: nothing is added.
 * </ul>
 *
 * <p>Instances are created through {@link #forInput(DataStream)} and {@link Builder#append()}.
 */
@Experimental
public class DynamicIcebergSink
        implements Sink<DynamicRecordInternal>,
                SupportsPreWriteTopology<DynamicRecordInternal>,
                SupportsCommitter<DynamicCommittable>,
                SupportsPreCommitTopology<DynamicWriteResult, DynamicCommittable>,
                SupportsPostCommitTopology<DynamicCommittable> {
    private final CatalogLoader catalogLoader;
    private final Map<String, String> snapshotProperties;
    private final String uidPrefix;
    // Random identifier created once per sink instance; used in the pre-commit operator name/uid
    // and handed to the committer.
    private final String sinkId;
    private final Map<String, String> writeProperties;
    // Transient: not serialized with the sink. The values needed by createWriter/createCommitter
    // are copied into the plain fields below in the constructor; the conf itself is only read
    // again by Builder#append() on the instance it has just built.
    private final transient FlinkWriteConf flinkWriteConf;
    private final FileFormat dataFileFormat;
    private final long targetDataFileSize;
    private final boolean overwriteMode;
    private final int workerPoolSize;
    private final int cacheMaximumSize;

    /**
     * Creates the sink. Package-private; use {@link #forInput(DataStream)} instead.
     *
     * @param catalogLoader loader used to obtain the catalog for the writer, aggregator and committer
     * @param snapshotProperties properties passed to the committer
     * @param uidPrefix prefix for operator names/uids, may be {@code null}
     * @param writeProperties properties passed to the writer
     * @param flinkWriteConf source of file format, target file size, overwrite mode and worker pool size
     * @param cacheMaximumSize cache size passed to the writer
     */
    DynamicIcebergSink(CatalogLoader catalogLoader, Map<String, String> snapshotProperties, String uidPrefix, Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf, int cacheMaximumSize) {
        this.catalogLoader = catalogLoader;
        this.snapshotProperties = snapshotProperties;
        this.uidPrefix = uidPrefix;
        this.writeProperties = writeProperties;
        this.flinkWriteConf = flinkWriteConf;
        this.dataFileFormat = flinkWriteConf.dataFileFormat();
        this.targetDataFileSize = flinkWriteConf.targetDataFileSize();
        this.overwriteMode = flinkWriteConf.overwriteMode();
        this.workerPoolSize = flinkWriteConf.workerPoolSize();
        this.cacheMaximumSize = cacheMaximumSize;
        this.sinkId = UUID.randomUUID().toString();
    }

    /**
     * Creates a {@link DynamicWriter} for the calling subtask.
     *
     * @param context writer init context supplying the metric group, subtask index and attempt number
     * @return a new writer backed by a freshly loaded catalog
     * @throws IOException declared by the sink interface
     */
    public SinkWriter<DynamicRecordInternal> createWriter(Sink.InitContext context) throws IOException {
        return new DynamicWriter(
                this.catalogLoader.loadCatalog(),
                this.dataFileFormat,
                this.targetDataFileSize,
                this.writeProperties,
                this.cacheMaximumSize,
                new DynamicWriterMetrics(context.metricGroup()),
                context.getTaskInfo().getIndexOfThisSubtask(),
                context.getTaskInfo().getAttemptNumber());
    }

    /**
     * Creates a {@link DynamicCommitter}.
     *
     * @param context committer init context supplying the metric group
     * @return a new committer backed by a freshly loaded catalog
     */
    public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) {
        DynamicCommitterMetrics metrics = new DynamicCommitterMetrics(context.metricGroup());
        return new DynamicCommitter(
                this.catalogLoader.loadCatalog(),
                this.snapshotProperties,
                this.overwriteMode,
                this.workerPoolSize,
                this.sinkId,
                metrics);
    }

    /** Returns a new serializer for {@link DynamicCommittable}. */
    public SimpleVersionedSerializer<DynamicCommittable> getCommittableSerializer() {
        return new DynamicCommittableSerializer();
    }

    /**
     * Intentionally empty: this sink adds no operators after the committer.
     *
     * @param committables stream of committed messages (unused)
     */
    public void addPostCommitTopology(DataStream<CommittableMessage<DynamicCommittable>> committables) {
    }

    /**
     * Distributes the input before it reaches the writers.
     *
     * @param inputDataStream records to be written
     * @return the stream produced by {@link #distributeDataStream(DataStream)}
     */
    public DataStream<DynamicRecordInternal> addPreWriteTopology(DataStream<DynamicRecordInternal> inputDataStream) {
        return this.distributeDataStream(inputDataStream);
    }

    /**
     * Inserts a {@link DynamicWriteResultAggregator} between the writers and the committer.
     *
     * <p>Messages are keyed so that every {@link CommittableSummary} uses the fixed key
     * {@code "__summary"} and every other message uses the table name of its write result.
     *
     * @param writeResults committable messages emitted by the writers
     * @return the aggregated committable messages
     */
    public DataStream<CommittableMessage<DynamicCommittable>> addPreCommitTopology(DataStream<CommittableMessage<DynamicWriteResult>> writeResults) {
        TypeInformation<CommittableMessage<DynamicCommittable>> typeInformation =
                CommittableMessageTypeInfo.of(this::getCommittableSerializer);

        KeySelector<CommittableMessage<DynamicWriteResult>, String> byTableName = committable -> {
            if (committable instanceof CommittableSummary) {
                return "__summary";
            }
            // Anything that is not a summary is treated as a committable with lineage.
            CommittableWithLineage<DynamicWriteResult> result =
                    (CommittableWithLineage<DynamicWriteResult>) committable;
            return result.getCommittable().key().tableName();
        };

        // NOTE(review): sinkId is a random UUID created in the constructor, so this operator
        // name and uid differ for every sink instance that is built - confirm that is intended.
        return writeResults
                .keyBy(byTableName)
                .transform(
                        DynamicIcebergSink.prefixIfNotNull(this.uidPrefix, this.sinkId + " Pre Commit"),
                        typeInformation,
                        new DynamicWriteResultAggregator(this.catalogLoader))
                .uid(DynamicIcebergSink.prefixIfNotNull(this.uidPrefix, this.sinkId + "-pre-commit-topology"));
    }

    /** Returns a new serializer for {@link DynamicWriteResult}. */
    public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer() {
        return new DynamicWriteResultSerializer();
    }

    /**
     * Keys the stream by {@link DynamicRecordInternal#writerKey()}.
     *
     * @param input records to distribute
     * @return the keyed stream
     */
    DataStream<DynamicRecordInternal> distributeDataStream(DataStream<DynamicRecordInternal> input) {
        return input.keyBy(DynamicRecordInternal::writerKey);
    }

    /**
     * Joins prefix and suffix with {@code "-"}; returns the suffix alone when the prefix is
     * {@code null}. An empty (non-null) prefix still produces a leading {@code "-"}.
     */
    private static String prefixIfNotNull(String uidPrefix, String suffix) {
        return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
    }

    /**
     * Starts building a sink for the given input stream.
     *
     * @param input stream of user records
     * @param <T> type of the user records
     * @return a new builder with the input already set
     */
    public static <T> Builder<T> forInput(DataStream<T> input) {
        return new Builder<T>().forInput(input);
    }

    /**
     * Fluent builder that assembles the record conversion, table update and sink operators.
     *
     * <p>A {@link #generator(DynamicRecordGenerator) generator} and a
     * {@link #catalogLoader(CatalogLoader) catalog loader} are mandatory; everything else has a
     * default. Call {@link #append()} to attach the sink to the input stream.
     *
     * @param <T> type of the user records
     */
    public static class Builder<T> {
        private DataStream<T> input;
        private DynamicRecordGenerator<T> generator;
        private CatalogLoader catalogLoader;
        private String uidPrefix = null;
        private final Map<String, String> writeOptions = Maps.newHashMap();
        private final Map<String, String> snapshotSummary = Maps.newHashMap();
        private ReadableConfig readableConfig = new Configuration();
        private boolean immediateUpdate = false;
        private int cacheMaximumSize = 100;
        private long cacheRefreshMs = 1000L;
        private int inputSchemasPerTableCacheMaximumSize = 10;

        Builder() {
        }

        /** Sets the stream of user records to write. */
        public Builder<T> forInput(DataStream<T> inputStream) {
            this.input = inputStream;
            return this;
        }

        /** Sets the generator handed to the {@link DynamicRecordProcessor}. Required. */
        public Builder<T> generator(DynamicRecordGenerator<T> inputGenerator) {
            this.generator = inputGenerator;
            return this;
        }

        /** Sets the catalog loader used by every operator of the sink. Required. */
        public Builder<T> catalogLoader(CatalogLoader newCatalogLoader) {
            this.catalogLoader = newCatalogLoader;
            return this;
        }

        /** Sets a single write option; later calls with the same key overwrite earlier ones. */
        public Builder<T> set(String property, String value) {
            this.writeOptions.put(property, value);
            return this;
        }

        /** Adds all given entries to the write options. */
        public Builder<T> setAll(Map<String, String> properties) {
            this.writeOptions.putAll(properties);
            return this;
        }

        /** Stores the flag under the {@link FlinkWriteOptions#OVERWRITE_MODE} write option. */
        public Builder<T> overwrite(boolean newOverwrite) {
            this.writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
            return this;
        }

        /** Sets the Flink configuration used together with the write options to build the write conf. */
        public Builder<T> flinkConf(ReadableConfig config) {
            this.readableConfig = config;
            return this;
        }

        /**
         * Stores the value under the {@link FlinkWriteOptions#WRITE_PARALLELISM} write option;
         * {@link #append()} applies the resulting write parallelism to the sink when it is set.
         */
        public Builder<T> writeParallelism(int newWriteParallelism) {
            this.writeOptions.put(FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
            return this;
        }

        /** Sets the prefix used for operator names and uids. */
        public Builder<T> uidPrefix(String newPrefix) {
            this.uidPrefix = newPrefix;
            return this;
        }

        /** Adds all given entries to the snapshot properties passed to the committer. */
        public Builder<T> snapshotProperties(Map<String, String> properties) {
            this.snapshotSummary.putAll(properties);
            return this;
        }

        /** Sets a single snapshot property passed to the committer. */
        public Builder<T> setSnapshotProperty(String property, String value) {
            this.snapshotSummary.put(property, value);
            return this;
        }

        /** Stores the branch under the {@link FlinkWriteOptions#BRANCH} write option. */
        public Builder<T> toBranch(String branch) {
            this.writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
            return this;
        }

        /** Sets the immediate-update flag handed to the {@link DynamicRecordProcessor}; default {@code false}. */
        public Builder<T> immediateTableUpdate(boolean newImmediateUpdate) {
            this.immediateUpdate = newImmediateUpdate;
            return this;
        }

        /** Sets the cache size handed to the processor, updater, writer and type info; default 100. */
        public Builder<T> cacheMaxSize(int maxSize) {
            this.cacheMaximumSize = maxSize;
            return this;
        }

        /** Sets the cache refresh interval (ms) handed to the processor and updater; default 1000. */
        public Builder<T> cacheRefreshMs(long refreshMs) {
            this.cacheRefreshMs = refreshMs;
            return this;
        }

        /** Sets the per-table input schema cache size handed to the processor and updater; default 10. */
        public Builder<T> inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCacheMaxSize) {
            this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaxSize;
            return this;
        }

        /** Builds an operator display name using the same prefixing rule as the operator uids. */
        private String operatorName(String suffix) {
            return DynamicIcebergSink.prefixIfNotNull(this.uidPrefix, suffix);
        }

        /**
         * Validates the mandatory settings and creates the sink.
         *
         * <p>Side effect: a {@code null} uid prefix is replaced with the empty string.
         *
         * @throws IllegalArgumentException if no generator was set
         * @throws NullPointerException if no catalog loader was set
         */
        private DynamicIcebergSink build() {
            Preconditions.checkArgument(this.generator != null, "Please use generator() to convert the input DataStream.");
            Preconditions.checkNotNull(this.catalogLoader, "Catalog loader shouldn't be null");
            FlinkWriteConf flinkWriteConf = new FlinkWriteConf(this.writeOptions, this.readableConfig);
            Map<String, String> writeProperties = SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null);
            this.uidPrefix = Optional.ofNullable(this.uidPrefix).orElse("");
            return this.instantiateSink(writeProperties, flinkWriteConf);
        }

        /** Constructs the sink from the builder state; separated out so tests can hook in. */
        @VisibleForTesting
        DynamicIcebergSink instantiateSink(Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
            return new DynamicIcebergSink(this.catalogLoader, this.snapshotSummary, this.uidPrefix, writeProperties, flinkWriteConf, this.cacheMaximumSize);
        }

        /**
         * Attaches the sink to the input stream.
         *
         * <p>Topology: the input goes through a {@link DynamicRecordProcessor}; its
         * {@code "dynamic-table-update-stream"} side output is keyed by table name and mapped by a
         * {@link DynamicTableUpdateOperator}; that result is unioned with the processor's main
         * output and sent to the sink.
         *
         * @return the sink operator, with the write parallelism applied when one is configured
         * @throws IllegalArgumentException if no generator was set
         * @throws NullPointerException if no catalog loader or no input stream was set
         */
        public DataStreamSink<DynamicRecordInternal> append() {
            DynamicRecordInternalType type = new DynamicRecordInternalType(this.catalogLoader, false, this.cacheMaximumSize);
            DynamicIcebergSink sink = this.build();
            Preconditions.checkNotNull(this.input, "Input DataStream shouldn't be null");

            // NOTE(review): build() has replaced a null uidPrefix with "", so prefixIfNotNull
            // always takes its prefixed branch below. With no user prefix the uids come out as
            // "--generator", "--updater" and "--sink" (the suffixes already start with '-').
            // Left as-is on purpose: operator uids must stay stable for existing jobs.
            SingleOutputStreamOperator<DynamicRecordInternal> converted =
                    this.input
                            .process(new DynamicRecordProcessor<T>(
                                    this.generator,
                                    this.catalogLoader,
                                    this.immediateUpdate,
                                    this.cacheMaximumSize,
                                    this.cacheRefreshMs,
                                    this.inputSchemasPerTableCacheMaximumSize))
                            .uid(DynamicIcebergSink.prefixIfNotNull(this.uidPrefix, "-generator"))
                            .name(this.operatorName("generator"))
                            .returns(type);

            DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
                    converted
                            .getSideOutput(new OutputTag<>(
                                    "dynamic-table-update-stream",
                                    new DynamicRecordInternalType(this.catalogLoader, true, this.cacheMaximumSize)))
                            .keyBy(DynamicRecordInternal::tableName)
                            .map(new DynamicTableUpdateOperator(
                                    this.catalogLoader,
                                    this.cacheMaximumSize,
                                    this.cacheRefreshMs,
                                    this.inputSchemasPerTableCacheMaximumSize))
                            .uid(DynamicIcebergSink.prefixIfNotNull(this.uidPrefix, "-updater"))
                            .name(this.operatorName("Updater"))
                            .returns(type)
                            .union(converted)
                            .sinkTo(sink)
                            .uid(DynamicIcebergSink.prefixIfNotNull(this.uidPrefix, "-sink"));

            // Only override the sink parallelism when one was explicitly configured.
            Integer writeParallelism = sink.flinkWriteConf.writeParallelism();
            if (writeParallelism != null) {
                rowDataDataStreamSink.setParallelism(writeParallelism);
            }
            return rowDataDataStreamSink;
        }
    }
}

