/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.cdc.CaseSensitiveUtils;
import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketSink;
import org.apache.paimon.flink.sink.cdc.CdcFixedBucketSink;
import org.apache.paimon.flink.sink.cdc.CdcParsingProcessFunction;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordChannelComputer;
import org.apache.paimon.flink.sink.cdc.CdcUnawareBucketSink;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

@Experimental
public class CdcSinkBuilder<T> {
    private DataStream<T> input = null;
    private EventParser.Factory<T> parserFactory = null;
    private Table table = null;
    private Identifier identifier = null;
    private CatalogLoader catalogLoader = null;
    @Nullable
    private Integer parallelism;

    public CdcSinkBuilder<T> withInput(DataStream<T> input) {
        this.input = input;
        return this;
    }

    public CdcSinkBuilder<T> withParserFactory(EventParser.Factory<T> parserFactory) {
        this.parserFactory = parserFactory;
        return this;
    }

    public CdcSinkBuilder<T> withTable(Table table) {
        this.table = table;
        return this;
    }

    public CdcSinkBuilder<T> withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public CdcSinkBuilder<T> withIdentifier(Identifier identifier) {
        this.identifier = identifier;
        return this;
    }

    public CdcSinkBuilder<T> withCatalogLoader(CatalogLoader catalogLoader) {
        this.catalogLoader = catalogLoader;
        return this;
    }

    public DataStreamSink<?> build() {
        Preconditions.checkNotNull(this.input, "Input DataStream can not be null.");
        Preconditions.checkNotNull(this.parserFactory, "Event ParserFactory can not be null.");
        Preconditions.checkNotNull(this.table, "Paimon Table can not be null.");
        Preconditions.checkNotNull(this.identifier, "Paimon Table Identifier can not be null.");
        Preconditions.checkNotNull(this.catalogLoader, "Paimon Catalog Loader can not be null.");
        if (!(this.table instanceof FileStoreTable)) {
            throw new IllegalArgumentException("Table should be a data table, but is: " + this.table.getClass().getName());
        }
        FileStoreTable dataTable = (FileStoreTable)this.table;
        SingleOutputStreamOperator parsed = this.input.forward().process(new CdcParsingProcessFunction<T>(this.parserFactory)).name("Side Output").setParallelism(this.input.getParallelism());
        SingleOutputStreamOperator schemaChangeProcessFunction = SingleOutputStreamOperatorUtils.getSideOutput(parsed, CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG).process((ProcessFunction)new UpdatedDataFieldsProcessFunction(new SchemaManager(dataTable.fileIO(), dataTable.location()), this.identifier, this.catalogLoader)).name("Schema Evolution");
        schemaChangeProcessFunction.getTransformation().setParallelism(1);
        schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
        DataStream<CdcRecord> converted = CaseSensitiveUtils.cdcRecordConvert(this.catalogLoader, (DataStream<CdcRecord>)parsed);
        BucketMode bucketMode = dataTable.bucketMode();
        switch (bucketMode) {
            case HASH_FIXED: {
                return this.buildForFixedBucket(converted);
            }
            case HASH_DYNAMIC: {
                return new CdcDynamicBucketSink((FileStoreTable)this.table).build(converted, this.parallelism);
            }
            case BUCKET_UNAWARE: {
                return this.buildForUnawareBucket(converted);
            }
        }
        throw new UnsupportedOperationException("Unsupported bucket mode: " + (Object)((Object)bucketMode));
    }

    private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord> parsed) {
        FileStoreTable dataTable = (FileStoreTable)this.table;
        DataStream<CdcRecord> partitioned = FlinkStreamPartitioner.partition(parsed, new CdcRecordChannelComputer(dataTable.schema()), this.parallelism);
        return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
    }

    private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> parsed) {
        FileStoreTable dataTable = (FileStoreTable)this.table;
        return new CdcUnawareBucketSink(dataTable, this.parallelism).sinkFrom(parsed.rebalance());
    }
}

