/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.CloneProcessFunction;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.hive.clone.HiveCloneUtils;
import org.apache.paimon.hive.clone.HivePartitionFiles;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;

/**
 * Process function that, for each (source identifier, target identifier) pair, prepares the
 * target Paimon database and table and then emits the {@link CloneFileInfo} records produced by
 * {@link CloneFileInfo#fromHive} for every partition listed from the source Hive table.
 *
 * <p>An optional SQL filter ({@code whereSql}) restricts which partitions are listed.
 */
public class ListCloneFilesFunction
        extends CloneProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
    private static final long serialVersionUID = 1L;

    /** Optional partition filter expressed as a SQL condition; {@code null} means no filter. */
    @Nullable
    private final String whereSql;

    /**
     * @param sourceCatalogConfig options of the source catalog, forwarded to the parent function
     * @param targetCatalogConfig options of the target catalog, forwarded to the parent function
     * @param whereSql optional partition filter SQL; {@code null} to list every partition
     */
    public ListCloneFilesFunction(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig, @Nullable String whereSql) {
        super(sourceCatalogConfig, targetCatalogConfig);
        this.whereSql = whereSql;
    }

    /**
     * Ensures the target database and table exist (creating them when missing, validating
     * compatibility when the table already exists) and collects the files to clone.
     *
     * @param tuple {@code f0} is the source table identifier, {@code f1} the target identifier
     * @param context Flink process-function context (unused here)
     * @param collector receives one {@link CloneFileInfo} per element returned by
     *     {@link CloneFileInfo#fromHive}
     * @throws Exception if listing the source files fails (wrapped with the source table name),
     *     or if any catalog operation or compatibility check fails
     */
    @Override
    public void processElement(Tuple2<Identifier, Identifier> tuple, ProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo>.Context context, Collector<CloneFileInfo> collector) throws Exception {
        Identifier sourceIdentifier = tuple.f0;
        Identifier targetIdentifier = tuple.f1;

        // Fail fast, with a readable message, when the source catalog has no metastore configured.
        String sourceType = (String)this.sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
        Objects.requireNonNull(sourceType, "Source catalog option '" + CatalogOptions.METASTORE.key() + "' must be set.");

        // Create the target database (ignored if it already exists) with the source's options.
        Map<String, String> databaseOptions = HiveCloneUtils.getDatabaseOptions(this.hiveCatalog, sourceIdentifier.getDatabaseName());
        this.targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true, databaseOptions);

        // Derive the Paimon schema from the Hive table and force bucket = -1 on it.
        Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(this.hiveCatalog, sourceIdentifier);
        Map<String, String> options = schema.options();
        options.put(CoreOptions.BUCKET.key(), "-1");
        schema = new Schema(schema.fields(), schema.partitionKeys(), schema.primaryKeys(), options, schema.comment());

        try {
            Table existedTable = this.targetCatalog.getTable(targetIdentifier);
            Preconditions.checkState(existedTable instanceof FileStoreTable, String.format("existed paimon table '%s' is not a FileStoreTable, but a %s", targetIdentifier, existedTable.getClass().getName()));
            this.checkCompatible(schema, (FileStoreTable)existedTable);
            LOG.info("paimon table '{}' already exists, use it as target table.", targetIdentifier);
        }
        catch (Catalog.TableNotExistException e) {
            // A missing target table is the normal first-run case: create it from the source schema.
            LOG.info("create target paimon table '{}'.", targetIdentifier);
            this.targetCatalog.createTable(targetIdentifier, schema, false);
        }

        FileStoreTable table = (FileStoreTable)this.targetCatalog.getTable(targetIdentifier);
        RowType partitionType = table.schema().logicalPartitionType();
        PartitionPredicate predicate = ListCloneFilesFunction.getPartitionPredicate(this.whereSql, partitionType, sourceIdentifier);
        try {
            List<HivePartitionFiles> allPartitions = HiveCloneUtils.listFiles(this.hiveCatalog, sourceIdentifier, partitionType, table.coreOptions().partitionDefaultName(), predicate);
            for (HivePartitionFiles partitionFiles : allPartitions) {
                CloneFileInfo.fromHive(targetIdentifier, partitionFiles).forEach(collector::collect);
            }
        }
        catch (Exception e) {
            throw new Exception("Failed to list clone files for table " + sourceIdentifier.getFullName(), e);
        }
    }

    /**
     * Verifies that an already existing target table can receive the cloned data.
     *
     * <p>The existing table must have no primary keys, use bucket {@code -1}, have the same file
     * format as the source schema, have the same set of partition keys, and contain every source
     * field (matched by name).
     *
     * @param sourceSchema schema derived from the source Hive table
     * @param existedTable existing target table
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when any
     *     check fails
     */
    private void checkCompatible(Schema sourceSchema, FileStoreTable existedTable) {
        Schema existedSchema = existedTable.schema().toSchema();

        // Primary keys.
        Preconditions.checkState(existedSchema.primaryKeys().isEmpty(), "Can not clone data to existed paimon table which has primary keys. Existed paimon table is " + existedTable.name());

        // Bucket mode.
        Preconditions.checkState(existedTable.coreOptions().bucket() == -1, "Can not clone data to existed paimon table which bucket is not -1. Existed paimon table is " + existedTable.name());

        // File format.
        Preconditions.checkState(Objects.equals(sourceSchema.options().get(CoreOptions.FILE_FORMAT.key()), existedTable.coreOptions().formatType()), "source table format is not compatible with existed paimon table format.");

        // Partition keys: same count and same names (order is not compared).
        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
        List<String> existedPartitionFields = existedSchema.partitionKeys();
        Preconditions.checkState(sourcePartitionFields.size() == existedPartitionFields.size() && new HashSet<String>(existedPartitionFields).containsAll(sourcePartitionFields), "source table partition keys is not compatible with existed paimon table partition keys.");

        // Fields: every source field name must exist in the target table. Names are compared
        // rather than whole DataField objects because ids/descriptions may legitimately differ.
        // NOTE(review): comparison is case-sensitive - confirm this matches catalog settings.
        HashSet<String> sourceFieldNames = new HashSet<>();
        for (DataField field : sourceSchema.fields()) {
            sourceFieldNames.add(field.name());
        }
        HashSet<String> existedFieldNames = new HashSet<>();
        for (DataField field : existedSchema.fields()) {
            existedFieldNames.add(field.name());
        }
        Preconditions.checkState(existedFieldNames.containsAll(sourceFieldNames), "source table fields is not compatible with existed paimon table fields. Source fields: " + sourceFieldNames + ", existed fields: " + existedFieldNames);
    }

    /**
     * Converts the optional SQL filter into a {@link PartitionPredicate}.
     *
     * @param whereSql SQL filter, or {@code null} for no filtering
     * @param partitionType row type of the partition columns
     * @param tableId source table identifier, used only for the error message
     * @return the partition predicate, or {@code null} when {@code whereSql} is {@code null}
     * @throws RuntimeException if the SQL cannot be converted; the original failure is the cause
     */
    @Nullable
    @VisibleForTesting
    static PartitionPredicate getPartitionPredicate(@Nullable String whereSql, RowType partitionType, Identifier tableId) throws Exception {
        if (whereSql == null) {
            return null;
        }
        SimpleSqlPredicateConvertor simpleSqlPredicateConvertor = new SimpleSqlPredicateConvertor(partitionType);
        try {
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
            return PartitionPredicate.fromPredicate(partitionType, predicate);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to parse partition filter sql '" + whereSql + "' for table " + tableId.getFullName(), e);
        }
    }
}

