/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.CloneSourceBuilder;
import org.apache.paimon.flink.clone.CopyDataFileOperator;
import org.apache.paimon.flink.clone.CopyManifestFileOperator;
import org.apache.paimon.flink.clone.CopyMetaFilesForCloneOperator;
import org.apache.paimon.flink.clone.SnapshotHintChannelComputer;
import org.apache.paimon.flink.clone.SnapshotHintOperator;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.utils.StringUtils;

/**
 * Action that assembles and runs a Flink job cloning a table from a source catalog into a target
 * catalog.
 *
 * <p>The job built by {@link #build()} reads a stream of clone tasks from {@link
 * CloneSourceBuilder}, routes it through {@link CopyMetaFilesForCloneOperator}, copies the index
 * files and data manifest files emitted on its side outputs, copies the data files downstream of
 * the manifest copy, and finally runs {@link SnapshotHintOperator} on the union of the results.
 */
public class CloneAction extends ActionBase {
    /** Parallelism applied to the copy operators and the snapshot-hint operator. */
    private final int parallelism;
    /** Source catalog options; never {@code null}. */
    private final Map<String, String> sourceCatalogConfig;
    private final String database;
    private final String tableName;
    /** Target catalog options; never {@code null}. */
    private final Map<String, String> targetCatalogConfig;
    private final String targetDatabase;
    private final String targetTableName;

    /**
     * Creates a clone action.
     *
     * @param database source database name, handed to {@link CloneSourceBuilder}
     * @param tableName source table name, handed to {@link CloneSourceBuilder}
     * @param sourceCatalogConfig source catalog options; also passed to the {@code ActionBase}
     *     constructor. A {@code null} or empty map is stored as a fresh empty {@link HashMap}
     *     (NOTE(review): whether the super constructor itself accepts {@code null} is not visible
     *     from this file)
     * @param targetDatabase target database name, handed to {@link CloneSourceBuilder}
     * @param targetTableName target table name, handed to {@link CloneSourceBuilder}
     * @param targetCatalogConfig target catalog options; a {@code null} or empty map is stored as
     *     a fresh empty {@link HashMap}
     * @param parallelismStr parallelism as a decimal string (surrounding whitespace is ignored);
     *     when {@code null} or blank, the parallelism of the inherited execution environment is
     *     used
     * @throws NumberFormatException if {@code parallelismStr} is non-blank but not a valid integer
     */
    public CloneAction(String database, String tableName, Map<String, String> sourceCatalogConfig, String targetDatabase, String targetTableName, Map<String, String> targetCatalogConfig, String parallelismStr) {
        super(sourceCatalogConfig);
        // Trim before parsing so values such as " 4" are accepted rather than rejected by parseInt.
        this.parallelism = StringUtils.isNullOrWhitespaceOnly(parallelismStr)
                ? this.env.getParallelism()
                : Integer.parseInt(parallelismStr.trim());
        this.sourceCatalogConfig = CloneAction.orEmptyConfig(sourceCatalogConfig);
        this.database = database;
        this.tableName = tableName;
        this.targetCatalogConfig = CloneAction.orEmptyConfig(targetCatalogConfig);
        this.targetDatabase = targetDatabase;
        this.targetTableName = targetTableName;
    }

    /**
     * Returns {@code config} itself when it holds at least one entry, otherwise a new empty
     * {@link HashMap}. Treating {@code null} like an empty map avoids a bare NPE on absent options.
     */
    private static Map<String, String> orEmptyConfig(Map<String, String> config) {
        if (config == null || config.isEmpty()) {
            return new HashMap<String, String>();
        }
        return config;
    }

    /**
     * Builds the clone pipeline on the inherited execution environment.
     *
     * @throws RuntimeException wrapping any exception raised while assembling the job
     */
    @Override
    public void build() {
        try {
            this.buildCloneFlinkJob(this.env);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Wires the clone topology into {@code env}; nothing is executed here.
     *
     * @param env environment the source and operators are registered on
     * @throws Exception if building the source or any transformation fails
     */
    private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws Exception {
        DataStream<Tuple2<String, String>> cloneSource = new CloneSourceBuilder(env, this.sourceCatalogConfig, this.database, this.tableName, this.targetDatabase, this.targetTableName).build();

        // Single-parallelism process step; its useful output is read from the two side outputs below.
        SingleOutputStreamOperator copyMetaFiles = cloneSource.forward()
                .process((ProcessFunction)new CopyMetaFilesForCloneOperator(this.sourceCatalogConfig, this.targetCatalogConfig))
                .name("Side Output")
                .setParallelism(1);
        SideOutputDataStream indexFilesStream = copyMetaFiles.getSideOutput(CopyMetaFilesForCloneOperator.INDEX_FILES_TAG);
        SideOutputDataStream dataManifestFilesStream = copyMetaFiles.getSideOutput(CopyMetaFilesForCloneOperator.DATA_MANIFEST_FILES_TAG);

        // Index files go through the same operator class that is used for data files further down.
        SingleOutputStreamOperator copyIndexFiles = indexFilesStream
                .transform("Copy Index Files", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new CopyDataFileOperator(this.sourceCatalogConfig, this.targetCatalogConfig))
                .setParallelism(this.parallelism);

        // Data files are copied downstream of the manifest copy operator.
        SingleOutputStreamOperator copyDataManifestFiles = dataManifestFilesStream
                .transform("Copy Data Manifest Files", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new CopyManifestFileOperator(this.sourceCatalogConfig, this.targetCatalogConfig))
                .setParallelism(this.parallelism);
        SingleOutputStreamOperator copyDataFile = copyDataManifestFiles
                .transform("Copy Data Files", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new CopyDataFileOperator(this.sourceCatalogConfig, this.targetCatalogConfig))
                .setParallelism(this.parallelism);

        // Merge both copy branches and repartition them before the snapshot-hint step.
        DataStream combinedStream = copyDataFile.union(new DataStream[]{copyIndexFiles});
        SingleOutputStreamOperator snapshotHintOperator = FlinkStreamPartitioner.partition(combinedStream, new SnapshotHintChannelComputer(), this.parallelism)
                .transform("Recreate Snapshot Hint", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new SnapshotHintOperator(this.targetCatalogConfig))
                .setParallelism(this.parallelism);

        // Terminate the pipeline with a sink that discards its input.
        snapshotHintOperator.sinkTo(new DiscardingSink()).name("end").setParallelism(1);
    }

    /**
     * Builds the pipeline and then executes it under the job name {@code "Clone job"}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        this.build();
        this.execute("Clone job");
    }
}

