/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action that assembles (and, via {@link #run()}, executes) a Flink job compacting one table.
 *
 * <p>The table must be a {@link FileStoreTable}. Depending on the table's bucket configuration
 * one of three topologies is built: postpone-bucket compaction, unaware-bucket compaction, or
 * the traditional source/sink compaction.
 *
 * <p>Optional settings are supplied through the fluent {@code withXxx} methods before calling
 * {@link #build()} or {@link #run()}.
 */
public class CompactAction extends TableActionBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(CompactAction.class);

    /** Bucket option value for which {@link #buildImpl()} selects postpone-bucket compaction. */
    private static final int POSTPONE_BUCKET = -2;

    /** Partition specs to compact; mutually exclusive with {@link #whereSql}. */
    @Nullable
    private List<Map<String, String>> partitions;

    /** SQL-style partition filter; mutually exclusive with {@link #partitions}. */
    @Nullable
    private String whereSql;

    @Nullable
    private Duration partitionIdleTime = null;

    /** Explicit full-compaction choice; {@code null} means "decide from the runtime mode". */
    @Nullable
    private Boolean fullCompaction;

    /**
     * Creates the action for the given table.
     *
     * @param database database of the table to compact
     * @param tableName name of the table to compact
     * @param catalogConfig catalog configuration handed to the base action
     * @param tableConf dynamic table options applied on top of the table; the write-only option
     *     is always overridden to {@code "false"}
     * @throws UnsupportedOperationException if the resolved table is not a {@link FileStoreTable}
     */
    public CompactAction(String database, String tableName, Map<String, String> catalogConfig, Map<String, String> tableConf) {
        super(database, tableName, catalogConfig);
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", this.table.getClass().getName()));
        }
        Map<String, String> dynamicOptions = new HashMap<>(tableConf);
        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
        this.table = this.table.copy(dynamicOptions);
    }

    /**
     * Restricts compaction to the given partition specs.
     *
     * @param partitions partition specs (partition key to value); must not be combined with
     *     {@link #withWhereSql(String)}
     * @return this action
     */
    public CompactAction withPartitions(List<Map<String, String>> partitions) {
        this.partitions = partitions;
        return this;
    }

    /**
     * Restricts compaction with a SQL-style predicate over partition keys.
     *
     * @param whereSql predicate text; must not be combined with {@link #withPartitions(List)}
     * @return this action
     */
    public CompactAction withWhereSql(String whereSql) {
        this.whereSql = whereSql;
        return this;
    }

    /**
     * Sets the partition idle time forwarded to the source / topology builders.
     *
     * @param partitionIdleTime idle time, or {@code null} to leave it unset
     * @return this action
     */
    public CompactAction withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
        this.partitionIdleTime = partitionIdleTime;
        return this;
    }

    /**
     * Explicitly enables or disables full compaction for the traditional topology.
     *
     * @param fullCompaction {@code true}/{@code false} to force the choice, or {@code null} to
     *     default to full compaction in batch mode and non-full compaction in streaming mode
     * @return this action
     */
    public CompactAction withFullCompaction(Boolean fullCompaction) {
        this.fullCompaction = fullCompaction;
        return this;
    }

    /** Builds the compaction topology on the action's environment without executing it. */
    @Override
    public void build() throws Exception {
        this.buildImpl();
    }

    /**
     * Chooses and builds the topology matching the table's bucket configuration.
     *
     * @return {@code false} when nothing was added to the environment (postpone-bucket table
     *     without any partition to compact), {@code true} otherwise
     */
    private boolean buildImpl() throws Exception {
        ReadableConfig conf = this.env.getConfiguration();
        boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;

        if (fileStoreTable.coreOptions().bucket() == POSTPONE_BUCKET) {
            return this.buildForPostponeBucketCompaction(this.env, fileStoreTable, isStreaming);
        }
        if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
            this.buildForUnawareBucketCompaction(this.env, fileStoreTable, isStreaming);
        } else {
            this.buildForTraditionalCompaction(this.env, fileStoreTable, isStreaming);
        }
        return true;
    }

    /**
     * Builds the compactor-source to compactor-sink topology.
     *
     * @throws IllegalArgumentException presumably (via {@code Preconditions.checkArgument}) when
     *     full compaction was explicitly requested in streaming mode
     */
    private void buildForTraditionalCompaction(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception {
        // Resolve the effective strategy into a local so the user-supplied setting is not
        // overwritten; otherwise a later build in a different runtime mode would see a stale
        // "explicit" value.
        boolean full;
        if (this.fullCompaction == null) {
            full = !isStreaming;
        } else {
            full = this.fullCompaction;
            Preconditions.checkArgument(!(full && isStreaming), "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
        }

        FileStoreTable compactTable = table;
        if (isStreaming) {
            // Streaming-only overrides applied to the table copy used by this job.
            Map<String, String> streamingOptions = new HashMap<>();
            streamingOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), String.valueOf(Integer.MAX_VALUE));
            streamingOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
            streamingOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
            compactTable = table.copy(streamingOptions);
        }

        CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(this.identifier.getFullName(), compactTable);
        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(compactTable, full);
        sourceBuilder.withPartitionPredicate(this.getPredicate());
        DataStreamSource<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(isStreaming).withPartitionIdleTime(this.partitionIdleTime).build();
        sinkBuilder.withInput(source).build();
    }

    /** Builds the topology for bucket-unaware tables through its dedicated builder. */
    private void buildForUnawareBucketCompaction(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception {
        UnawareBucketCompactionTopoBuilder topoBuilder = new UnawareBucketCompactionTopoBuilder(env, this.identifier.getFullName(), table);
        topoBuilder.withPartitionPredicate(this.getPredicate());
        topoBuilder.withContinuousMode(isStreaming);
        topoBuilder.withPartitionIdleTime(this.partitionIdleTime);
        topoBuilder.build();
    }

    /**
     * Converts the configured partition specs or where-clause into a predicate.
     *
     * @return the predicate, or {@code null} when neither partitions nor a where-clause is set
     * @throws IllegalArgumentException presumably (via {@code Preconditions.checkArgument}) when
     *     both partitions and a where-clause are set, or when the predicate is rejected by the
     *     partition-key visitor
     */
    protected Predicate getPredicate() throws Exception {
        Preconditions.checkArgument(this.partitions == null || this.whereSql == null, "partitions and where cannot be used together.");

        Predicate predicate = null;
        if (this.partitions != null) {
            String defaultPartitionName = ((FileStoreTable) this.table).coreOptions().partitionDefaultName();
            Predicate[] perPartition = this.partitions.stream()
                    .map(spec -> PartitionPredicate.createPartitionPredicate(spec, this.table.rowType(), defaultPartitionName))
                    .toArray(Predicate[]::new);
            predicate = PredicateBuilder.or(perPartition);
        } else if (this.whereSql != null) {
            SimpleSqlPredicateConvertor convertor = new SimpleSqlPredicateConvertor(this.table.rowType());
            predicate = convertor.convertSqlToPredicate(this.whereSql);
        }

        if (predicate != null) {
            LOGGER.info("the partition predicate of compaction is {}", predicate);
            PartitionPredicateVisitor partitionKeyCheck = new PartitionPredicateVisitor(this.table.partitionKeys());
            Preconditions.checkArgument(predicate.visit(partitionKeyCheck), "Only partition key can be specialized in compaction action.");
        }
        return predicate;
    }

    /**
     * Builds the topology that rewrites postpone-bucket data, one sub-pipeline per partition,
     * and commits all resulting committables through a single sink.
     *
     * <p>Only batch mode without partition specs or predicates is accepted.
     *
     * @return {@code false} if the snapshot reader reports no partition for the postpone bucket
     *     (nothing is added to {@code env}), {@code true} otherwise
     */
    private boolean buildForPostponeBucketCompaction(StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
        Preconditions.checkArgument(!isStreaming, "Postpone bucket compaction currently only supports batch mode");
        Preconditions.checkArgument(this.partitions == null, "Postpone bucket compaction currently does not support specifying partitions");
        Preconditions.checkArgument(this.whereSql == null, "Postpone bucket compaction currently does not support predicates");

        Options options = new Options(table.options());
        int defaultBucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);

        // Table copy whose bucket option is the default bucket number; used for discovering
        // partitions and for the final commit.
        FileStoreTable fileStoreTable = copyWithBucket(table, defaultBucketNum);
        List<BinaryRow> postponePartitions = fileStoreTable.newSnapshotReader().withBucket(POSTPONE_BUCKET).partitions();
        if (postponePartitions.isEmpty()) {
            return false;
        }

        InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer(fileStoreTable.coreOptions().partitionDefaultName(), fileStoreTable.store().partitionType(), fileStoreTable.partitionKeys().toArray(new String[0]), fileStoreTable.coreOptions().legacyPartitionName());
        String commitUser = CoreOptions.createCommitUser(options);

        List<DataStream<Committable>> dataStreams = new ArrayList<>();
        for (BinaryRow partition : postponePartitions) {
            int bucketNum = resolveBucketNum(table, partition, defaultBucketNum);
            FileStoreTable realTable = copyWithBucket(table, bucketNum);

            Map<String, String> partitionSpec = partitionComputer.generatePartValues(partition);
            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair = PostponeBucketCompactSplitSource.buildSource(env, realTable.fullName() + partitionSpec, realTable.rowType(), realTable.newReadBuilder().withPartitionFilter(partitionSpec).withBucket(POSTPONE_BUCKET), options.get(FlinkConnectorOptions.SCAN_PARALLELISM));

            DataStream<InternalRow> partitioned = FlinkStreamPartitioner.partition(FlinkSinkBuilder.mapToInternalRow(sourcePair.getLeft(), realTable.rowType()), new RowDataChannelComputer(realTable.schema(), false), null);
            FixedBucketSink writeSink = new FixedBucketSink(realTable, null, null);
            DataStream<Committable> written = writeSink.doWrite(partitioned, commitUser, partitioned.getParallelism())
                    .forward()
                    .transform("Rewrite compact committable", new CommittableTypeInfo(), new RewritePostponeBucketCommittableOperator(realTable));

            dataStreams.add(written);
            dataStreams.add(sourcePair.getRight());
        }

        // postponePartitions is non-empty here, so at least two streams were collected.
        DataStream<Committable> unioned = dataStreams.get(0);
        for (int i = 1; i < dataStreams.size(); ++i) {
            unioned = unioned.union(dataStreams.get(i));
        }
        FixedBucketSink commitSink = new FixedBucketSink(fileStoreTable, null, null);
        commitSink.doCommit(unioned, commitUser);
        return true;
    }

    /** Returns a copy of {@code table} whose schema options carry the given bucket number. */
    private static FileStoreTable copyWithBucket(FileStoreTable table, int bucketNum) {
        Map<String, String> bucketOptions = new HashMap<>(table.options());
        bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
        return table.copy(table.schema().copy(bucketOptions));
    }

    /**
     * Picks the bucket number to rewrite {@code partition} with: the first positive
     * {@code totalBuckets()} found among the partition's manifest entries, or
     * {@code defaultBucketNum} when there is none.
     *
     * <p>Non-positive values are skipped because the result is used as the bucket count of a
     * fixed-bucket sink. NOTE(review): entries of files still sitting in the postpone bucket
     * presumably report such a value - confirm against the writer.
     */
    private static int resolveBucketNum(FileStoreTable table, BinaryRow partition, int defaultBucketNum) {
        Iterator<ManifestEntry> entries = table.newSnapshotReader().withPartitionFilter(Collections.singletonList(partition)).readFileIterator();
        while (entries.hasNext()) {
            int totalBuckets = entries.next().totalBuckets();
            if (totalBuckets > 0) {
                return totalBuckets;
            }
        }
        return defaultBucketNum;
    }

    /** Builds the topology and, if anything was built, executes it as a Flink job. */
    @Override
    public void run() throws Exception {
        if (this.buildImpl()) {
            this.execute("Compact job : " + this.table.fullName());
        }
    }
}

