/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.clone.DataFileInfo;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.clone.HiveCloneUtils;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for the clone job: resolving which (source, target) table pairs to clone and
 * computing shuffle channels for the records that flow through the job.
 */
public class CloneUtils {
    private static final Logger LOG = LoggerFactory.getLogger(CloneUtils.class);

    /**
     * Builds a non-parallel stream of (source identifier, target identifier) pairs to clone.
     *
     * <p>The blank/non-blank combination of the arguments selects one of three modes:
     *
     * <ul>
     *   <li>{@code sourceDatabase} blank: every table listed from the catalog is cloned onto an
     *       identical target identifier; the other three names must be blank too.
     *   <li>{@code sourceDatabase} set, {@code sourceTableName} blank: every table listed from
     *       that database is cloned into {@code targetDatabase} under its original object name;
     *       {@code targetTableName} must be blank.
     *   <li>both source names set: the single table is cloned to
     *       {@code targetDatabase.targetTableName}; both target names are required.
     * </ul>
     *
     * <p>Argument combinations that do not match a mode are rejected through
     * {@code Preconditions.checkArgument}, and an empty result is rejected through
     * {@code Preconditions.checkState}.
     *
     * @param sourceDatabase source database name, or blank to clone the whole catalog
     * @param sourceTableName source table name, or blank to clone the whole database
     * @param targetDatabase target database name
     * @param targetTableName target table name
     * @param sourceCatalog catalog whose root catalog must be a {@link HiveCatalog}
     * @param env execution environment used to create the stream
     * @return a stream over the resolved pairs, forced to be non-parallel
     * @throws Exception declared by the original signature; presumably propagated from the table
     *     listing calls (not visible here)
     */
    public static DataStream<Tuple2<Identifier, Identifier>> buildSource(String sourceDatabase, String sourceTableName, String targetDatabase, String targetTableName, Catalog sourceCatalog, StreamExecutionEnvironment env) throws Exception {
        List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
        HiveCatalog hiveCatalog = CloneUtils.getRootHiveCatalog(sourceCatalog);
        if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
            // Whole-catalog clone: no other name may be supplied, targets mirror the sources.
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(sourceTableName), "sourceTableName must be blank when database is null.");
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(targetDatabase), "targetDatabase must be blank when clone all tables in a catalog.");
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(targetTableName), "targetTableName must be blank when clone all tables in a catalog.");
            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog)) {
                result.add(new Tuple2<>(identifier, identifier));
            }
        } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
            // Whole-database clone: table names are preserved, only the database changes.
            Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(targetDatabase), "targetDatabase must not be blank when clone all tables in a database.");
            Preconditions.checkArgument(StringUtils.isNullOrWhitespaceOnly(targetTableName), "targetTableName must be blank when clone all tables in a database.");
            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog, sourceDatabase)) {
                result.add(new Tuple2<>(identifier, Identifier.create(targetDatabase, identifier.getObjectName())));
            }
        } else {
            // Single-table clone: the target must be fully specified.
            Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(targetDatabase), "targetDatabase must not be blank when clone a table.");
            Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(targetTableName), "targetTableName must not be blank when clone a table.");
            result.add(new Tuple2<>(Identifier.create(sourceDatabase, sourceTableName), Identifier.create(targetDatabase, targetTableName)));
        }
        Preconditions.checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
        LOG.debug("The clone identifiers of source table and target table are: {}", result);
        return env.fromCollection(result).forceNonParallel();
    }

    /**
     * Resolves the root catalog behind {@code catalog} via {@link DelegateCatalog#rootCatalog}
     * and returns it as a {@link HiveCatalog}.
     *
     * @param catalog the catalog to resolve
     * @return the root catalog, cast to {@link HiveCatalog}
     */
    public static HiveCatalog getRootHiveCatalog(Catalog catalog) {
        Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
        // Rejected through checkArgument when the root is any other catalog implementation.
        Preconditions.checkArgument(rootCatalog instanceof HiveCatalog, "Only support HiveCatalog now but found %s.", rootCatalog.getClass().getName());
        return (HiveCatalog)rootCatalog;
    }

    /**
     * Assigns {@link DataFileInfo} records to channels by the hash of their identifier.
     *
     * <p>{@link #setup(int)} must be called before {@link #channel(DataFileInfo)}: the channel
     * count is transient and defaults to zero, which {@link Math#floorMod(int, int)} rejects.
     */
    public static class DataFileChannelComputer
    implements ChannelComputer<DataFileInfo> {
        private static final long serialVersionUID = 1L;
        private transient int numChannels;

        @Override
        public void setup(int numChannels) {
            this.numChannels = numChannels;
        }

        @Override
        public int channel(DataFileInfo record) {
            // floorMod keeps the result in [0, numChannels) even when the hash is negative.
            return Math.floorMod(Objects.hash(record.identifier()), this.numChannels);
        }

        @Override
        public String toString() {
            return "shuffle by identifier hash";
        }
    }

    /**
     * Assigns (source, target) identifier pairs to channels by hashing the database name and
     * table name of the tuple's second field ({@code f1}), which {@code buildSource} populates
     * with the target identifier.
     *
     * <p>{@link #setup(int)} must be called before {@link #channel(Tuple2)}: the channel count
     * is transient and defaults to zero, which {@link Math#floorMod(int, int)} rejects.
     */
    public static class TableChannelComputer
    implements ChannelComputer<Tuple2<Identifier, Identifier>> {
        private static final long serialVersionUID = 1L;
        private transient int numChannels;

        @Override
        public void setup(int numChannels) {
            this.numChannels = numChannels;
        }

        @Override
        public int channel(Tuple2<Identifier, Identifier> record) {
            Identifier target = record.f1;
            // floorMod keeps the result in [0, numChannels) even when the hash is negative.
            return Math.floorMod(Objects.hash(target.getDatabaseName(), target.getTableName()), this.numChannels);
        }

        @Override
        public String toString() {
            return "shuffle by identifier hash";
        }
    }
}

