/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.clone.ShuffleIdentifierByTableComputer;
import org.apache.paimon.flink.clone.schema.ClonePaimonSchemaFunction;
import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that assemble the Flink pipeline used to clone Paimon tables from a source
 * catalog into a target catalog.
 *
 * <p>The clone scope is derived from which of the source arguments are blank:
 *
 * <ul>
 *   <li>blank source database: every table of the source catalog is cloned;
 *   <li>source database set, blank source table: every table of that database is cloned;
 *   <li>both set: exactly one table is cloned.
 * </ul>
 */
public class ClonePaimonTableUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ClonePaimonTableUtils.class);

    /**
     * Builds a non-parallel source stream of {@code (source identifier, target identifier)} pairs
     * describing which tables have to be cloned and where to.
     *
     * @param sourceDatabase source database; blank means "clone the whole catalog"
     * @param sourceTableName source table; blank means "clone every table of the database"
     * @param targetDatabase target database; must be blank when cloning a whole catalog and
     *     non-blank otherwise
     * @param targetTableName target table; must be non-blank only when cloning a single table
     * @param sourceCatalog catalog that is queried for the tables to clone
     * @param includedTables optional allow-list, matched against {@link Identifier#getFullName()};
     *     must be empty when cloning a single table
     * @param excludedTables optional deny-list, matched against {@link Identifier#getFullName()};
     *     must be empty when cloning a single table
     * @param env execution environment the source is created in
     * @return a stream with one element per table to clone
     * @throws Exception if listing databases or tables from the catalog fails; violated argument
     *     combinations and an empty result are reported through {@link Preconditions}
     */
    public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
            String sourceDatabase,
            String sourceTableName,
            String targetDatabase,
            String targetTableName,
            Catalog sourceCatalog,
            @Nullable List<String> includedTables,
            @Nullable List<String> excludedTables,
            StreamExecutionEnvironment env)
            throws Exception {
        List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
        if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
            // Whole catalog: every table keeps its database and name in the target catalog.
            Preconditions.checkArgument(
                    StringUtils.isNullOrWhitespaceOnly(sourceTableName),
                    "sourceTableName must be blank when database is null.");
            Preconditions.checkArgument(
                    StringUtils.isNullOrWhitespaceOnly(targetDatabase),
                    "targetDatabase must be blank when clone all tables in a catalog.");
            Preconditions.checkArgument(
                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
                    "targetTableName must be blank when clone all tables in a catalog.");
            for (Identifier identifier :
                    ClonePaimonTableUtils.listTables(
                            sourceCatalog, includedTables, excludedTables)) {
                result.add(new Tuple2<>(identifier, identifier));
            }
        } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
            // Whole database: table names are kept, only the database is replaced.
            Preconditions.checkArgument(
                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
                    "targetDatabase must not be blank when clone all tables in a database.");
            Preconditions.checkArgument(
                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
                    "targetTableName must be blank when clone all tables in a database.");
            for (Identifier identifier :
                    ClonePaimonTableUtils.listTables(
                            sourceCatalog, sourceDatabase, includedTables, excludedTables)) {
                result.add(
                        new Tuple2<>(
                                identifier,
                                Identifier.create(targetDatabase, identifier.getObjectName())));
            }
        } else {
            // Single table: include/exclude filters make no sense here and are rejected.
            Preconditions.checkArgument(
                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
                    "targetDatabase must not be blank when clone a table.");
            Preconditions.checkArgument(
                    !StringUtils.isNullOrWhitespaceOnly(targetTableName),
                    "targetTableName must not be blank when clone a table.");
            Preconditions.checkArgument(
                    CollectionUtils.isEmpty(includedTables),
                    "includedTables must be empty when clone a single table.");
            Preconditions.checkArgument(
                    CollectionUtils.isEmpty(excludedTables),
                    "excludedTables must be empty when clone a single table.");
            result.add(
                    new Tuple2<>(
                            Identifier.create(sourceDatabase, sourceTableName),
                            Identifier.create(targetDatabase, targetTableName)));
        }

        Preconditions.checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
        // Parameterized logging already defers formatting, so no isDebugEnabled() guard is needed.
        LOG.debug("The clone identifiers of source table and target table are: {}", result);
        return env.fromCollection(result).forceNonParallel();
    }

    /**
     * Wires the complete clone pipeline into {@code env}. The operators are, in order:
     * "Clone Schema", "List Splits", "Copy Splits" and "Commit Table", followed by a discarding
     * sink named "end" that runs with parallelism 1.
     *
     * <p>The stream is partitioned with {@link ShuffleIdentifierByTableComputer} before the schema
     * step and with {@link ShuffleCommitMessageByTableComputer} before the commit step; the split
     * copy step is fed through {@code rebalance()}.
     *
     * @param env execution environment the pipeline is added to
     * @param sourceCatalog catalog used to resolve the tables to clone
     * @param sourceDatabase see {@link #buildSource}
     * @param sourceTableName see {@link #buildSource}
     * @param sourceCatalogConfig options handed to the functions that access the source catalog
     * @param targetDatabase see {@link #buildSource}
     * @param targetTableName see {@link #buildSource}
     * @param targetCatalogConfig options handed to the functions that access the target catalog
     * @param parallelism parallelism of every operator except the final sink
     * @param whereSql optional expression forwarded to {@link ListCloneSplitsFunction}
     *     (presumably a filter on the data to clone; confirm against that class)
     * @param includedTables see {@link #buildSource}
     * @param excludedTables see {@link #buildSource}
     * @throws Exception if building the source stream fails
     */
    public static void build(
            StreamExecutionEnvironment env,
            Catalog sourceCatalog,
            String sourceDatabase,
            String sourceTableName,
            Map<String, String> sourceCatalogConfig,
            String targetDatabase,
            String targetTableName,
            Map<String, String> targetCatalogConfig,
            int parallelism,
            @Nullable String whereSql,
            @Nullable List<String> includedTables,
            @Nullable List<String> excludedTables)
            throws Exception {
        DataStream<Tuple2<Identifier, Identifier>> source =
                ClonePaimonTableUtils.buildSource(
                        sourceDatabase,
                        sourceTableName,
                        targetDatabase,
                        targetTableName,
                        sourceCatalog,
                        includedTables,
                        excludedTables,
                        env);
        DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
                FlinkStreamPartitioner.partition(
                        source, new ShuffleIdentifierByTableComputer(), parallelism);

        // NOTE(review): the intermediate streams stay raw because the element types emitted by
        // the schema/split functions are not imported in this file; parameterize them once those
        // types are available here.
        SingleOutputStreamOperator schemaInfos =
                partitionedSource
                        .process(
                                (ProcessFunction)
                                        new ClonePaimonSchemaFunction(
                                                sourceCatalogConfig, targetCatalogConfig))
                        .name("Clone Schema")
                        .setParallelism(parallelism);
        SingleOutputStreamOperator splits =
                schemaInfos
                        .process(
                                (ProcessFunction)
                                        new ListCloneSplitsFunction(
                                                sourceCatalogConfig,
                                                targetCatalogConfig,
                                                whereSql))
                        .name("List Splits")
                        .setParallelism(parallelism);
        SingleOutputStreamOperator commitMessage =
                splits.rebalance()
                        .process(
                                (ProcessFunction)
                                        new CloneSplitsFunction(
                                                sourceCatalogConfig, targetCatalogConfig))
                        .name("Copy Splits")
                        .setParallelism(parallelism);

        DataStream<CommitMessageInfo> partitionedCommitMessage =
                FlinkStreamPartitioner.partition(
                        commitMessage, new ShuffleCommitMessageByTableComputer(), parallelism);
        SingleOutputStreamOperator committed =
                partitionedCommitMessage
                        .transform(
                                "Commit Table",
                                (TypeInformation) BasicTypeInfo.LONG_TYPE_INFO,
                                (OneInputStreamOperator)
                                        new CommitMessageTableOperator(targetCatalogConfig))
                        .setParallelism(parallelism);
        committed.sinkTo((Sink) new DiscardingSink()).name("end").setParallelism(1);
    }

    /**
     * Lists the tables of every database in {@code catalog}, applying the include/exclude
     * filters.
     *
     * @param catalog catalog to enumerate
     * @param includedTables optional allow-list of full table names; {@code null} or empty means
     *     "no restriction"
     * @param excludedTables optional deny-list of full table names; an excluded table is dropped
     *     even when it is also included
     * @return identifiers of the selected tables, in catalog listing order
     * @throws Exception if the catalog cannot list its databases or tables
     */
    public static List<Identifier> listTables(
            Catalog catalog,
            @Nullable List<String> includedTables,
            @Nullable List<String> excludedTables)
            throws Exception {
        Set<String> included = toNameSet(includedTables);
        Set<String> excluded = toNameSet(excludedTables);
        List<Identifier> results = new ArrayList<>();
        for (String database : catalog.listDatabases()) {
            collectTables(catalog, database, included, excluded, results);
        }
        return results;
    }

    /**
     * Lists the tables of a single database in {@code catalog}, applying the include/exclude
     * filters.
     *
     * @param catalog catalog to enumerate
     * @param database database whose tables are listed
     * @param includedTables optional allow-list of full table names; {@code null} or empty means
     *     "no restriction"
     * @param excludedTables optional deny-list of full table names; an excluded table is dropped
     *     even when it is also included
     * @return identifiers of the selected tables, in catalog listing order
     * @throws Exception if the catalog cannot list the tables of {@code database}
     */
    public static List<Identifier> listTables(
            Catalog catalog,
            String database,
            @Nullable List<String> includedTables,
            @Nullable List<String> excludedTables)
            throws Exception {
        List<Identifier> results = new ArrayList<>();
        collectTables(
                catalog,
                database,
                toNameSet(includedTables),
                toNameSet(excludedTables),
                results);
        return results;
    }

    /** Copies an optional list of table names into a set; {@code null} yields an empty set. */
    private static Set<String> toNameSet(@Nullable List<String> tableNames) {
        Set<String> names = new HashSet<>();
        if (tableNames != null) {
            names.addAll(tableNames);
        }
        return names;
    }

    /** Appends to {@code results} every table of {@code database} that passes both filters. */
    private static void collectTables(
            Catalog catalog,
            String database,
            Set<String> included,
            Set<String> excluded,
            List<Identifier> results)
            throws Exception {
        for (String table : catalog.listTables(database)) {
            Identifier identifier = Identifier.create(database, table);
            String fullName = identifier.getFullName();
            if (excluded.contains(fullName)) {
                continue;
            }
            // An empty allow-list means every non-excluded table is selected.
            if (!included.isEmpty() && !included.contains(fullName)) {
                continue;
            }
            results.add(identifier);
        }
    }
}

