/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.clone.ShuffleIdentifierByTableComputer;
import org.apache.paimon.flink.clone.files.CloneFilesCommitOperator;
import org.apache.paimon.flink.clone.files.CloneFilesFunction;
import org.apache.paimon.flink.clone.files.DataFileInfo;
import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
import org.apache.paimon.flink.clone.files.ShuffleDataFileByTableComputer;
import org.apache.paimon.flink.clone.schema.CloneHiveSchemaFunction;
import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.clone.HiveCloneUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloneHiveTableUtils {
    private static final Logger LOG = LoggerFactory.getLogger(CloneHiveTableUtils.class);

    public static DataStream<Tuple2<Identifier, Identifier>> buildSource(String sourceDatabase, String sourceTableName, String targetDatabase, String targetTableName, Catalog sourceCatalog, @Nullable List<String> includedTables, @Nullable List<String> excludedTables, StreamExecutionEnvironment env) throws Exception {
        ArrayList<Tuple2> result = new ArrayList<Tuple2>();
        HiveCatalog hiveCatalog = CloneHiveTableUtils.getRootHiveCatalog(sourceCatalog);
        if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(sourceTableName), "sourceTableName must be blank when database is null.");
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(targetDatabase), "targetDatabase must be blank when clone all tables in a catalog.");
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(targetTableName), "targetTableName must be blank when clone all tables in a catalog.");
            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog, includedTables, excludedTables)) {
                result.add(new Tuple2((Object)identifier, (Object)identifier));
            }
        } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
            Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(targetDatabase), "targetDatabase must not be blank when clone all tables in a database.");
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(targetTableName), "targetTableName must be blank when clone all tables in a catalog.");
            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog, sourceDatabase, includedTables, excludedTables)) {
                result.add(new Tuple2((Object)identifier, (Object)Identifier.create(targetDatabase, identifier.getObjectName())));
            }
        } else {
            Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(targetDatabase), "targetDatabase must not be blank when clone a table.");
            Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(targetTableName), "targetTableName must not be blank when clone a table.");
            Preconditions.checkArgument(CollectionUtils.isEmpty(includedTables), "includedTables must be empty when clone a single table.");
            Preconditions.checkArgument(CollectionUtils.isEmpty(excludedTables), "excludedTables must be empty when clone a single table.");
            result.add(new Tuple2((Object)Identifier.create(sourceDatabase, sourceTableName), (Object)Identifier.create(targetDatabase, targetTableName)));
        }
        Preconditions.checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("The clone identifiers of source table and target table are: {}", result);
        }
        return env.fromCollection(result).forceNonParallel();
    }

    public static HiveCatalog getRootHiveCatalog(Catalog catalog) {
        Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
        Preconditions.checkArgument(rootCatalog instanceof HiveCatalog, "Only support HiveCatalog now but found %s.", rootCatalog.getClass().getName());
        return (HiveCatalog)rootCatalog;
    }

    public static void build(StreamExecutionEnvironment env, Catalog sourceCatalog, String sourceDatabase, String sourceTableName, Map<String, String> sourceCatalogConfig, String targetDatabase, String targetTableName, Map<String, String> targetCatalogConfig, int parallelism, @Nullable String whereSql, @Nullable List<String> includedTables, @Nullable List<String> excludedTables) throws Exception {
        DataStream<Tuple2<Identifier, Identifier>> source = CloneHiveTableUtils.buildSource(sourceDatabase, sourceTableName, targetDatabase, targetTableName, sourceCatalog, includedTables, excludedTables, env);
        DataStream<Tuple2<Identifier, Identifier>> partitionedSource = FlinkStreamPartitioner.partition(source, new ShuffleIdentifierByTableComputer(), parallelism);
        SingleOutputStreamOperator schemaInfos = partitionedSource.process((ProcessFunction)new CloneHiveSchemaFunction(sourceCatalogConfig, targetCatalogConfig)).name("Clone Schema").setParallelism(parallelism);
        CloneHiveTableUtils.buildForCloneSplits(sourceCatalogConfig, targetCatalogConfig, parallelism, whereSql, (DataStream<CloneSchemaInfo>)schemaInfos);
        CloneHiveTableUtils.buildForCloneFile(sourceCatalogConfig, targetCatalogConfig, parallelism, whereSql, (DataStream<CloneSchemaInfo>)schemaInfos);
    }

    public static void buildForCloneSplits(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig, int parallelism, @Nullable String whereSql, DataStream<CloneSchemaInfo> schemaInfos) {
        SingleOutputStreamOperator splits = schemaInfos.filter((FilterFunction & Serializable)cloneSchemaInfo -> cloneSchemaInfo.supportCloneSplits()).rebalance().process((ProcessFunction)new ListCloneSplitsFunction(sourceCatalogConfig, targetCatalogConfig, whereSql)).name("List Splits").setParallelism(parallelism);
        SingleOutputStreamOperator commitMessage = splits.rebalance().process((ProcessFunction)new CloneSplitsFunction(sourceCatalogConfig, targetCatalogConfig)).name("Copy Splits").setParallelism(parallelism);
        DataStream<CommitMessageInfo> partitionedCommitMessage = FlinkStreamPartitioner.partition(commitMessage, new ShuffleCommitMessageByTableComputer(), parallelism);
        SingleOutputStreamOperator committed = partitionedCommitMessage.transform("Commit Table Splits", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (OneInputStreamOperator)new CommitMessageTableOperator(targetCatalogConfig)).setParallelism(parallelism);
        committed.sinkTo((Sink)new DiscardingSink()).name("end").setParallelism(1);
    }

    public static void buildForCloneFile(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig, int parallelism, @Nullable String whereSql, DataStream<CloneSchemaInfo> schemaInfos) {
        SingleOutputStreamOperator files = schemaInfos.filter((FilterFunction & Serializable)cloneSchemaInfo -> !cloneSchemaInfo.supportCloneSplits()).rebalance().process((ProcessFunction)new ListCloneFilesFunction(sourceCatalogConfig, targetCatalogConfig, whereSql)).name("List Files").setParallelism(parallelism);
        SingleOutputStreamOperator dataFile = files.rebalance().process((ProcessFunction)new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig)).name("Copy Files").setParallelism(parallelism);
        DataStream<DataFileInfo> partitionedDataFile = FlinkStreamPartitioner.partition(dataFile, new ShuffleDataFileByTableComputer(), parallelism);
        SingleOutputStreamOperator committed = partitionedDataFile.transform("Commit Table Files", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (OneInputStreamOperator)new CloneFilesCommitOperator(targetCatalogConfig)).setParallelism(parallelism);
        committed.sinkTo((Sink)new DiscardingSink()).name("end").setParallelism(1);
    }
}

