/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action that assembles and runs a compaction job for a single table.
 *
 * <p>The set of data to compact can optionally be narrowed either with an explicit list of
 * partition specs ({@link #withPartitions(List)}) or with a SQL-like filter
 * ({@link #withWhereSql(String)}); the two are mutually exclusive and the resulting predicate may
 * only reference partition keys (both rules are enforced in {@link #getPredicate()}).
 *
 * <p>Only tables that are instances of {@link FileStoreTable} are accepted.
 */
public class CompactAction extends TableActionBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(CompactAction.class);

    /** Partition specs to compact; {@code null} when no explicit partition list was given. */
    private List<Map<String, String>> partitions;

    /** Filter expression to compact by; {@code null} when no filter was given. */
    private String whereSql;

    /** Idle-time value forwarded unchanged to the source/topology builders; may be {@code null}. */
    @Nullable
    private Duration partitionIdleTime = null;

    /**
     * Creates the action with empty catalog configuration and empty table options.
     *
     * @param warehouse warehouse path passed to the base action
     * @param database database name of the table to compact
     * @param tableName name of the table to compact
     */
    public CompactAction(String warehouse, String database, String tableName) {
        this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
    }

    /**
     * Creates the action and applies {@code tableConf} as dynamic options on the loaded table.
     *
     * @param warehouse warehouse path passed to the base action
     * @param database database name of the table to compact
     * @param tableName name of the table to compact
     * @param catalogConfig catalog options passed to the base action
     * @param tableConf dynamic table options; copied, never mutated
     * @throws UnsupportedOperationException if the loaded table is not a {@link FileStoreTable}
     */
    public CompactAction(
            String warehouse,
            String database,
            String tableName,
            Map<String, String> catalogConfig,
            Map<String, String> tableConf) {
        super(warehouse, database, tableName, catalogConfig);
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Only FileStoreTable supports compact action. The table type is '%s'.",
                            this.table.getClass().getName()));
        }
        // Work on a private copy so the caller's map is left untouched, and set write-only to
        // "false" last so it overrides any value supplied through tableConf.
        // NOTE(review): presumably a write-only table would not perform compaction — confirm
        // against the CoreOptions documentation.
        Map<String, String> dynamicOptions = new HashMap<>(tableConf);
        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
        this.table = this.table.copy(dynamicOptions);
    }

    /**
     * Restricts compaction to the given partition specs.
     *
     * @param partitions partition specs, each a map of column name to value; cannot be combined
     *     with {@link #withWhereSql(String)}
     * @return this action, for chaining
     */
    public CompactAction withPartitions(List<Map<String, String>> partitions) {
        this.partitions = partitions;
        return this;
    }

    /**
     * Restricts compaction with a filter expression.
     *
     * @param whereSql filter expression; cannot be combined with {@link #withPartitions(List)}
     * @return this action, for chaining
     */
    public CompactAction withWhereSql(String whereSql) {
        this.whereSql = whereSql;
        return this;
    }

    /**
     * Sets the partition idle time handed to the compaction source/topology builder.
     *
     * @param partitionIdleTime idle time, or {@code null} to leave it unset
     * @return this action, for chaining
     */
    public CompactAction withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
        this.partitionIdleTime = partitionIdleTime;
        return this;
    }

    /**
     * Builds the compaction topology on the action's execution environment.
     *
     * <p>The job is treated as streaming when the environment's runtime mode is
     * {@link RuntimeExecutionMode#STREAMING}. Tables in unaware-bucket mode get a dedicated
     * topology; every other bucket mode uses the source/sink based one.
     *
     * @throws Exception if the predicate is invalid or a builder fails
     */
    @Override
    public void build() throws Exception {
        ReadableConfig conf = env.getConfiguration();
        boolean isStreaming =
                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        // Safe cast: the constructor rejects anything that is not a FileStoreTable.
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        switch (fileStoreTable.bucketMode()) {
            case BUCKET_UNAWARE: {
                buildForUnawareBucketCompaction(env, fileStoreTable, isStreaming);
                break;
            }
            default: {
                buildForTraditionalCompaction(env, fileStoreTable, isStreaming);
            }
        }
    }

    /**
     * Wires a compactor source to a compactor sink for the given table.
     *
     * @param env environment the source is created on
     * @param table table to compact
     * @param isStreaming whether the source runs in continuous mode
     * @throws Exception if the predicate is invalid or a builder fails
     */
    private void buildForTraditionalCompaction(
            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
            throws Exception {
        if (isStreaming) {
            // Plain HashMap rather than double-brace initialisation: the latter creates an
            // anonymous subclass that keeps a hidden reference to this action.
            // NOTE(review): these overrides presumably keep continuous compaction from
            // blocking — confirm the intent against the CoreOptions documentation.
            Map<String, String> dynamicOptions = new HashMap<>();
            dynamicOptions.put(
                    CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(),
                    String.valueOf(Integer.MAX_VALUE));
            dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
            dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
            table = table.copy(dynamicOptions);
        }

        CompactorSourceBuilder sourceBuilder =
                new CompactorSourceBuilder(identifier.getFullName(), table);
        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);

        sourceBuilder.withPartitionPredicate(getPredicate());
        DataStream<RowData> source =
                sourceBuilder
                        .withEnv(env)
                        .withContinuousMode(isStreaming)
                        .withPartitionIdleTime(partitionIdleTime)
                        .build();
        sinkBuilder.withInput(source).build();
    }

    /**
     * Builds the compaction topology used for tables in unaware-bucket mode.
     *
     * @param env environment the topology is created on
     * @param table table to compact
     * @param isStreaming whether the topology runs in continuous mode
     * @throws Exception if the predicate is invalid or the builder fails
     */
    private void buildForUnawareBucketCompaction(
            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
            throws Exception {
        UnawareBucketCompactionTopoBuilder topoBuilder =
                new UnawareBucketCompactionTopoBuilder(env, identifier.getFullName(), table);
        topoBuilder.withPartitionPredicate(getPredicate());
        topoBuilder.withContinuousMode(isStreaming);
        topoBuilder.withPartitionIdleTime(partitionIdleTime);
        topoBuilder.build();
    }

    /**
     * Derives the partition predicate from either the partition list or the filter expression.
     *
     * @return the predicate, or {@code null} when neither partitions nor a filter were configured
     * @throws IllegalArgumentException presumably (via {@link Preconditions#checkArgument}) when
     *     both partitions and a filter are set, or when the predicate references a
     *     non-partition column
     * @throws Exception if the filter expression cannot be converted
     */
    protected Predicate getPredicate() throws Exception {
        Preconditions.checkArgument(
                partitions == null || whereSql == null,
                "partitions and where cannot be used together.");

        Predicate predicate = null;
        if (partitions != null) {
            // One predicate per partition spec, OR-ed together.
            Predicate[] perPartition =
                    partitions.stream()
                            .map(
                                    p ->
                                            PartitionPredicate.createPartitionPredicate(
                                                    p,
                                                    table.rowType(),
                                                    ((FileStoreTable) table)
                                                            .coreOptions()
                                                            .partitionDefaultName()))
                            .toArray(Predicate[]::new);
            predicate = PredicateBuilder.or(perPartition);
        } else if (whereSql != null) {
            SimpleSqlPredicateConvertor convertor =
                    new SimpleSqlPredicateConvertor(table.rowType());
            predicate = convertor.convertSqlToPredicate(whereSql);
        }

        if (predicate != null) {
            LOGGER.info("the partition predicate of compaction is {}", predicate);
            PartitionPredicateVisitor partitionPredicateVisitor =
                    new PartitionPredicateVisitor(table.partitionKeys());
            // NOTE(review): message text (including the "parition" spelling) is kept verbatim
            // in case callers or tests match on it.
            Preconditions.checkArgument(
                    predicate.visit(partitionPredicateVisitor),
                    "Only parition key can be specialized in compaction action.");
        }
        return predicate;
    }

    /**
     * Builds the topology and submits it under the job name {@code "Compact job"}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        build();
        execute("Compact job");
    }
}

