/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.AdaptiveParallelism;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.MultiTablesStoreCompactOperator;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.flink.utils.ManagedMemoryUtils;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.Preconditions;

public class MultiTablesCompactorSink
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
    private final Catalog.Loader catalogLoader;
    private final boolean ignorePreviousFiles;
    private final Options options;

    public MultiTablesCompactorSink(Catalog.Loader catalogLoader, Options options) {
        this.catalogLoader = catalogLoader;
        this.ignorePreviousFiles = false;
        this.options = options;
    }

    public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
        String initialCommitUser = UUID.randomUUID().toString();
        return this.sinkFrom(input, initialCommitUser);
    }

    public DataStreamSink<?> sinkFrom(DataStream<RowData> input, String initialCommitUser) {
        SingleOutputStreamOperator<MultiTableCommittable> written = this.doWrite(input, initialCommitUser, input.getParallelism());
        return this.doCommit((DataStream<MultiTableCommittable>)written, initialCommitUser);
    }

    public SingleOutputStreamOperator<MultiTableCommittable> doWrite(DataStream<RowData> input, String commitUser, Integer parallelism) {
        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        boolean isStreaming = StreamExecutionEnvironmentUtils.getConfiguration(env).get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        SingleOutputStreamOperator written = input.transform(WRITER_NAME, (TypeInformation)new MultiTableCommittableTypeInfo(), this.createWriteOperator(env.getCheckpointConfig(), isStreaming, commitUser)).setParallelism(parallelism == null ? input.getParallelism() : parallelism.intValue());
        if (!isStreaming) {
            this.assertBatchConfiguration(env, written.getParallelism());
        }
        if (this.options.get(FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY).booleanValue()) {
            ManagedMemoryUtils.declareManagedMemory(written, this.options.get(FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY));
        }
        return written;
    }

    protected DataStreamSink<?> doCommit(DataStream<MultiTableCommittable> written, String commitUser) {
        boolean streamingCheckpointEnabled;
        StreamExecutionEnvironment env = written.getExecutionEnvironment();
        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean bl = streamingCheckpointEnabled = isStreaming && checkpointConfig.isCheckpointingEnabled();
        if (streamingCheckpointEnabled) {
            MultiTablesCompactorSink.assertStreamingConfiguration(env);
        }
        SingleOutputStreamOperator committed = written.transform(GLOBAL_COMMITTER_NAME, (TypeInformation)new MultiTableCommittableTypeInfo(), new CommitterOperator<MultiTableCommittable, WrappedManifestCommittable>(streamingCheckpointEnabled, commitUser, this.createCommitterFactory(), this.createCommittableStateManager())).setParallelism(1).setMaxParallelism(1);
        return committed.addSink((SinkFunction)new DiscardingSink()).name("end").setParallelism(1);
    }

    public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
        Preconditions.checkArgument(!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(), "Paimon sink currently does not support unaligned checkpoints. Please set " + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key() + " to false.");
        Preconditions.checkArgument(env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE, "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set " + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() + " to exactly-once");
    }

    private void assertBatchConfiguration(StreamExecutionEnvironment env, int sinkParallelism) {
        try {
            Preconditions.checkArgument(sinkParallelism != -1 || !AdaptiveParallelism.isEnabled(env), "Paimon Sink does not support Flink's Adaptive Parallelism mode. Please manually turn it off or set Paimon `sink.parallelism` manually.");
        }
        catch (NoClassDefFoundError noClassDefFoundError) {
            // empty catch block
        }
    }

    protected OneInputStreamOperator<RowData, MultiTableCommittable> createWriteOperator(CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
        return new MultiTablesStoreCompactOperator(this.catalogLoader, commitUser, checkpointConfig, isStreaming, this.ignorePreviousFiles, this.options);
    }

    protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> createCommitterFactory() {
        return (user, metricGroup) -> new StoreMultiCommitter(this.catalogLoader, user, metricGroup, true);
    }

    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager<WrappedManifestCommittable>(() -> new VersionedSerializerWrapper<WrappedManifestCommittable>(new WrappedManifestCommittableSerializer()));
    }
}

