/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.RowAppendTableSink;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactAction
extends TableActionBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(CompactAction.class);
    private List<Map<String, String>> partitions;
    private String whereSql;
    @Nullable
    private Duration partitionIdleTime = null;
    private Boolean fullCompaction;

    public CompactAction(String database, String tableName, Map<String, String> catalogConfig, Map<String, String> tableConf) {
        super(database, tableName, catalogConfig);
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", this.table.getClass().getName()));
        }
        Preconditions.checkArgument(!((FileStoreTable)this.table).coreOptions().dataEvolutionEnabled(), "Compact action does not support data evolution table yet. ");
        HashMap<String, String> dynamicOptions = new HashMap<String, String>(tableConf);
        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
        this.table = this.table.copy(dynamicOptions);
    }

    public CompactAction withPartitions(List<Map<String, String>> partitions) {
        this.partitions = partitions;
        return this;
    }

    public CompactAction withWhereSql(String whereSql) {
        this.whereSql = whereSql;
        return this;
    }

    public CompactAction withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
        this.partitionIdleTime = partitionIdleTime;
        return this;
    }

    public CompactAction withFullCompaction(Boolean fullCompaction) {
        this.fullCompaction = fullCompaction;
        return this;
    }

    @Override
    public void build() throws Exception {
        this.buildImpl();
    }

    private boolean buildImpl() throws Exception {
        ReadableConfig conf = this.env.getConfiguration();
        boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        FileStoreTable fileStoreTable = (FileStoreTable)this.table;
        if (fileStoreTable.coreOptions().bucket() == -2) {
            return this.buildForPostponeBucketCompaction(this.env, fileStoreTable, isStreaming);
        }
        if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
            if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
                return this.buildForIncrementalClustering(this.env, fileStoreTable, isStreaming);
            }
            this.buildForAppendTableCompact(this.env, fileStoreTable, isStreaming);
            return true;
        }
        this.buildForBucketedTableCompact(this.env, fileStoreTable, isStreaming);
        return true;
    }

    private void buildForBucketedTableCompact(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception {
        if (this.fullCompaction == null) {
            this.fullCompaction = !isStreaming;
        } else {
            Preconditions.checkArgument(this.fullCompaction == false || !isStreaming, "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
        }
        if (isStreaming) {
            HashMap<String, String> dynamicOptions = new HashMap<String, String>(){
                {
                    this.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
                    this.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
                    this.put(CoreOptions.LOOKUP_WAIT.key(), "false");
                }
            };
            table = table.copy(dynamicOptions);
        }
        CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(this.identifier.getFullName(), (FileStoreTable)table);
        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder((FileStoreTable)table, this.fullCompaction);
        sourceBuilder.withPartitionPredicate(this.getPartitionPredicate());
        DataStreamSource<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(isStreaming).withPartitionIdleTime(this.partitionIdleTime).build();
        sinkBuilder.withInput((DataStream<RowData>)source).build();
    }

    private void buildForAppendTableCompact(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception {
        AppendTableCompactBuilder builder = new AppendTableCompactBuilder(env, this.identifier.getFullName(), table);
        builder.withPartitionPredicate(this.getPartitionPredicate());
        builder.withContinuousMode(isStreaming);
        builder.withPartitionIdleTime(this.partitionIdleTime);
        builder.build();
    }

    private boolean buildForIncrementalClustering(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception {
        Preconditions.checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode");
        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table, this.getPartitionPredicate());
        if (this.fullCompaction == null) {
            this.fullCompaction = false;
        }
        Options options = new Options(table.options());
        int localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
        if (localSampleMagnification < 20) {
            throw new IllegalArgumentException(String.format("the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.", CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), localSampleMagnification));
        }
        String commitUser = CoreOptions.createCommitUser(options);
        InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer(table.coreOptions().partitionDefaultName(), table.store().partitionType(), table.partitionKeys().toArray(new String[0]), table.coreOptions().legacyPartitionName());
        Map<BinaryRow, CompactUnit> compactUnits = incrementalClusterManager.prepareForCluster(this.fullCompaction);
        if (compactUnits.isEmpty()) {
            LOGGER.warn("No partition needs to be incrementally clustered. Please set '--compact_strategy full' if you need forcibly trigger the cluster.Please set '--force_start_flink_job true' if you need forcibly start a flink job.");
            return false;
        }
        Map<BinaryRow, DataSplit[]> partitionSplits = compactUnits.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> incrementalClusterManager.toSplits((BinaryRow)entry.getKey(), ((CompactUnit)entry.getValue()).files()).toArray(new DataSplit[0])));
        ArrayList<Object> dataStreams = new ArrayList<Object>();
        for (Map.Entry<BinaryRow, DataSplit[]> entry2 : partitionSplits.entrySet()) {
            DataSplit[] splits = entry2.getValue();
            LinkedHashMap<String, String> partitionSpec = partitionComputer.generatePartValues(entry2.getKey());
            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair = IncrementalClusterSplitSource.buildSource(env, table, partitionSpec, splits, options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
            Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
            if (sinkParallelism == null) {
                sinkParallelism = sourcePair.getLeft().getParallelism();
            }
            TableSortInfo sortInfo = new TableSortInfo.Builder().setSortColumns(incrementalClusterManager.clusterKeys()).setSortStrategy(incrementalClusterManager.clusterCurve()).setSinkParallelism(sinkParallelism).setLocalSampleSize(sinkParallelism * localSampleMagnification).setGlobalSampleSize(sinkParallelism * 1000).setRangeNumber(sinkParallelism * 10).build();
            DataStream<RowData> sorted = TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
            RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
            DataStream<Committable> written = sink.doWrite(FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()), commitUser, null);
            SingleOutputStreamOperator clusterCommittable = written.forward().transform("Rewrite cluster committable", (TypeInformation)new CommittableTypeInfo(), (OneInputStreamOperator)new RewriteIncrementalClusterCommittableOperator(table, compactUnits.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, unit -> ((CompactUnit)unit.getValue()).outputLevel())))).setParallelism(written.getParallelism());
            dataStreams.add(clusterCommittable);
            dataStreams.add(sourcePair.getRight());
        }
        RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
        DataStream dataStream = (DataStream)dataStreams.get(0);
        for (int i = 1; i < dataStreams.size(); ++i) {
            dataStream = dataStream.union(new DataStream[]{(DataStream)dataStreams.get(i)});
        }
        sink.doCommit((DataStream<Committable>)dataStream, commitUser);
        return true;
    }

    protected PartitionPredicate getPartitionPredicate() throws Exception {
        Preconditions.checkArgument(this.partitions == null || this.whereSql == null, "partitions and where cannot be used together.");
        Predicate predicate = null;
        RowType partitionType = this.table.rowType().project(this.table.partitionKeys());
        String partitionDefaultName = ((FileStoreTable)this.table).coreOptions().partitionDefaultName();
        if (this.partitions != null) {
            boolean fullMode = this.partitions.stream().allMatch(part -> part.size() == partitionType.getFieldCount());
            if (fullMode) {
                List<BinaryRow> binaryPartitions = PartitionPredicate.createBinaryPartitions(this.partitions, partitionType, partitionDefaultName);
                return PartitionPredicate.fromMultiple(partitionType, binaryPartitions);
            }
            predicate = this.partitions.stream().map(partition -> PartitionPredicate.createPartitionPredicate(partition, this.table.rowType(), partitionDefaultName)).reduce((xva$0, xva$1) -> PredicateBuilder.or(xva$0, xva$1)).orElseThrow(() -> new RuntimeException("Failed to get partition filter."));
        } else if (this.whereSql != null) {
            SimpleSqlPredicateConvertor simpleSqlPredicateConvertor = new SimpleSqlPredicateConvertor(this.table.rowType());
            predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(this.whereSql);
        }
        if (predicate != null) {
            LOGGER.info("the partition predicate of compaction is {}", predicate);
            PartitionPredicateVisitor partitionPredicateVisitor = new PartitionPredicateVisitor(this.table.partitionKeys());
            Preconditions.checkArgument(predicate.visit(partitionPredicateVisitor), "Only partition key can be specialized in compaction action.");
            predicate = predicate.visit(new PredicateProjectionConverter(this.table.rowType().projectIndexes(this.table.partitionKeys()))).orElseThrow(() -> new RuntimeException("Failed to convert partition predicate."));
        }
        return PartitionPredicate.fromPredicate(partitionType, predicate);
    }

    private boolean buildForPostponeBucketCompaction(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
        Preconditions.checkArgument(!isStreaming, "Postpone bucket compaction currently only supports batch mode");
        Preconditions.checkArgument(this.partitions == null, "Postpone bucket compaction currently does not support specifying partitions");
        Preconditions.checkArgument(this.whereSql == null, "Postpone bucket compaction currently does not support predicates");
        Options options = new Options(table.options());
        int defaultBucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
        HashMap<String, String> bucketOptions = new HashMap<String, String>(table.options());
        bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(defaultBucketNum));
        FileStoreTable fileStoreTable = table.copy(table.schema().copy(bucketOptions));
        List<BinaryRow> partitions = fileStoreTable.newSnapshotReader().withBucket(-2).partitions();
        if (partitions.isEmpty()) {
            return false;
        }
        InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer(fileStoreTable.coreOptions().partitionDefaultName(), fileStoreTable.store().partitionType(), fileStoreTable.partitionKeys().toArray(new String[0]), fileStoreTable.coreOptions().legacyPartitionName());
        String commitUser = CoreOptions.createCommitUser(options);
        ArrayList<Object> dataStreams = new ArrayList<Object>();
        for (BinaryRow partition : partitions) {
            int bucketNum = defaultBucketNum;
            Iterator<ManifestEntry> it = table.newSnapshotReader().withPartitionFilter(Collections.singletonList(partition)).onlyReadRealBuckets().readFileIterator();
            if (it.hasNext()) {
                bucketNum = it.next().totalBuckets();
            }
            bucketOptions = new HashMap<String, String>(table.options());
            bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
            FileStoreTable realTable = table.copy(table.schema().copy(bucketOptions));
            LinkedHashMap<String, String> partitionSpec = partitionComputer.generatePartValues(partition);
            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair = PostponeBucketCompactSplitSource.buildSource(env, realTable, partitionSpec, options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
            DataStream<InternalRow> partitioned = FlinkStreamPartitioner.partition(FlinkSinkBuilder.mapToInternalRow(sourcePair.getLeft(), realTable.rowType()), new RowDataChannelComputer(realTable.schema(), false), null);
            FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
            SingleOutputStreamOperator written = sink.doWrite(partitioned, commitUser, partitioned.getParallelism()).forward().transform("Rewrite compact committable", (TypeInformation)new CommittableTypeInfo(), (OneInputStreamOperator)new RewritePostponeBucketCommittableOperator(realTable));
            dataStreams.add(written);
            dataStreams.add(sourcePair.getRight());
        }
        FixedBucketSink sink = new FixedBucketSink(fileStoreTable, null, null);
        DataStream dataStream = (DataStream)dataStreams.get(0);
        for (int i = 1; i < dataStreams.size(); ++i) {
            dataStream = dataStream.union(new DataStream[]{(DataStream)dataStreams.get(i)});
        }
        sink.doCommit((DataStream<Committable>)dataStream, commitUser);
        return true;
    }

    @Override
    public void run() throws Exception {
        if (this.buildImpl()) {
            this.execute("Compact job : " + this.table.fullName());
        }
    }
}

