/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator;
import org.apache.paimon.flink.sink.cdc.MultiTableCommittableChannelComputer;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

/**
 * Builds the Flink sink topology that writes a stream of {@link CdcMultiplexRecord}s through a
 * multi-table write operator and then commits the resulting {@link MultiTableCommittable}s.
 *
 * <p>The topology assembled by {@link #sinkFrom(DataStream, String,
 * StoreSinkWrite.WithWriteBufferProvider)} is: writer operator, repartitioning of the
 * committables, committer operator, and a terminal {@link DiscardingSink}.
 */
public class FlinkCdcMultiTableSink implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Operator name given to the write transformation. */
    private static final String WRITER_NAME = "CDC MultiplexWriter";

    /** Operator name given to the commit transformation. */
    private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global Committer";

    // Always false. Not referenced by name anywhere in this class as shown.
    // NOTE(review): this file is decompiled, so a use of this constant may have been inlined as
    // a literal -- confirm against the original source before removing the field.
    private final boolean isOverwrite = false;

    /** Loader handed to both the write operator and the committer. */
    private final Catalog.Loader catalogLoader;

    /** CPU cores value forwarded to {@code FlinkSink.configureGlobalCommitter}. */
    private final double commitCpuCores;

    /** Heap memory value forwarded to {@code FlinkSink.configureGlobalCommitter}; may be null. */
    @Nullable
    private final MemorySize commitHeapMemory;

    /**
     * Creates a sink builder.
     *
     * @param catalogLoader loader passed to the write operator and to the committer
     * @param commitCpuCores CPU cores setting forwarded when configuring the committer operator
     * @param commitHeapMemory heap memory setting forwarded when configuring the committer
     *     operator, or {@code null}
     */
    public FlinkCdcMultiTableSink(
            Catalog.Loader catalogLoader,
            double commitCpuCores,
            @Nullable MemorySize commitHeapMemory) {
        this.catalogLoader = catalogLoader;
        this.commitCpuCores = commitCpuCores;
        this.commitHeapMemory = commitHeapMemory;
    }

    /**
     * Returns the default write provider, which creates a {@link StoreSinkWriteImpl} per table.
     *
     * <p>The sixth constructor argument is derived from the table's own core options via
     * {@code FlinkConnectorOptions.prepareCommitWaitCompaction}; the fifth and seventh are the
     * fixed flags {@code false} and {@code true}.
     */
    private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
        return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
                new StoreSinkWriteImpl(
                        table,
                        commitUser,
                        state,
                        ioManager,
                        false,
                        FlinkConnectorOptions.prepareCommitWaitCompaction(
                                table.coreOptions().toConfiguration()),
                        true,
                        memoryPoolFactory,
                        metricGroup);
    }

    /**
     * Attaches the sink topology to {@code input}, using a freshly generated random UUID as the
     * commit user and the default write provider.
     *
     * @param input stream of multiplexed CDC records
     * @return the terminal sink of the assembled topology
     */
    public DataStreamSink<?> sinkFrom(DataStream<CdcMultiplexRecord> input) {
        String initialCommitUser = UUID.randomUUID().toString();
        return sinkFrom(input, initialCommitUser, createWriteProvider());
    }

    /**
     * Attaches the sink topology to {@code input}.
     *
     * <p>The writer and the committer both run with the parallelism of {@code input}; the
     * terminal discarding sink runs with parallelism 1.
     *
     * @param input stream of multiplexed CDC records
     * @param commitUser commit user passed to both the write operator and the committer operator
     * @param sinkProvider provider used by the write operator to create table writers
     * @return the terminal sink of the assembled topology
     */
    public DataStreamSink<?> sinkFrom(
            DataStream<CdcMultiplexRecord> input,
            String commitUser,
            StoreSinkWrite.WithWriteBufferProvider sinkProvider) {
        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        FlinkSink.assertStreamingConfiguration(env);

        MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo();
        int parallelism = input.getParallelism();

        SingleOutputStreamOperator<MultiTableCommittable> written =
                input.transform(
                                WRITER_NAME,
                                typeInfo,
                                createWriteOperator(sinkProvider, commitUser))
                        .setParallelism(parallelism);

        // Redistribute committables across committer subtasks.
        // NOTE(review): presumably MultiTableCommittableChannelComputer routes by table --
        // confirm against that class.
        DataStream<MultiTableCommittable> partitioned =
                FlinkStreamPartitioner.partition(
                        written, new MultiTableCommittableChannelComputer(), parallelism);

        // The two leading boolean flags are fixed to (true, false); see CommitterOperator's
        // constructor for their meaning.
        CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> committer =
                new CommitterOperator<MultiTableCommittable, WrappedManifestCommittable>(
                        true,
                        false,
                        commitUser,
                        createCommitterFactory(),
                        createCommittableStateManager());

        SingleOutputStreamOperator<MultiTableCommittable> committed =
                partitioned
                        .transform(GLOBAL_COMMITTER_NAME, typeInfo, committer)
                        .setParallelism(parallelism);
        FlinkSink.configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);

        // Committer output is not consumed further; terminate the pipeline with a no-op sink.
        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
    }

    /**
     * Creates the operator that turns CDC records into committables.
     *
     * @param writeProvider provider used by the operator to create table writers
     * @param commitUser commit user handed to the operator
     * @return a {@link CdcRecordStoreMultiWriteOperator} configured with an empty {@link Options}
     */
    protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
            StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
        return new CdcRecordStoreMultiWriteOperator(
                catalogLoader, writeProvider, commitUser, new Options());
    }

    /**
     * Creates the factory used by the committer operator to build its committer.
     *
     * @return a factory producing {@link StoreMultiCommitter}s bound to this sink's catalog loader
     */
    protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable>
            createCommitterFactory() {
        return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader, user, metricGroup);
    }

    /**
     * Creates the state manager used by the committer operator.
     *
     * @return a {@link RestoreAndFailCommittableStateManager} whose serializer is a
     *     {@link VersionedSerializerWrapper} around {@link WrappedManifestCommittableSerializer}
     */
    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager<WrappedManifestCommittable>(
                () ->
                        new VersionedSerializerWrapper<WrappedManifestCommittable>(
                                new WrappedManifestCommittableSerializer()));
    }
}

