/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SortCompactAction
extends CompactAction {
    private static final Logger LOG = LoggerFactory.getLogger(SortCompactAction.class);
    private String sortStrategy;
    private List<String> orderColumns;

    public SortCompactAction(String database, String tableName, Map<String, String> catalogConfig, Map<String, String> tableConf) {
        super(database, tableName, catalogConfig, tableConf);
        this.table = this.table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
    }

    @Override
    public void run() throws Exception {
        this.build();
        this.execute("Sort Compact Job");
    }

    @Override
    public void build() throws Exception {
        FileStoreTable fileStoreTable;
        if (this.env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) != RuntimeExecutionMode.BATCH) {
            LOG.warn("Sort Compact only support batch mode yet. Please add -Dexecution.runtime-mode=BATCH. The action this time will shift to batch mode forcely.");
            this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
        if ((fileStoreTable = (FileStoreTable)this.table).bucketMode() != BucketMode.BUCKET_UNAWARE && fileStoreTable.bucketMode() != BucketMode.HASH_DYNAMIC) {
            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
        }
        Map<String, String> tableConfig = fileStoreTable.options();
        FlinkSourceBuilder sourceBuilder = new FlinkSourceBuilder(fileStoreTable).sourceName(ObjectIdentifier.of((String)this.catalogName, (String)this.identifier.getDatabaseName(), (String)this.identifier.getObjectName()).asSummaryString());
        sourceBuilder.partitionPredicate(this.getPartitionPredicate());
        String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
        if (scanParallelism != null) {
            sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism));
        }
        DataStream<RowData> source = sourceBuilder.env(this.env).sourceBounded(true).build();
        int localSampleMagnification = ((FileStoreTable)this.table).coreOptions().getLocalSampleMagnification();
        if (localSampleMagnification < 20) {
            throw new IllegalArgumentException(String.format("the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.", CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), localSampleMagnification));
        }
        String sinkParallelismValue = this.table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        int sinkParallelism = sinkParallelismValue == null ? source.getParallelism() : Integer.parseInt(sinkParallelismValue);
        TableSortInfo sortInfo = new TableSortInfo.Builder().setSortColumns(this.orderColumns).setSortStrategy(CoreOptions.OrderType.of(this.sortStrategy)).setSinkParallelism(sinkParallelism).setLocalSampleSize(sinkParallelism * localSampleMagnification).setGlobalSampleSize(sinkParallelism * 1000).setRangeNumber(sinkParallelism * 10).build();
        TableSorter sorter = TableSorter.getSorter(this.env, source, fileStoreTable, sortInfo);
        new SortCompactSinkBuilder(fileStoreTable).forCompact(true).forRowData(sorter.sort()).overwrite().build();
    }

    public SortCompactAction withOrderStrategy(String sortStrategy) {
        this.sortStrategy = sortStrategy;
        return this;
    }

    public SortCompactAction withOrderColumns(String ... orderColumns) {
        return this.withOrderColumns(Arrays.asList(orderColumns));
    }

    public SortCompactAction withOrderColumns(List<String> orderColumns) {
        this.orderColumns = orderColumns.stream().map(String::trim).collect(Collectors.toList());
        return this;
    }
}

