/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableChannelComputer;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

/**
 * Builds the Flink sink topology for a stream of {@link CdcMultiplexRecord}s.
 *
 * <p>The topology assembled by {@link #sinkFrom(DataStream)} is: a writer operator named
 * {@code "CDC MultiplexWriter"}, a repartitioning of the emitted {@link MultiTableCommittable}s,
 * a committer operator named {@code "Multiplex Global Committer"}, and a terminal discarding
 * sink named {@code "end"}.
 *
 * <p>The class is {@link Serializable}; the committer factory lambda created in
 * {@link #createCommitterFactory()} references {@code this.catalogLoader}.
 */
public class FlinkCdcMultiTableSink implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Operator name used for the writer transformation. */
    private static final String WRITER_NAME = "CDC MultiplexWriter";

    /** Operator name used for the committer transformation. */
    private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global Committer";

    // Not read anywhere in this class. Kept so the declared field set (and therefore the
    // serialized form of this Serializable class) is unchanged.
    private final boolean isOverwrite = false;

    /** Catalog loader handed to both the write operator factory and the committer. */
    private final CatalogLoader catalogLoader;

    /** CPU cores value forwarded to {@code FlinkSink.configureGlobalCommitter}. */
    private final double commitCpuCores;

    /** Heap memory value forwarded to {@code FlinkSink.configureGlobalCommitter}; may be null. */
    @Nullable
    private final MemorySize commitHeapMemory;

    /** Commit user used by {@link #sinkFrom(DataStream)}. */
    private final String commitUser;

    /**
     * Creates a sink builder.
     *
     * @param catalogLoader loader passed to the write operator factory and the committer
     * @param commitCpuCores CPU cores value used when configuring the committer operator
     * @param commitHeapMemory heap memory value used when configuring the committer operator,
     *     or {@code null}
     * @param commitUser commit user used by {@link #sinkFrom(DataStream)}
     */
    public FlinkCdcMultiTableSink(
            CatalogLoader catalogLoader,
            double commitCpuCores,
            @Nullable MemorySize commitHeapMemory,
            String commitUser) {
        this.catalogLoader = catalogLoader;
        this.commitCpuCores = commitCpuCores;
        this.commitHeapMemory = commitHeapMemory;
        this.commitUser = commitUser;
    }

    /**
     * Returns the default write provider, which creates a {@link StoreSinkWriteImpl} per call.
     *
     * <p>The wait-compaction argument is read from the table's
     * {@code coreOptions().prepareCommitWaitCompaction()}.
     */
    private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
        // NOTE(review): the literal false / true are positional boolean flags of
        // StoreSinkWriteImpl's constructor; their meaning is defined there, not in this file
        // (presumably "ignore previous files" = false and "streaming mode" = true - confirm).
        return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
                new StoreSinkWriteImpl(
                        table,
                        commitUser,
                        state,
                        ioManager,
                        false,
                        table.coreOptions().prepareCommitWaitCompaction(),
                        true,
                        memoryPoolFactory,
                        metricGroup);
    }

    /**
     * Attaches the sink topology to {@code input} using this instance's commit user and the
     * default write provider.
     *
     * @param input stream of multiplexed CDC records
     * @return the terminal {@link DataStreamSink} of the topology
     */
    public DataStreamSink<?> sinkFrom(DataStream<CdcMultiplexRecord> input) {
        return sinkFrom(input, commitUser, createWriteProvider());
    }

    /**
     * Attaches the sink topology to {@code input}.
     *
     * <p>The writer and committer operators both run with the parallelism of {@code input};
     * the terminal discarding sink runs with parallelism 1.
     *
     * @param input stream of multiplexed CDC records
     * @param commitUser commit user passed to the write operator factory and the committer
     *     operator factory
     * @param sinkProvider write provider passed to the write operator factory
     * @return the terminal {@link DataStreamSink} of the topology
     */
    public DataStreamSink<?> sinkFrom(
            DataStream<CdcMultiplexRecord> input,
            String commitUser,
            StoreSinkWrite.WithWriteBufferProvider sinkProvider) {
        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        FlinkSink.assertStreamingConfiguration(env);

        int parallelism = input.getParallelism();
        MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo();

        // Writer stage: turns CDC records into committables.
        SingleOutputStreamOperator<MultiTableCommittable> written =
                input.transform(WRITER_NAME, typeInfo, createWriteOperator(sinkProvider, commitUser))
                        .setParallelism(parallelism);

        // Redistribute committables across committer subtasks; the channel for each
        // committable is chosen by MultiTableCommittableChannelComputer.
        DataStream<MultiTableCommittable> partitioned =
                FlinkStreamPartitioner.partition(
                        written, new MultiTableCommittableChannelComputer(), parallelism);

        // Committer stage.
        // NOTE(review): the leading (true, false) are positional flags of
        // CommitterOperatorFactory; their meaning is defined there - confirm before changing.
        SingleOutputStreamOperator<?> committed =
                partitioned
                        .transform(
                                GLOBAL_COMMITTER_NAME,
                                typeInfo,
                                new CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable>(
                                        true,
                                        false,
                                        commitUser,
                                        createCommitterFactory(),
                                        createCommittableStateManager()))
                        .setParallelism(parallelism);
        FlinkSink.configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);

        // The committer's output is not consumed; terminate the pipeline with a discarding sink.
        return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
    }

    /**
     * Creates the factory for the writer operator.
     *
     * @param writeProvider provider passed to the operator factory
     * @param commitUser commit user passed to the operator factory
     * @return a {@link CdcRecordStoreMultiWriteOperator.Factory} built with this sink's catalog
     *     loader and a fresh, empty {@link Options}
     */
    protected OneInputStreamOperatorFactory<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
            StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
        return new CdcRecordStoreMultiWriteOperator.Factory(
                catalogLoader, writeProvider, commitUser, new Options());
    }

    /**
     * Creates the committer factory; each invocation of the factory builds a
     * {@link StoreMultiCommitter} from this sink's catalog loader and the supplied context.
     */
    protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> createCommitterFactory() {
        return context -> new StoreMultiCommitter(catalogLoader, context);
    }

    /**
     * Creates the committable state manager: a {@link RestoreAndFailCommittableStateManager}
     * supplied with {@link WrappedManifestCommittableSerializer} instances.
     */
    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager<WrappedManifestCommittable>(
                WrappedManifestCommittableSerializer::new);
    }
}

