/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/**
 * Compact action that reads a table, sorts its rows with a {@link TableSorter} and writes the
 * sorted result back through a {@link FlinkSinkBuilder} in overwrite mode.
 *
 * <p>The action only accepts {@link AppendOnlyFileStoreTable} instances whose bucket mode is
 * {@link BucketMode#UNAWARE}, and it only runs when the Flink runtime mode is
 * {@link RuntimeExecutionMode#BATCH}.
 */
public class SortCompactAction
extends CompactAction {
    /** Sort strategy name handed to {@link TableSorter#getSorter}; set via {@link #withOrderStrategy}. */
    private String sortStrategy;
    /** Columns handed to {@link TableSorter#getSorter}; set via {@link #withOrderColumns}. */
    private List<String> orderColumns;

    /**
     * Creates the action and replaces the loaded table with a copy that has
     * {@code CoreOptions.WRITE_ONLY} set to {@code "true"}.
     *
     * @param warehouse warehouse path, forwarded to {@link CompactAction}
     * @param database database name, forwarded to {@link CompactAction}
     * @param tableName table name, forwarded to {@link CompactAction}
     * @param catalogConfig catalog options, forwarded to {@link CompactAction}
     * @throws IllegalArgumentException if the loaded table is not an {@link AppendOnlyFileStoreTable}
     */
    public SortCompactAction(String warehouse, String database, String tableName, Map<String, String> catalogConfig) {
        super(warehouse, database, tableName, catalogConfig);
        Preconditions.checkArgument(this.table instanceof AppendOnlyFileStoreTable, "Only sort compaction works with append-only table for now.");
        this.table = this.table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
    }

    /**
     * Obtains an execution environment, forces batch runtime mode, builds the topology and
     * executes it under the job name {@code "Sort Compact Job"}.
     *
     * @throws Exception whatever {@code execute} propagates
     */
    @Override
    public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        this.build(env);
        this.execute(env, "Sort Compact Job");
    }

    /**
     * Builds the source -> sort -> sink topology on the given environment.
     *
     * @param env environment the topology is attached to; its runtime mode must be BATCH
     * @throws IllegalArgumentException if the runtime mode is not BATCH, the table is not
     *     append-only, or the table's bucket mode is not {@link BucketMode#UNAWARE}
     * @throws NumberFormatException if a configured parallelism option is not a valid integer
     */
    @Override
    public void build(StreamExecutionEnvironment env) {
        if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) != RuntimeExecutionMode.BATCH) {
            throw new IllegalArgumentException("Only support batch mode yet, please set -Dexecution.runtime-mode=BATCH");
        }
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
            throw new IllegalArgumentException("Sort Compact only supports append-only table yet");
        }
        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
        }
        Map<String, String> tableConfig = fileStoreTable.options();

        DataStream<RowData> source = this.buildSource(env, fileStoreTable, tableConfig);
        DataStream<RowData> sorted = TableSorter.getSorter(env, source, fileStoreTable, this.sortStrategy, this.orderColumns).sort();
        this.buildSink(sorted, fileStoreTable, tableConfig);
    }

    /** Creates the bounded source stream, applying the partition filter and scan parallelism if set. */
    private DataStream<RowData> buildSource(StreamExecutionEnvironment env, FileStoreTable fileStoreTable, Map<String, String> tableConfig) {
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of(this.catalogName, this.identifier.getDatabaseName(), this.identifier.getObjectName());
        FlinkSourceBuilder sourceBuilder = new FlinkSourceBuilder(tableIdentifier, fileStoreTable);

        if (this.getPartitions() != null) {
            // A row is read when it matches any one of the requested partition specs.
            Predicate partitionPredicate = PredicateBuilder.or(this.getPartitions().stream().map(p -> PredicateBuilder.partition(p, this.table.rowType())).toArray(Predicate[]::new));
            sourceBuilder.withPredicate(partitionPredicate);
        }

        // The source must follow the scan parallelism option; the sink option is applied
        // separately when the sink is built.
        String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
        if (scanParallelism != null) {
            sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
        }
        return sourceBuilder.withEnv(env).withContinuousMode(false).build();
    }

    /** Attaches the overwrite sink to the sorted stream, applying the sink parallelism if set. */
    private void buildSink(DataStream<RowData> sorted, FileStoreTable fileStoreTable, Map<String, String> tableConfig) {
        FlinkSinkBuilder flinkSinkBuilder = new FlinkSinkBuilder(fileStoreTable);
        // NOTE(review): the empty map presumably means "no static partition", i.e. overwrite
        // whichever partitions the sorted data touches - confirm against FlinkSinkBuilder.
        flinkSinkBuilder.withInput(sorted).withOverwritePartition(new HashMap<String, String>());
        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            flinkSinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        flinkSinkBuilder.build();
    }

    /**
     * Sets the sort strategy passed to the {@link TableSorter}.
     *
     * @param sortStrategy strategy name; stored as-is without validation here
     */
    public void withOrderStrategy(String sortStrategy) {
        this.sortStrategy = sortStrategy;
    }

    /**
     * Sets the columns passed to the {@link TableSorter}.
     *
     * @param orderColumns column names; the list is stored by reference, not copied
     */
    public void withOrderColumns(List<String> orderColumns) {
        this.orderColumns = orderColumns;
    }
}

