/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone.schema;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.CloneHiveTableUtils;
import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.clone.HiveCloneUtils;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloneHiveSchemaFunction
extends ProcessFunction<Tuple2<Identifier, Identifier>, CloneSchemaInfo> {
    private static final long serialVersionUID = 1L;
    protected static final Logger LOG = LoggerFactory.getLogger(CloneHiveSchemaFunction.class);
    protected final Map<String, String> sourceCatalogConfig;
    protected final Map<String, String> targetCatalogConfig;
    protected transient HiveCatalog hiveCatalog;
    protected transient Catalog targetCatalog;

    public CloneHiveSchemaFunction(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open(OpenContext openContext) throws Exception {
        this.open(new Configuration());
    }

    public void open(Configuration conf) throws Exception {
        this.hiveCatalog = CloneHiveTableUtils.getRootHiveCatalog(FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig)));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(Tuple2<Identifier, Identifier> tuple, ProcessFunction.Context context, Collector<CloneSchemaInfo> collector) throws Exception {
        String sourceType = this.sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
        Preconditions.checkNotNull(sourceType);
        Map<String, String> databaseOptions = HiveCloneUtils.getDatabaseOptions(this.hiveCatalog, ((Identifier)tuple.f0).getDatabaseName());
        this.targetCatalog.createDatabase(((Identifier)tuple.f1).getDatabaseName(), true, databaseOptions);
        Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(this.hiveCatalog, (Identifier)tuple.f0);
        Map<String, String> options = schema.options();
        boolean supportCloneSplits = Boolean.parseBoolean(options.get("support.clone.splits"));
        options.remove("support.clone.splits");
        options.put(CoreOptions.BUCKET.key(), "-1");
        schema = new Schema(schema.fields(), schema.partitionKeys(), schema.primaryKeys(), options, schema.comment());
        try {
            Table existedTable = this.targetCatalog.getTable((Identifier)tuple.f1);
            Preconditions.checkState(existedTable instanceof FileStoreTable, String.format("existed paimon table '%s' is not a FileStoreTable, but a %s", tuple.f1, existedTable.getClass().getName()));
            this.checkCompatible(schema, (FileStoreTable)existedTable);
            LOG.info("paimon table '{}' already exists, use it as target table.", tuple.f1);
        }
        catch (Catalog.TableNotExistException e) {
            LOG.info("create target paimon table '{}'.", tuple.f1);
            this.targetCatalog.createTable((Identifier)tuple.f1, schema, false);
        }
        CloneSchemaInfo schemaInfo = new CloneSchemaInfo(tuple, supportCloneSplits);
        collector.collect((Object)schemaInfo);
    }

    private void checkCompatible(Schema sourceSchema, FileStoreTable existedTable) {
        Schema existedSchema = existedTable.schema().toSchema();
        Preconditions.checkState(existedSchema.primaryKeys().isEmpty(), "Can not clone data to existed paimon table which has primary keys. Existed paimon table is " + existedTable.name());
        Preconditions.checkState(existedTable.coreOptions().bucket() == -1, "Can not clone data to existed paimon table which bucket is not -1. Existed paimon table is " + existedTable.name());
        Preconditions.checkState(Objects.equals(sourceSchema.options().get(CoreOptions.FILE_FORMAT.key()), existedTable.coreOptions().formatType()), "source table format is not compatible with existed paimon table format.");
        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
        List<String> existedPartitionFields = existedSchema.partitionKeys();
        Preconditions.checkState(sourcePartitionFields.size() == existedPartitionFields.size() && new HashSet<String>(existedPartitionFields).containsAll(sourcePartitionFields), "source table partition keys is not compatible with existed paimon table partition keys.");
        List<DataField> sourceFields = sourceSchema.fields();
        List<DataField> existedFields = existedSchema.fields();
        Preconditions.checkState(existedFields.size() >= sourceFields.size() && new HashSet<String>(existedPartitionFields).containsAll(sourcePartitionFields), "source table partition keys is not compatible with existed paimon table partition keys.");
    }
}

