/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compact action that reads the table in batch mode, sorts the rows with a {@link TableSorter}
 * built from the configured strategy and order columns, and writes the sorted rows back through
 * a compacting sink.
 *
 * <p>Configure the action with {@link #withOrderStrategy(String)} and
 * {@link #withOrderColumns(List)} before calling {@link #build()} or {@link #run()}.
 */
public class SortCompactAction
extends CompactAction {
    private static final Logger LOG = LoggerFactory.getLogger(SortCompactAction.class);

    /** Name of the sort strategy handed to {@link TableSorter#getSorter}. */
    private String sortStrategy;

    /** Trimmed names of the columns to sort by; {@code null} until configured. */
    private List<String> orderColumns;

    /**
     * Creates the action and replaces the inherited table with a copy whose
     * {@code write-only} option is set to {@code "true"}.
     *
     * @param warehouse warehouse path, forwarded to {@link CompactAction}
     * @param database database name, forwarded to {@link CompactAction}
     * @param tableName table name, forwarded to {@link CompactAction}
     * @param catalogConfig catalog options, forwarded to {@link CompactAction}
     * @param tableConf table options, forwarded to {@link CompactAction}
     */
    public SortCompactAction(String warehouse, String database, String tableName, Map<String, String> catalogConfig, Map<String, String> tableConf) {
        super(warehouse, database, tableName, catalogConfig, tableConf);
        this.table = this.table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
    }

    /**
     * Builds the job topology and executes it under the name {@code "Sort Compact Job"}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        this.build();
        this.execute("Sort Compact Job");
    }

    /**
     * Assembles the source, sort and sink stages on the execution environment.
     *
     * <p>The environment is switched to batch runtime mode when it is not already in it.
     *
     * @throws IllegalArgumentException if no order columns were configured, or if the table's
     *     bucket mode is neither {@link BucketMode#UNAWARE} nor {@link BucketMode#DYNAMIC}
     * @throws NumberFormatException if the table's scan parallelism option is not an integer
     */
    @Override
    public void build() {
        // Fail fast, before the environment is touched: without order columns the sorter has
        // nothing to sort by, and a null list would otherwise only surface further downstream.
        if (this.orderColumns == null || this.orderColumns.isEmpty()) {
            throw new IllegalArgumentException("Sort Compact requires at least one order column. Please call withOrderColumns before build.");
        }

        if (this.env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) != RuntimeExecutionMode.BATCH) {
            LOG.warn("Sort Compact only support batch mode yet. Please add -Dexecution.runtime-mode=BATCH. The action this time will shift to batch mode forcely.");
            this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }

        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        BucketMode bucketMode = fileStoreTable.bucketMode();
        if (bucketMode != BucketMode.UNAWARE && bucketMode != BucketMode.DYNAMIC) {
            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
        }

        Map<String, String> tableConfig = fileStoreTable.options();
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of(this.catalogName, this.identifier.getDatabaseName(), this.identifier.getObjectName());
        FlinkSourceBuilder sourceBuilder = new FlinkSourceBuilder(tableIdentifier, fileStoreTable);

        // Restrict the scan to the requested partitions: one predicate per partition spec,
        // OR-ed together.
        // NOTE(review): an empty (non-null) partition list reaches PredicateBuilder.or with zero
        // predicates - confirm how that case is expected to behave.
        if (this.getPartitions() != null) {
            Predicate partitionPredicate = PredicateBuilder.or(this.getPartitions().stream().map(p -> PredicateBuilder.partition(p, this.table.rowType())).toArray(Predicate[]::new));
            sourceBuilder.withPredicate(partitionPredicate);
        }

        // Only override the source parallelism when the table option is explicitly set.
        String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
        if (scanParallelism != null) {
            sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
        }

        DataStream<RowData> source = sourceBuilder.withEnv(this.env).withContinuousMode(false).build();
        TableSorter sorter = TableSorter.getSorter(this.env, source, fileStoreTable, this.sortStrategy, this.orderColumns);
        new FlinkSinkBuilder(fileStoreTable).withInput(sorter.sort()).forCompact(true).withOverwritePartition(new HashMap<String, String>()).build();
    }

    /**
     * Sets the sort strategy passed to {@link TableSorter#getSorter}.
     *
     * @param sortStrategy strategy name
     * @return this action, for chaining
     */
    public SortCompactAction withOrderStrategy(String sortStrategy) {
        this.sortStrategy = sortStrategy;
        return this;
    }

    /**
     * Varargs convenience overload of {@link #withOrderColumns(List)}.
     *
     * @param orderColumns column names to sort by
     * @return this action, for chaining
     */
    public SortCompactAction withOrderColumns(String ... orderColumns) {
        return this.withOrderColumns(Arrays.asList(orderColumns));
    }

    /**
     * Sets the columns to sort by. Each name is trimmed and the result is stored in a new list,
     * so later changes to the argument do not affect this action.
     *
     * @param orderColumns column names to sort by
     * @return this action, for chaining
     */
    public SortCompactAction withOrderColumns(List<String> orderColumns) {
        this.orderColumns = orderColumns.stream().map(String::trim).collect(Collectors.toList());
        return this;
    }
}

