/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.sink.FileStoreSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.LocalMergeOperator;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sink.RowDynamicBucketSink;
import org.apache.paimon.flink.sink.UnawareBucketWriteSink;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/**
 * Fluent builder that wires a {@code DataStream<RowData>} into the sink matching the
 * {@link BucketMode} of a {@link FileStoreTable}.
 *
 * <p>Typical usage: construct with the target table, call {@link #withInput(DataStream)} and any
 * of the optional {@code with...} setters, then call {@link #build()}.
 *
 * <p>The builder is mutable; every {@code with...} method stores its argument and returns
 * {@code this} for chaining.
 */
public class FlinkSinkBuilder {
    private final FileStoreTable table;
    private DataStream<RowData> input;
    @Nullable
    private Map<String, String> overwritePartition;
    @Nullable
    private LogSinkFunction logSinkFunction;
    @Nullable
    private Integer parallelism;

    /**
     * Creates a builder targeting the given table.
     *
     * @param table table the resulting sink writes to
     */
    public FlinkSinkBuilder(FileStoreTable table) {
        this.table = table;
    }

    /**
     * Sets the stream whose records are written by the sink. Must be called before
     * {@link #build()}.
     *
     * @param input stream of rows to write
     * @return this builder
     */
    public FlinkSinkBuilder withInput(DataStream<RowData> input) {
        this.input = input;
        return this;
    }

    /**
     * Sets the overwrite partition specification that is handed unchanged to the underlying sink.
     *
     * @param overwritePartition partition specification, or {@code null} to leave it unset
     * @return this builder
     */
    public FlinkSinkBuilder withOverwritePartition(@Nullable Map<String, String> overwritePartition) {
        this.overwritePartition = overwritePartition;
        return this;
    }

    /**
     * Sets the log sink function that is handed to the underlying sink. The dynamic bucket modes
     * reject a non-null value when {@link #build()} is called.
     *
     * @param logSinkFunction log sink function, or {@code null} for none
     * @return this builder
     */
    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
        this.logSinkFunction = logSinkFunction;
        return this;
    }

    /**
     * Sets the parallelism that is handed to the partitioner or sink of the selected bucket mode.
     *
     * @param parallelism requested parallelism, or {@code null} to leave it unset
     * @return this builder
     */
    public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /**
     * Builds the sink for the configured input.
     *
     * <p>When the table options enable local merge and the schema declares at least one primary
     * key, a {@link LocalMergeOperator} is placed in front of the sink, connected with forward
     * partitioning and running at the input's parallelism. The sink implementation is then chosen
     * from {@link FileStoreTable#bucketMode()}.
     *
     * @return the sink attached to the input stream
     * @throws IllegalStateException if no input was set via {@link #withInput(DataStream)}
     * @throws IllegalArgumentException if a log sink function is combined with a dynamic bucket
     *     mode, or if the unaware bucket mode is used with a table that is not an
     *     {@link AppendOnlyFileStoreTable}
     * @throws UnsupportedOperationException if the table reports a bucket mode not handled here
     */
    public DataStreamSink<?> build() {
        // Fail fast with a descriptive message instead of an anonymous NullPointerException
        // raised further down the topology construction.
        if (this.input == null) {
            throw new IllegalStateException("Input stream must be set via withInput(...) before calling build().");
        }

        DataStream<RowData> stream = this.input;
        boolean hasPrimaryKeys = !this.table.schema().primaryKeys().isEmpty();
        if (this.table.coreOptions().localMergeEnabled() && hasPrimaryKeys) {
            stream = stream.forward()
                    .transform("local merge", stream.getType(), new LocalMergeOperator(this.table.schema()))
                    .setParallelism(stream.getParallelism());
        }

        BucketMode bucketMode = this.table.bucketMode();
        switch (bucketMode) {
            case FIXED:
                return this.buildForFixedBucket(stream);
            case DYNAMIC:
                return this.buildDynamicBucketSink(stream, false);
            case GLOBAL_DYNAMIC:
                return this.buildDynamicBucketSink(stream, true);
            case UNAWARE:
                return this.buildUnawareBucketSink(stream);
            default:
                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
        }
    }

    /**
     * Builds the sink for the dynamic bucket modes.
     *
     * @param input stream to write
     * @param globalIndex {@code true} to use {@link GlobalDynamicBucketSink}, {@code false} to use
     *     {@link RowDynamicBucketSink}
     * @return the sink attached to {@code input}
     * @throws IllegalArgumentException if a log sink function has been configured
     */
    private DataStreamSink<?> buildDynamicBucketSink(DataStream<RowData> input, boolean globalIndex) {
        Preconditions.checkArgument(this.logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
        if (globalIndex) {
            return new GlobalDynamicBucketSink(this.table, this.overwritePartition).build(input, this.parallelism);
        }
        return new RowDynamicBucketSink(this.table, this.overwritePartition).build(input, this.parallelism);
    }

    /**
     * Builds the sink for the fixed bucket mode: the input is repartitioned with a
     * {@link RowDataChannelComputer} and then written through a {@link FileStoreSink}.
     *
     * @param input stream to write
     * @return the sink attached to the repartitioned stream
     */
    private DataStreamSink<?> buildForFixedBucket(DataStream<RowData> input) {
        // The channel computer is told whether a log sink function is configured.
        boolean hasLogSink = this.logSinkFunction != null;
        RowDataChannelComputer channelComputer = new RowDataChannelComputer(this.table.schema(), hasLogSink);
        DataStream<RowData> partitioned = FlinkStreamPartitioner.partition(input, channelComputer, this.parallelism);
        return new FileStoreSink(this.table, this.overwritePartition, this.logSinkFunction).sinkFrom(partitioned);
    }

    /**
     * Builds the sink for the unaware bucket mode via {@link UnawareBucketWriteSink}.
     *
     * @param input stream to write
     * @return the sink attached to {@code input}
     * @throws IllegalArgumentException if the table is not an {@link AppendOnlyFileStoreTable}
     */
    private DataStreamSink<?> buildUnawareBucketSink(DataStream<RowData> input) {
        Preconditions.checkArgument(this.table instanceof AppendOnlyFileStoreTable, "Unaware bucket mode only works with append-only table for now.");
        AppendOnlyFileStoreTable appendOnlyTable = (AppendOnlyFileStoreTable)this.table;
        return new UnawareBucketWriteSink(appendOnlyTable, this.overwritePartition, this.logSinkFunction, this.parallelism).sinkFrom(input);
    }
}

