/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.DynamicBucketCompactSink;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.LocalMergeOperator;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.MapToInternalRow;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sink.RowDynamicBucketSink;
import org.apache.paimon.flink.sink.RowUnawareBucketSink;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

public class FlinkSinkBuilder {
    private final FileStoreTable table;
    private DataStream<RowData> input;
    @Nullable
    private Map<String, String> overwritePartition;
    @Nullable
    private LogSinkFunction logSinkFunction;
    @Nullable
    private Integer parallelism;
    private boolean boundedInput = false;
    private boolean compactSink = false;

    public FlinkSinkBuilder(FileStoreTable table) {
        this.table = table;
    }

    public FlinkSinkBuilder withInput(DataStream<RowData> input) {
        this.input = input;
        return this;
    }

    public FlinkSinkBuilder withOverwritePartition(@Nullable Map<String, String> overwritePartition) {
        this.overwritePartition = overwritePartition;
        return this;
    }

    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
        this.logSinkFunction = logSinkFunction;
        return this;
    }

    public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public FlinkSinkBuilder withBoundedInputStream(boolean bounded) {
        this.boundedInput = bounded;
        return this;
    }

    public FlinkSinkBuilder forCompact(boolean compactSink) {
        this.compactSink = compactSink;
        return this;
    }

    public DataStreamSink<?> build() {
        SingleOutputStreamOperator input = MapToInternalRow.map(this.input, this.table.rowType());
        if (this.table.coreOptions().localMergeEnabled() && this.table.schema().primaryKeys().size() > 0) {
            input = input.forward().transform("local merge", input.getType(), (OneInputStreamOperator)new LocalMergeOperator(this.table.schema())).setParallelism(input.getParallelism());
        }
        BucketMode bucketMode = this.table.bucketMode();
        switch (bucketMode) {
            case FIXED: {
                return this.buildForFixedBucket((DataStream<InternalRow>)input);
            }
            case DYNAMIC: {
                return this.buildDynamicBucketSink((DataStream<InternalRow>)input, false);
            }
            case GLOBAL_DYNAMIC: {
                return this.buildDynamicBucketSink((DataStream<InternalRow>)input, true);
            }
            case UNAWARE: {
                return this.buildUnawareBucketSink((DataStream<InternalRow>)input);
            }
        }
        throw new UnsupportedOperationException("Unsupported bucket mode: " + (Object)((Object)bucketMode));
    }

    private DataStreamSink<?> buildDynamicBucketSink(DataStream<InternalRow> input, boolean globalIndex) {
        Preconditions.checkArgument(this.logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
        return this.compactSink && !globalIndex ? new DynamicBucketCompactSink(this.table, this.overwritePartition).build(input, this.parallelism) : (globalIndex ? new GlobalDynamicBucketSink(this.table, this.overwritePartition).build(input, this.parallelism) : new RowDynamicBucketSink(this.table, this.overwritePartition).build(input, this.parallelism));
    }

    private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
        DataStream<InternalRow> partitioned = FlinkStreamPartitioner.partition(input, new RowDataChannelComputer(this.table.schema(), this.logSinkFunction != null), this.parallelism);
        FixedBucketSink sink = new FixedBucketSink(this.table, this.overwritePartition, this.logSinkFunction);
        return sink.sinkFrom(partitioned);
    }

    private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
        Preconditions.checkArgument(this.table.primaryKeys().isEmpty(), "Unaware bucket mode only works with append-only table for now.");
        return new RowUnawareBucketSink(this.table, this.overwritePartition, this.logSinkFunction, this.parallelism, this.boundedInput).sinkFrom(input);
    }
}

