/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.clone.CloneFilesFunction;
import org.apache.paimon.flink.clone.CloneUtils;
import org.apache.paimon.flink.clone.CommitTableOperator;
import org.apache.paimon.flink.clone.DataFileInfo;
import org.apache.paimon.flink.clone.ListCloneFilesFunction;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.hive.HiveCatalog;

/**
 * Action that wires up and launches the Flink job used to clone Hive tables.
 *
 * <p>The source catalog must be a {@link HiveCatalog} (optionally wrapped in a
 * {@link CachingCatalog}); any other catalog type is rejected by the constructor.
 *
 * <p>The job assembled by {@link #build()} consists of the stages "List Files", "Copy Files"
 * and "Commit table", followed by a discarding sink named "end".
 */
public class CloneAction extends ActionBase {

    /** Catalog options for the source side; also handed to the super constructor. */
    private final Map<String, String> sourceCatalogConfig;

    private final String sourceDatabase;

    private final String sourceTableName;

    /** Catalog options for the target side. */
    private final Map<String, String> targetCatalogConfig;

    private final String targetDatabase;

    private final String targetTableName;

    /** Parallelism applied to every stage except the final sink. */
    private final int parallelism;

    /** Optional SQL text forwarded to the file-listing stage (presumably a filter predicate). */
    @Nullable
    private final String whereSql;

    /**
     * Creates the action and verifies that the source catalog is Hive-backed.
     *
     * @param sourceDatabase source database name
     * @param sourceTableName source table name
     * @param sourceCatalogConfig options of the source catalog
     * @param targetDatabase target database name
     * @param targetTableName target table name
     * @param targetCatalogConfig options of the target catalog
     * @param parallelism stage parallelism; when {@code null} the environment's parallelism is used
     * @param whereSql optional SQL text passed on to the file-listing stage
     * @throws UnsupportedOperationException if the (unwrapped) source catalog is not a
     *     {@link HiveCatalog}
     */
    public CloneAction(String sourceDatabase, String sourceTableName, Map<String, String> sourceCatalogConfig, String targetDatabase, String targetTableName, Map<String, String> targetCatalogConfig, @Nullable Integer parallelism, @Nullable String whereSql) {
        super(sourceCatalogConfig);

        // Look through the caching layer before checking the concrete catalog type.
        Catalog effectiveSource = CloneAction.stripCachingLayer(this.catalog);
        boolean hiveBacked = effectiveSource instanceof HiveCatalog;
        if (!hiveBacked) {
            throw new UnsupportedOperationException("Only support clone hive tables using HiveCatalog, but current source catalog is " + effectiveSource.getClass().getName());
        }

        this.sourceCatalogConfig = sourceCatalogConfig;
        this.sourceDatabase = sourceDatabase;
        this.sourceTableName = sourceTableName;
        this.targetCatalogConfig = targetCatalogConfig;
        this.targetDatabase = targetDatabase;
        this.targetTableName = targetTableName;
        this.whereSql = whereSql;

        // Fall back to the environment's parallelism when the caller gave none.
        if (parallelism == null) {
            this.parallelism = this.env.getParallelism();
        } else {
            this.parallelism = parallelism.intValue();
        }
    }

    /** Returns the catalog wrapped by a {@link CachingCatalog}, or the argument itself otherwise. */
    private static Catalog stripCachingLayer(Catalog candidate) {
        if (candidate instanceof CachingCatalog) {
            return ((CachingCatalog) candidate).wrapped();
        }
        return candidate;
    }

    /**
     * Assembles the clone pipeline on the action's execution environment without executing it.
     *
     * @throws Exception if any stage of the pipeline cannot be constructed
     */
    @Override
    public void build() throws Exception {
        // Stage 0: stream of (source identifier, target identifier) pairs.
        DataStream<Tuple2<Identifier, Identifier>> tablePairs = CloneUtils.buildSource(sourceDatabase, sourceTableName, targetDatabase, targetTableName, catalog, env);
        DataStream<Tuple2<Identifier, Identifier>> tablePairsByChannel = FlinkStreamPartitioner.partition(tablePairs, new CloneUtils.TableChannelComputer(), parallelism);

        // Stage 1: "List Files".
        ProcessFunction listFilesFunction = (ProcessFunction) new ListCloneFilesFunction(sourceCatalogConfig, targetCatalogConfig, whereSql);
        SingleOutputStreamOperator listedFiles = tablePairsByChannel.process(listFilesFunction).name("List Files").setParallelism(parallelism);

        // Stage 2: "Copy Files", fed by a rebalanced view of the listed files.
        ProcessFunction copyFilesFunction = (ProcessFunction) new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig);
        SingleOutputStreamOperator copiedFiles = listedFiles.rebalance().process(copyFilesFunction).name("Copy Files").setParallelism(parallelism);
        DataStream<DataFileInfo> copiedFilesByChannel = FlinkStreamPartitioner.partition(copiedFiles, new CloneUtils.DataFileChannelComputer(), parallelism);

        // Stage 3: "Commit table".
        TypeInformation commitOutputType = (TypeInformation) BasicTypeInfo.LONG_TYPE_INFO;
        OneInputStreamOperator commitOperator = (OneInputStreamOperator) new CommitTableOperator(targetCatalogConfig);
        SingleOutputStreamOperator commitResults = copiedFilesByChannel.transform("Commit table", commitOutputType, commitOperator).setParallelism(parallelism);

        // Terminal sink: results are discarded; it runs with parallelism 1.
        commitResults.sinkTo(new DiscardingSink()).name("end").setParallelism(1);
    }

    /**
     * Builds the pipeline and then executes it under the job name "Clone Hive job".
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        build();
        execute("Clone Hive job");
    }
}

