/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.AppendOnlyMultiTableCompactionWorkerOperator;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableChannelComputer;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.MultiTablesStoreCompactOperator;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.flink.utils.ManagedMemoryUtils;
import org.apache.paimon.flink.utils.ParallelismUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;

/**
 * Builds the Flink sink topology used to compact several tables within one job.
 *
 * <p>Two input streams are accepted: {@link RowData} records for bucket-aware tables and
 * {@link MultiTableUnawareAppendCompactionTask} records for bucket-unaware tables. Each stream
 * gets its own writer operator; the {@link MultiTableCommittable}s they emit are unioned and
 * handed to a single committer operator, which is terminated by a discarding sink.
 */
public class CombinedTableCompactorSink
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
    private final CatalogLoader catalogLoader;
    private final boolean ignorePreviousFiles;
    private final boolean fullCompaction;
    private final Options options;

    /**
     * Creates the sink builder.
     *
     * @param catalogLoader loader passed on to the writer operators and the committer
     * @param options options consulted when wiring the topology and passed on to the operators
     * @param fullCompaction flag forwarded to the bucket-aware compaction writer
     */
    public CombinedTableCompactorSink(CatalogLoader catalogLoader, Options options, boolean fullCompaction) {
        this.catalogLoader = catalogLoader;
        // This sink always hands "false" to the bucket-aware writer factory for this flag.
        this.ignorePreviousFiles = false;
        this.fullCompaction = fullCompaction;
        this.options = options;
    }

    /**
     * Wires writers and committer, deriving the commit user from this sink's options via
     * {@link CoreOptions#createCommitUser}.
     *
     * @param awareBucketTableSource input stream for bucket-aware tables
     * @param unawareBucketTableSource input stream of compaction tasks for bucket-unaware tables
     * @return the terminal sink of the topology
     */
    public DataStreamSink<?> sinkFrom(DataStream<RowData> awareBucketTableSource, DataStream<MultiTableUnawareAppendCompactionTask> unawareBucketTableSource) {
        return this.sinkFrom(awareBucketTableSource, unawareBucketTableSource, CoreOptions.createCommitUser(this.options));
    }

    /**
     * Wires writers and committer using the given commit user for both stages.
     *
     * @param awareBucketTableSource input stream for bucket-aware tables
     * @param unawareBucketTableSource input stream of compaction tasks for bucket-unaware tables
     * @param initialCommitUser commit user handed to the writers and to the committer
     * @return the terminal sink of the topology
     */
    public DataStreamSink<?> sinkFrom(DataStream<RowData> awareBucketTableSource, DataStream<MultiTableUnawareAppendCompactionTask> unawareBucketTableSource, String initialCommitUser) {
        DataStream<MultiTableCommittable> written = this.doWrite(awareBucketTableSource, unawareBucketTableSource, initialCommitUser);
        return this.doCommit(written, initialCommitUser);
    }

    /**
     * Attaches one writer operator to each input stream and unions their outputs.
     *
     * <p>The execution environment (and therefore the runtime mode and checkpoint config) is
     * taken from {@code awareBucketTableSource}.
     *
     * @param awareBucketTableSource input stream for bucket-aware tables
     * @param unawareBucketTableSource input stream of compaction tasks for bucket-unaware tables
     * @param commitUser commit user handed to both writer operator factories
     * @return the union of both writers' committable streams
     */
    public DataStream<MultiTableCommittable> doWrite(DataStream<RowData> awareBucketTableSource, DataStream<MultiTableUnawareAppendCompactionTask> unawareBucketTableSource, String commitUser) {
        StreamExecutionEnvironment env = awareBucketTableSource.getExecutionEnvironment();
        boolean isStreaming = env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        // Writer for bucket-aware tables, named "Multi-Bucket-Table-Writer".
        SingleOutputStreamOperator multiBucketTableRewriter = awareBucketTableSource.transform(String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME), (TypeInformation)new MultiTableCommittableTypeInfo(), this.combinedMultiCompactionWriteOperator(env.getCheckpointConfig(), isStreaming, this.fullCompaction, commitUser));
        // NOTE(review): presumably makes the writer run with its input's parallelism - confirm in ParallelismUtils.
        ParallelismUtils.forwardParallelism(multiBucketTableRewriter, awareBucketTableSource);
        // Writer for bucket-unaware tables, named "Unaware-Bucket-Table-Writer".
        SingleOutputStreamOperator unawareBucketTableRewriter = unawareBucketTableSource.transform(String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), (TypeInformation)new MultiTableCommittableTypeInfo(), (OneInputStreamOperatorFactory)new AppendOnlyMultiTableCompactionWorkerOperator.Factory(this.catalogLoader, commitUser, this.options));
        ParallelismUtils.forwardParallelism(unawareBucketTableRewriter, unawareBucketTableSource);
        if (!isStreaming) {
            // Outside streaming mode, both writers' parallelism is validated by FlinkSink.
            FlinkSink.assertBatchAdaptiveParallelism(env, multiBucketTableRewriter.getParallelism());
            FlinkSink.assertBatchAdaptiveParallelism(env, unawareBucketTableRewriter.getParallelism());
        }
        if (this.options.get(FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY).booleanValue()) {
            // Both writers are declared with the same configured writer-buffer memory size.
            ManagedMemoryUtils.declareManagedMemory(multiBucketTableRewriter, this.options.get(FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY));
            ManagedMemoryUtils.declareManagedMemory(unawareBucketTableRewriter, this.options.get(FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY));
        }
        return multiBucketTableRewriter.union(new DataStream[]{unawareBucketTableRewriter});
    }

    /**
     * Appends the committer operator and the terminal discarding sink to the written stream.
     *
     * @param written committables produced by the writers
     * @param commitUser commit user handed to the committer operator factory
     * @return the terminal sink, named {@code "end"} and running with parallelism 1
     */
    protected DataStreamSink<?> doCommit(DataStream<MultiTableCommittable> written, String commitUser) {
        StreamExecutionEnvironment env = written.getExecutionEnvironment();
        ReadableConfig conf = env.getConfiguration();
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean streamingCheckpointEnabled = isStreaming && checkpointConfig.isCheckpointingEnabled();
        if (streamingCheckpointEnabled) {
            // The streaming configuration is only validated when checkpointing is actually on.
            FlinkSink.assertStreamingConfiguration(env);
        }
        // Re-partition committables with the multi-table channel computer, keeping the writers' parallelism.
        DataStream<MultiTableCommittable> partitioned = FlinkStreamPartitioner.partition(written, new MultiTableCommittableChannelComputer(), written.getParallelism());
        // NOTE(review): the meaning of the literal "false" (2nd factory argument) is not visible here - see CommitterOperatorFactory.
        SingleOutputStreamOperator committed = partitioned.transform(GLOBAL_COMMITTER_NAME, (TypeInformation)new MultiTableCommittableTypeInfo(), new CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable>(streamingCheckpointEnabled, false, commitUser, this.createCommitterFactory(isStreaming), this.createCommittableStateManager(), this.options.get(FlinkConnectorOptions.END_INPUT_WATERMARK)));
        ParallelismUtils.forwardParallelism(committed, written);
        if (!this.options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING).booleanValue()) {
            // Chaining disabled by configuration: start a new operator chain at the committer.
            committed = committed.startNewChain();
        }
        return committed.sinkTo((Sink)new DiscardingSink()).name("end").setParallelism(1);
    }

    /**
     * Creates the writer operator factory for the bucket-aware input stream.
     *
     * @param checkpointConfig checkpoint configuration of the execution environment
     * @param isStreaming whether the job runs in streaming runtime mode
     * @param fullCompaction flag forwarded to the operator factory
     * @param commitUser commit user forwarded to the operator factory
     * @return a {@link MultiTablesStoreCompactOperator.Factory} configured from the arguments and this sink's fields
     */
    protected OneInputStreamOperatorFactory<RowData, MultiTableCommittable> combinedMultiCompactionWriteOperator(CheckpointConfig checkpointConfig, boolean isStreaming, boolean fullCompaction, String commitUser) {
        return new MultiTablesStoreCompactOperator.Factory(this.catalogLoader, commitUser, checkpointConfig, isStreaming, this.ignorePreviousFiles, fullCompaction, this.options);
    }

    /**
     * Creates the factory that builds the {@link StoreMultiCommitter} used by the committer operator.
     *
     * <p>The committer receives this sink's options as dynamic options with {@code write-only}
     * forced to {@code "false"}; in streaming mode three further overrides are applied.
     *
     * @param isStreaming whether the job runs in streaming runtime mode
     * @return a committer factory capturing the catalog loader and the dynamic options
     */
    protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> createCommitterFactory(boolean isStreaming) {
        // NOTE(review): if Options.toMap() returns the live backing map rather than a copy,
        // the puts below also modify this.options (shared with the writer factories) - confirm.
        Map<String, String> dynamicOptions = this.options.toMap();
        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
        if (isStreaming) {
            // "2147483647" is Integer.MAX_VALUE written out as a string.
            dynamicOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
            dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
            dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
        }
        return context -> new StoreMultiCommitter(this.catalogLoader, context, true, dynamicOptions);
    }

    /**
     * Creates the state manager handed to the committer operator.
     *
     * @return a {@link RestoreAndFailCommittableStateManager} using {@link WrappedManifestCommittableSerializer}
     */
    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager<WrappedManifestCommittable>(WrappedManifestCommittableSerializer::new);
    }
}

