/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;

/**
 * Action that reads a table (optionally restricted to one partition spec) with a bounded
 * batch job and overwrites the same partition spec through a copy of the table whose
 * {@code bucket} option may have been replaced by a new bucket number.
 *
 * <p>All settings are optional and are supplied through the fluent {@code with...} methods
 * before {@link #build()} or {@link #run()} is called.
 */
public class RescaleAction extends TableActionBase {

    /**
     * Bucket value that {@link #build()} treats as marking a postpone bucket table (see the
     * precondition message there). NOTE(review): presumably mirrors the constant defined in
     * Paimon core - confirm.
     */
    private static final int POSTPONE_BUCKET = -2;

    /** Target bucket number; {@code null} keeps the table's current {@code bucket} option. */
    @Nullable
    private Integer bucketNum;

    /** Partition spec used both to filter the source and as the overwrite target. */
    private Map<String, String> partition = new HashMap<>();

    /** Source parallelism; {@code null} falls back to {@link #currentBucketNum()}. */
    @Nullable
    private Integer scanParallelism;

    /** Sink parallelism; {@code null} falls back to {@link #bucketNum}. */
    @Nullable
    private Integer sinkParallelism;

    /**
     * Creates the action for the given table.
     *
     * @param databaseName database of the table, forwarded to {@link TableActionBase}
     * @param tableName name of the table, forwarded to {@link TableActionBase}
     * @param catalogConfig catalog options, forwarded to {@link TableActionBase}
     */
    public RescaleAction(String databaseName, String tableName, Map<String, String> catalogConfig) {
        super(databaseName, tableName, catalogConfig);
    }

    /**
     * Sets the bucket number written into the {@code bucket} option of the sink table.
     *
     * @param bucketNum resulting bucket number
     * @return this action
     */
    public RescaleAction withBucketNum(int bucketNum) {
        this.bucketNum = bucketNum;
        return this;
    }

    /**
     * Sets the partition spec to read and overwrite.
     *
     * @param partition partition spec; must not be {@code null}
     * @return this action
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    public RescaleAction withPartition(Map<String, String> partition) {
        // Fail at the call site instead of deep inside build(), where the map is dereferenced.
        this.partition = Objects.requireNonNull(partition, "partition");
        return this;
    }

    /**
     * Sets the parallelism of the source.
     *
     * @param scanParallelism source parallelism
     * @return this action
     */
    public RescaleAction withScanParallelism(int scanParallelism) {
        this.scanParallelism = scanParallelism;
        return this;
    }

    /**
     * Sets the parallelism of the sink.
     *
     * @param sinkParallelism sink parallelism
     * @return this action
     */
    public RescaleAction withSinkParallelism(int sinkParallelism) {
        this.sinkParallelism = sinkParallelism;
        return this;
    }

    /**
     * Assembles the job on {@code env}: switches the environment to batch runtime mode,
     * builds the bounded source, and attaches an overwriting sink on the re-bucketed table copy.
     *
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, when
     *     no data file is found while deriving the scan parallelism, or when no bucket number
     *     was given for a postpone bucket table - confirm the exception type against Preconditions
     * @throws Exception declared by the overridden method
     */
    @Override
    public void build() throws Exception {
        Configuration flinkConf = new Configuration();
        flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        this.env.configure(flinkConf);

        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        // The source is built before the bucket options are validated, as in the original flow.
        DataStream<RowData> source = buildSource(fileStoreTable);
        FileStoreTable rescaledTable = copyWithTargetBucket(fileStoreTable);

        // Stays boxed on purpose: it is null when neither sinkParallelism nor bucketNum was
        // set, and that null is handed to the sink builder unchanged.
        Integer sinkPar = this.sinkParallelism != null ? this.sinkParallelism : this.bucketNum;
        new FlinkSinkBuilder(rescaledTable)
                .overwrite(this.partition)
                .parallelism(sinkPar)
                .forRowData(source)
                .build();
    }

    /**
     * Builds the job and executes it.
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        this.build();
        this.env.execute("Rescale Postpone Bucket : " + this.table.fullName());
    }

    /** Builds the bounded source over {@code fileStoreTable}, filtered by the partition spec. */
    private DataStream<RowData> buildSource(FileStoreTable fileStoreTable) {
        RowType partitionType = fileStoreTable.schema().logicalPartitionType();
        Predicate partitionPredicate =
                PartitionPredicate.createPartitionPredicate(
                        partitionType,
                        InternalRowPartitionComputer.convertSpecToInternal(
                                this.partition,
                                partitionType,
                                fileStoreTable.coreOptions().partitionDefaultName()));
        int scanPar = this.scanParallelism != null ? this.scanParallelism : this.currentBucketNum();
        return new FlinkSourceBuilder(fileStoreTable)
                .env(this.env)
                .sourceBounded(true)
                .sourceParallelism(scanPar)
                .predicate(partitionPredicate)
                .build();
    }

    /**
     * Returns a copy of {@code fileStoreTable} whose {@code bucket} option is the requested
     * bucket number, or an otherwise identical copy when none was requested.
     */
    private FileStoreTable copyWithTargetBucket(FileStoreTable fileStoreTable) {
        Map<String, String> bucketOptions = new HashMap<>(fileStoreTable.options());
        if (this.bucketNum == null) {
            Preconditions.checkArgument(
                    fileStoreTable.coreOptions().bucket() != POSTPONE_BUCKET,
                    "When rescaling postpone bucket tables, you must provide the resulting bucket number.");
        } else {
            bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(this.bucketNum));
        }
        return fileStoreTable.copy(fileStoreTable.schema().copy(bucketOptions));
    }

    /**
     * Returns the total bucket count recorded on the first manifest entry found for the
     * partition spec; the precondition rejects the case where no entry exists.
     */
    private int currentBucketNum() {
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        Iterator<ManifestEntry> it =
                fileStoreTable
                        .newSnapshotReader()
                        .withPartitionFilter(this.partition)
                        .readFileIterator();
        Preconditions.checkArgument(
                it.hasNext(),
                "The specified partition does not have any data files. No need to rescale.");
        return it.next().totalBuckets();
    }
}

