/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.DynamicBucketCompactSink;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.LocalMergeOperator;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sink.RowDynamicBucketSink;
import org.apache.paimon.flink.sink.RowUnawareBucketSink;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DataStream API builder that assembles a Flink sink writing into a {@link FileStoreTable}.
 *
 * <p>Typical usage: construct the builder with a table, provide the input stream through
 * {@link #forRow(DataStream, DataType)} or {@link #forRowData(DataStream)}, optionally configure
 * overwrite, parallelism and clustering, then call {@link #build()}.
 */
@Public
public class FlinkSinkBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class);

    /**
     * Smallest accepted clustering sample factor. Also used as the per-range multiplier of the
     * local sample size when the upstream parallelism is not a positive number.
     */
    private static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

    protected final FileStoreTable table;
    private DataStream<RowData> input;
    @Nullable
    protected Map<String, String> overwritePartition;
    @Nullable
    protected Integer parallelism;
    @Nullable
    private TableSortInfo tableSortInfo;
    protected boolean compactSink = false;
    @Nullable
    protected LogSinkFunction logSinkFunction;

    /**
     * Creates a builder for the given table.
     *
     * @param table the target table; must be a {@link FileStoreTable}
     * @throws UnsupportedOperationException if {@code table} is not a {@link FileStoreTable}
     */
    public FlinkSinkBuilder(Table table) {
        if (!(table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException("Unsupported table type: " + table);
        }
        this.table = (FileStoreTable) table;
    }

    /**
     * Uses a stream of external {@link Row} records as input. Every record is converted to
     * {@link RowData} by a map step that keeps the parallelism of {@code input}.
     *
     * @param input the stream of rows to write
     * @param rowDataType the row data type of {@code input}; its logical type must be a Flink
     *     {@code RowType} and its children provide the field types used for the conversion
     * @return this builder
     */
    public FlinkSinkBuilder forRow(DataStream<Row> input, DataType rowDataType) {
        org.apache.flink.table.types.logical.RowType rowType =
                (org.apache.flink.table.types.logical.RowType) rowDataType.getLogicalType();
        DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new DataType[0]);
        DataFormatConverters.RowConverter converter =
                new DataFormatConverters.RowConverter(fieldDataTypes);
        // The explicit type argument pins the map output to RowData; returns(...) supplies the
        // type information that cannot be derived from the lambda itself.
        this.input =
                input.<RowData>map(row -> converter.toInternal(row))
                        .setParallelism(input.getParallelism())
                        .returns(
                                org.apache.flink.table.runtime.typeutils.InternalTypeInfo.of(
                                        rowType));
        return this;
    }

    /**
     * Uses a stream of {@link RowData} records as input without any conversion.
     *
     * @param input the stream of rows to write
     * @return this builder
     */
    public FlinkSinkBuilder forRowData(DataStream<RowData> input) {
        this.input = input;
        return this;
    }

    /**
     * Enables overwrite with an empty partition specification.
     *
     * @return this builder
     */
    public FlinkSinkBuilder overwrite() {
        return this.overwrite(new HashMap<String, String>());
    }

    /**
     * Enables overwrite with the given partition specification.
     *
     * @param overwritePartition the partition specification handed to the created sink; the map
     *     is stored by reference
     * @return this builder
     */
    public FlinkSinkBuilder overwrite(Map<String, String> overwritePartition) {
        this.overwritePartition = overwritePartition;
        return this;
    }

    /**
     * Sets the parallelism handed to the created sink.
     *
     * @param parallelism the sink parallelism
     * @return this builder
     */
    public FlinkSinkBuilder parallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /**
     * Configures sorting (clustering) of the input before it is written, when the setup allows
     * it. Must be called after the input stream has been specified.
     *
     * <p>Nothing is configured when {@code clusteringColumns} is null or empty. A warning is
     * logged and nothing is configured when the input is streaming or the table is not in
     * {@link BucketMode#BUCKET_UNAWARE} mode.
     *
     * @param clusteringColumns comma separated column names; whitespace around each name is
     *     ignored
     * @param clusteringStrategy the sort strategy name; {@code null} or the default value of
     *     {@link FlinkConnectorOptions#CLUSTERING_STRATEGY} selects the strategy from the number
     *     of columns (1: ORDER, 2 to 4: ZORDER, 5 or more: HILBERT)
     * @param sortInCluster passed through to the sort configuration
     * @param sampleFactor the sample factor; must be at least 20
     * @return this builder
     * @throws IllegalStateException as raised by {@code Preconditions.checkState} when the input
     *     is missing, a column is not a field of the table, or {@code sampleFactor} is too small
     */
    public FlinkSinkBuilder clusteringIfPossible(String clusteringColumns, String clusteringStrategy, boolean sortInCluster, int sampleFactor) {
        if (clusteringColumns == null || clusteringColumns.isEmpty()) {
            return this;
        }
        Preconditions.checkState(this.input != null, "The input stream should be specified earlier.");
        if (FlinkSink.isStreaming(this.input) || this.table.bucketMode() != BucketMode.BUCKET_UNAWARE) {
            LOG.warn("Clustering is enabled; however, it has been skipped as it only supports the bucket unaware table without primary keys and BATCH execution mode.");
            return this;
        }

        // Trim every name so that a value such as "a, b" matches the fields "a" and "b".
        String[] columnNames = clusteringColumns.split(",");
        for (int i = 0; i < columnNames.length; i++) {
            columnNames[i] = columnNames[i].trim();
        }
        List<String> columns = Arrays.asList(columnNames);

        List<String> fieldNames = this.table.schema().fieldNames();
        Preconditions.checkState(
                new HashSet<String>(fieldNames).containsAll(new HashSet<String>(columns)),
                String.format("Field names %s should contains all clustering column names %s.", fieldNames, columns));
        Preconditions.checkState(
                sampleFactor >= MIN_CLUSTERING_SAMPLE_FACTOR,
                "The minimum allowed " + FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR.key() + " is " + MIN_CLUSTERING_SAMPLE_FACTOR + ".");

        TableSorter.OrderType orderType;
        if (clusteringStrategy == null
                || clusteringStrategy.equals(FlinkConnectorOptions.CLUSTERING_STRATEGY.defaultValue())) {
            // Default strategy: pick the order type from the number of clustering columns.
            if (columns.size() == 1) {
                orderType = TableSorter.OrderType.ORDER;
            } else if (columns.size() < 5) {
                orderType = TableSorter.OrderType.ZORDER;
            } else {
                orderType = TableSorter.OrderType.HILBERT;
            }
        } else {
            orderType = TableSorter.OrderType.of(clusteringStrategy);
        }
        TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
        sortInfoBuilder.setSortStrategy(orderType);

        // The sink parallelism comes from the table options when present, otherwise from the
        // upstream operator.
        int upstreamParallelism = this.input.getParallelism();
        String sinkParallelismValue = this.table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        int sinkParallelism = sinkParallelismValue == null ? upstreamParallelism : Integer.parseInt(sinkParallelismValue);
        sortInfoBuilder.setSortColumns(columns).setSortInCluster(sortInCluster).setSinkParallelism(sinkParallelism);

        int globalSampleSize = sinkParallelism * sampleFactor;
        int localSampleSize;
        if (upstreamParallelism > 0) {
            localSampleSize = Math.max(sampleFactor, globalSampleSize / upstreamParallelism);
        } else {
            // No positive upstream parallelism to divide by; fall back to a fixed multiple.
            localSampleSize = sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
        }
        this.tableSortInfo =
                sortInfoBuilder
                        .setRangeNumber(sinkParallelism)
                        .setGlobalSampleSize(globalSampleSize)
                        .setLocalSampleSize(localSampleSize)
                        .build();
        return this;
    }

    /**
     * Builds the sink for the configured input.
     *
     * <p>The input is sorted first when clustering has been configured, then converted to
     * {@link InternalRow}. A "local merge" operator is inserted when the table enables local
     * merge and has primary keys. The concrete sink is chosen from the table's bucket mode.
     *
     * @return the created sink
     * @throws IllegalStateException if no input stream has been specified
     * @throws UnsupportedOperationException if the bucket mode of the table is not supported
     */
    public DataStreamSink<?> build() {
        Preconditions.checkState(this.input != null, "The input stream should be specified before building the sink.");
        // Keep the sorted stream local so that calling build() again does not sort twice.
        DataStream<RowData> sortedInput = this.trySortInput(this.input);
        DataStream<InternalRow> rows = this.mapToInternalRow(sortedInput, this.table.rowType());
        if (this.table.coreOptions().localMergeEnabled() && this.table.schema().primaryKeys().size() > 0) {
            rows = rows.forward()
                    .transform("local merge", rows.getType(), new LocalMergeOperator(this.table.schema()))
                    .setParallelism(rows.getParallelism());
        }

        BucketMode bucketMode = this.table.bucketMode();
        switch (bucketMode) {
            case HASH_FIXED:
                return this.buildForFixedBucket(rows);
            case HASH_DYNAMIC:
                return this.buildDynamicBucketSink(rows, false);
            case CROSS_PARTITION:
                return this.buildDynamicBucketSink(rows, true);
            case BUCKET_UNAWARE:
                return this.buildUnawareBucketSink(rows);
            default:
                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
        }
    }

    /**
     * Wraps every {@link RowData} record into an {@link InternalRow}, keeping the parallelism of
     * {@code input}.
     *
     * @param input the stream to convert
     * @param rowType the row type used to create the type information of the result
     * @return the converted stream
     */
    protected DataStream<InternalRow> mapToInternalRow(DataStream<RowData> input, RowType rowType) {
        // The explicit type argument makes the map output InternalRow instead of the wrapper
        // class, which is what the type information passed to returns(...) describes.
        return input.<InternalRow>map(FlinkRowWrapper::new)
                .setParallelism(input.getParallelism())
                .returns(InternalTypeInfo.fromRowType(rowType));
    }

    /**
     * Builds the sink used for dynamic bucket tables.
     *
     * @param input the stream of rows to write
     * @param globalIndex whether to use the global dynamic bucket sink; when true it takes
     *     precedence over {@link #compactSink}
     * @return the created sink
     * @throws IllegalArgumentException as raised by {@code Preconditions.checkArgument} when a
     *     log sink function is set
     */
    protected DataStreamSink<?> buildDynamicBucketSink(DataStream<InternalRow> input, boolean globalIndex) {
        Preconditions.checkArgument(this.logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
        if (globalIndex) {
            return new GlobalDynamicBucketSink(this.table, this.overwritePartition).build(input, this.parallelism);
        }
        if (this.compactSink) {
            return new DynamicBucketCompactSink(this.table, this.overwritePartition).build(input, this.parallelism);
        }
        return new RowDynamicBucketSink(this.table, this.overwritePartition).build(input, this.parallelism);
    }

    /**
     * Builds the sink used for fixed bucket tables: the input is partitioned with a
     * {@link RowDataChannelComputer} and then written by a {@link FixedBucketSink}.
     *
     * @param input the stream of rows to write
     * @return the created sink
     */
    protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
        boolean hasLogSink = this.logSinkFunction != null;
        DataStream<InternalRow> partitioned =
                FlinkStreamPartitioner.partition(
                        input, new RowDataChannelComputer(this.table.schema(), hasLogSink), this.parallelism);
        FixedBucketSink sink = new FixedBucketSink(this.table, this.overwritePartition, this.logSinkFunction);
        return sink.sinkFrom(partitioned);
    }

    /**
     * Builds the sink used for bucket unaware tables, which must not have primary keys.
     *
     * @param input the stream of rows to write
     * @return the created sink
     */
    private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
        Preconditions.checkArgument(this.table.primaryKeys().isEmpty(), "Unaware bucket mode only works with append-only table for now.");
        return new RowUnawareBucketSink(this.table, this.overwritePartition, this.logSinkFunction, this.parallelism).sinkFrom(input);
    }

    /**
     * Sorts the input when clustering has been configured; otherwise returns it unchanged.
     *
     * @param input the stream to sort
     * @return the sorted stream, or {@code input} itself when no sort is configured
     */
    private DataStream<RowData> trySortInput(DataStream<RowData> input) {
        if (this.tableSortInfo == null) {
            return input;
        }
        TableSorter sorter = TableSorter.getSorter(input.getExecutionEnvironment(), input, this.table, this.tableSortInfo);
        return sorter.sort();
    }
}

