/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.table.FileStoreTable;

/**
 * Table action that assembles and runs a compaction job for a single table.
 *
 * <p>The action only accepts tables that are {@link FileStoreTable} instances. The topology it
 * builds depends on the table's bucket mode: the {@code UNAWARE} mode goes through
 * {@link UnawareBucketCompactionTopoBuilder}, every other mode goes through a
 * {@link CompactorSourceBuilder} / {@link CompactorSinkBuilder} pair.
 */
public class CompactAction extends TableActionBase {

    /**
     * Partition specs handed to the compaction builders as-is. Remains {@code null} until
     * {@link #withPartitions(List)} is called.
     */
    private List<Map<String, String>> partitions;

    /**
     * Creates the action with empty catalog configuration and empty table options.
     *
     * @param warehouse warehouse location forwarded to the base action
     * @param database  database name forwarded to the base action
     * @param tableName table name forwarded to the base action
     */
    public CompactAction(String warehouse, String database, String tableName) {
        this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
    }

    /**
     * Creates the action and replaces the loaded table with a copy carrying the given dynamic
     * options.
     *
     * @param warehouse     warehouse location forwarded to the base action
     * @param database      database name forwarded to the base action
     * @param tableName     table name forwarded to the base action
     * @param catalogConfig catalog configuration forwarded to the base action
     * @param tableConf     dynamic table options applied on top of the loaded table
     * @throws UnsupportedOperationException if the loaded table is not a {@link FileStoreTable}
     */
    public CompactAction(String warehouse, String database, String tableName, Map<String, String> catalogConfig, Map<String, String> tableConf) {
        super(warehouse, database, tableName, catalogConfig);
        if (!(table instanceof FileStoreTable)) {
            String tableType = table.getClass().getName();
            throw new UnsupportedOperationException(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", tableType));
        }
        Map<String, String> overrides = new HashMap<>(tableConf);
        // Written after copying tableConf, so this value wins over any caller-supplied setting.
        overrides.put(CoreOptions.WRITE_ONLY.key(), "false");
        table = table.copy(overrides);
    }

    /**
     * Sets the partition specs to pass to the compaction builders.
     *
     * @param partitions partition specs, stored without copying
     * @return this action, for chaining
     */
    public CompactAction withPartitions(List<Map<String, String>> partitions) {
        this.partitions = partitions;
        return this;
    }

    /**
     * Builds the compaction topology on the action's execution environment.
     *
     * <p>Continuous mode is enabled exactly when the environment's configured runtime mode is
     * {@link RuntimeExecutionMode#STREAMING}.
     */
    @Override
    public void build() {
        ReadableConfig flinkConf = env.getConfiguration();
        RuntimeExecutionMode runtimeMode = flinkConf.get(ExecutionOptions.RUNTIME_MODE);
        boolean continuous = RuntimeExecutionMode.STREAMING == runtimeMode;
        FileStoreTable storeTable = (FileStoreTable) table;
        switch (storeTable.bucketMode()) {
            case UNAWARE:
                buildForUnawareBucketCompaction(env, storeTable, continuous);
                break;
            default:
                buildForTraditionalCompaction(env, storeTable, continuous);
                break;
        }
    }

    /** Connects a compactor source to a compactor sink for tables that are not in UNAWARE bucket mode. */
    private void buildForTraditionalCompaction(StreamExecutionEnvironment executionEnv, FileStoreTable storeTable, boolean continuous) {
        CompactorSourceBuilder compactorSource = new CompactorSourceBuilder(identifier.getFullName(), storeTable);
        CompactorSinkBuilder compactorSink = new CompactorSinkBuilder(storeTable);
        compactorSource.withPartitions(partitions);
        DataStream<RowData> compactionInput = compactorSource
                .withEnv(executionEnv)
                .withContinuousMode(continuous)
                .build();
        compactorSink.withInput(compactionInput).build();
    }

    /** Delegates topology construction to {@link UnawareBucketCompactionTopoBuilder} for UNAWARE bucket mode tables. */
    private void buildForUnawareBucketCompaction(StreamExecutionEnvironment executionEnv, FileStoreTable storeTable, boolean continuous) {
        UnawareBucketCompactionTopoBuilder topoBuilder =
                new UnawareBucketCompactionTopoBuilder(executionEnv, identifier.getFullName(), storeTable);
        topoBuilder.withPartitions(partitions);
        topoBuilder.withContinuousMode(continuous);
        topoBuilder.build();
    }

    /**
     * Builds the topology and then calls {@code execute} with the job name {@code "Compact job"}.
     *
     * @throws Exception propagated from building or executing the job
     */
    @Override
    public void run() throws Exception {
        build();
        execute("Compact job");
    }

    /**
     * Returns the partition specs previously set on this action.
     *
     * @return the stored list itself (not a copy), or {@code null} if none was set
     */
    public List<Map<String, String>> getPartitions() {
        return partitions;
    }
}

