/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;

/**
 * Assembles a Flink streaming topology that reads {@link AppendOnlyCompactionTask}s from a
 * {@link BucketUnawareCompactSource}, redistributes them with a {@link RebalancePartitioner},
 * and hands them to an {@link UnawareBucketCompactionSink}.
 *
 * <p>Optional settings (continuous mode, partition restriction) are applied through the
 * {@code with*} methods before calling {@link #build()} or {@link #fetchUncommitted(String)}.
 */
public class UnawareBucketCompactionTopoBuilder {
    private final transient StreamExecutionEnvironment env;
    private final String tableIdentifier;
    private final AppendOnlyFileStoreTable table;
    /** Partition specs to restrict the source to; {@code null} means no partition filter. */
    @Nullable
    private List<Map<String, String>> specifiedPartitions = null;
    private boolean isContinuous = false;

    /**
     * @param env execution environment the source and transformations are registered on
     * @param tableIdentifier identifier passed through to the source builder
     * @param table table whose options and row type drive the topology
     */
    public UnawareBucketCompactionTopoBuilder(StreamExecutionEnvironment env, String tableIdentifier, AppendOnlyFileStoreTable table) {
        this.env = env;
        this.tableIdentifier = tableIdentifier;
        this.table = table;
    }

    /**
     * Sets the continuous flag forwarded to the source (defaults to {@code false}).
     *
     * @param isContinuous value passed to {@link BucketUnawareCompactSource}
     */
    public void withContinuousMode(boolean isContinuous) {
        this.isContinuous = isContinuous;
    }

    /**
     * Restricts the source to the given partition specs.
     *
     * @param partitions partition specs combined with OR into a single filter; {@code null}
     *     disables filtering
     */
    public void withPartitions(List<Map<String, String>> partitions) {
        this.specifiedPartitions = partitions;
    }

    /** Builds the source, rebalances it and attaches the compaction sink. */
    public void build() {
        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
        sinkFromSource(source);
    }

    /**
     * Builds the source and the write stage only, returning the resulting committables
     * instead of attaching the full sink.
     *
     * @param commitUser commit user forwarded to the sink's write stage
     * @return stream of {@link Committable}s produced by the write stage
     */
    public DataStream<Committable> fetchUncommitted(String commitUser) {
        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
        DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
        // The write stage runs with the same parallelism as the rebalanced input.
        return new UnawareBucketCompactionSink(table).doWrite(rebalanced, commitUser, rebalanced.getParallelism());
    }

    /** Creates the compaction-task source using the table's continuous discovery interval. */
    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
        long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
        BucketUnawareCompactSource source = new BucketUnawareCompactSource(table, isContinuous, scanInterval, getPartitionFilter());
        return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
    }

    /** Rebalances {@code input} and attaches the compaction sink to it. */
    private void sinkFromSource(DataStreamSource<AppendOnlyCompactionTask> input) {
        DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(input);
        UnawareBucketCompactionSink.sink(table, rebalanced);
    }

    /**
     * Wraps {@code input} in a rebalance partition transformation.
     *
     * <p>The parallelism is taken from
     * {@link FlinkConnectorOptions#UNAWARE_BUCKET_COMPACTION_PARALLELISM} when that option is
     * set on the table, otherwise from the execution environment.
     */
    private DataStream<AppendOnlyCompactionTask> rebalanceInput(DataStreamSource<AppendOnlyCompactionTask> input) {
        Options conf = Options.fromMap(table.options());
        Integer compactionWorkerParallelism = conf.get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
        // Keep the element type on the transformation instead of falling back to raw types.
        PartitionTransformation<AppendOnlyCompactionTask> transformation =
                new PartitionTransformation<>(input.getTransformation(), new RebalancePartitioner<>());
        if (compactionWorkerParallelism != null) {
            transformation.setParallelism(compactionWorkerParallelism);
        } else {
            transformation.setParallelism(env.getParallelism());
        }
        return new DataStream<>(env, transformation);
    }

    /**
     * Builds the partition filter handed to the source.
     *
     * @return the OR of one partition predicate per specified partition spec, or {@code null}
     *     when no partitions were specified
     */
    @Nullable
    private Predicate getPartitionFilter() {
        if (specifiedPartitions == null) {
            return null;
        }
        // NOTE(review): an empty list yields an empty predicate array; confirm how
        // PredicateBuilder.or treats that case.
        Predicate[] perPartition = specifiedPartitions.stream()
                .map(p -> PredicateBuilder.partition(p, table.rowType()))
                .toArray(Predicate[]::new);
        return PredicateBuilder.or(perPartition);
    }
}

