/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketSink;
import org.apache.paimon.flink.sink.cdc.CdcDynamicTableParsingProcessFunction;
import org.apache.paimon.flink.sink.cdc.CdcFixedBucketSink;
import org.apache.paimon.flink.sink.cdc.CdcMultiTableParsingProcessFunction;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecordChannelComputer;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordChannelComputer;
import org.apache.paimon.flink.sink.cdc.CdcUnawareBucketSink;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcMultiTableSink;
import org.apache.paimon.flink.sink.cdc.MultiTableUpdatedDataFieldsProcessFunction;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

public class FlinkCdcSyncDatabaseSinkBuilder<T> {
    private DataStream<T> input = null;
    private EventParser.Factory<T> parserFactory = null;
    private List<FileStoreTable> tables = new ArrayList<FileStoreTable>();
    @Nullable
    private Integer parallelism;
    private double committerCpu;
    @Nullable
    private MemorySize committerMemory;
    private Catalog.Loader catalogLoader;
    private String database;
    private MultiTablesSinkMode mode;

    public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
        this.input = input;
        return this;
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withParserFactory(EventParser.Factory<T> parserFactory) {
        this.parserFactory = parserFactory;
        return this;
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withTables(List<FileStoreTable> tables) {
        this.tables = tables;
        return this;
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Map<String, String> options) {
        return this.withTableOptions(Options.fromMap(options));
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
        this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
        this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
        this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
        return this;
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withDatabase(String database) {
        this.database = database;
        return this;
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader catalogLoader) {
        this.catalogLoader = catalogLoader;
        return this;
    }

    public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MultiTablesSinkMode mode) {
        this.mode = mode;
        return this;
    }

    public void build() {
        Preconditions.checkNotNull(this.input);
        Preconditions.checkNotNull(this.parserFactory);
        Preconditions.checkNotNull(this.database);
        Preconditions.checkNotNull(this.catalogLoader);
        if (this.mode == MultiTablesSinkMode.COMBINED) {
            this.buildCombinedCdcSink();
        } else {
            this.buildDividedCdcSink();
        }
    }

    private void buildCombinedCdcSink() {
        SingleOutputStreamOperator parsed = this.input.forward().process(new CdcDynamicTableParsingProcessFunction<T>(this.database, this.catalogLoader, this.parserFactory)).name("Side Output").setParallelism(this.input.getParallelism());
        DataStream<CdcMultiplexRecord> newlyAddedTableStream = SingleOutputStreamOperatorUtils.getSideOutput(parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG);
        SingleOutputStreamOperatorUtils.getSideOutput(parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG).process((ProcessFunction)new MultiTableUpdatedDataFieldsProcessFunction(this.catalogLoader)).name("Schema Evolution");
        DataStream<CdcMultiplexRecord> partitioned = FlinkStreamPartitioner.partition(newlyAddedTableStream, new CdcMultiplexRecordChannelComputer(this.catalogLoader), this.parallelism);
        FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink(this.catalogLoader, this.committerCpu, this.committerMemory);
        sink.sinkFrom(partitioned);
    }

    private void buildForFixedBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
        DataStream<CdcRecord> partitioned = FlinkStreamPartitioner.partition(parsed, new CdcRecordChannelComputer(table.schema()), this.parallelism);
        new CdcFixedBucketSink(table).sinkFrom(partitioned);
    }

    private void buildForUnawareBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
        new CdcUnawareBucketSink(table, this.parallelism).sinkFrom(parsed);
    }

    private void buildDividedCdcSink() {
        Preconditions.checkNotNull(this.tables);
        SingleOutputStreamOperator parsed = this.input.forward().process(new CdcMultiTableParsingProcessFunction<T>(this.parserFactory)).setParallelism(this.input.getParallelism());
        block5: for (FileStoreTable table : this.tables) {
            SingleOutputStreamOperator schemaChangeProcessFunction = SingleOutputStreamOperatorUtils.getSideOutput(parsed, CdcMultiTableParsingProcessFunction.createUpdatedDataFieldsOutputTag(table.name())).process((ProcessFunction)new UpdatedDataFieldsProcessFunction(new SchemaManager(table.fileIO(), table.location()), Identifier.create(this.database, table.name()), this.catalogLoader));
            schemaChangeProcessFunction.getTransformation().setParallelism(1);
            schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
            DataStream<CdcRecord> parsedForTable = SingleOutputStreamOperatorUtils.getSideOutput(parsed, CdcMultiTableParsingProcessFunction.createRecordOutputTag(table.name()));
            BucketMode bucketMode = table.bucketMode();
            switch (bucketMode) {
                case FIXED: {
                    this.buildForFixedBucket(table, parsedForTable);
                    continue block5;
                }
                case DYNAMIC: {
                    new CdcDynamicBucketSink(table).build(parsedForTable, this.parallelism);
                    continue block5;
                }
                case UNAWARE: {
                    this.buildForUnawareBucket(table, parsedForTable);
                    continue block5;
                }
            }
            throw new UnsupportedOperationException("Unsupported bucket mode: " + (Object)((Object)bucketMode));
        }
    }
}

