/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.procedures;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.procedures.BaseProcedure;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.util.LocationUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.runtime.BoxedUnit;

class AddFilesProcedure
extends BaseProcedure {
    private static final Joiner.MapJoiner MAP_JOINER = Joiner.on((String)",").withKeyValueSeparator("=");
    private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.required("source_table", DataTypes.StringType), ProcedureParameter.optional("partition_filter", STRING_MAP), ProcedureParameter.optional("check_duplicate_files", DataTypes.BooleanType)};
    private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty())});

    private AddFilesProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    public static SparkProcedures.ProcedureBuilder builder() {
        return new BaseProcedure.Builder<AddFilesProcedure>(){

            @Override
            protected AddFilesProcedure doBuild() {
                return new AddFilesProcedure(this.tableCatalog());
            }
        };
    }

    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    @Override
    public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = this.toIdentifier(args.getString(0), PARAMETERS[0].name());
        CatalogPlugin sessionCat = this.spark().sessionState().catalogManager().v2SessionCatalog();
        Identifier sourceIdent = this.toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier();
        HashMap partitionFilter = Maps.newHashMap();
        if (!args.isNullAt(2)) {
            args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, (k, v) -> {
                partitionFilter.put(k.toString(), v.toString());
                return BoxedUnit.UNIT;
            });
        }
        boolean checkDuplicateFiles = args.isNullAt(3) ? true : args.getBoolean(3);
        long addedFilesCount = this.importToIceberg(tableIdent, sourceIdent, partitionFilter, checkDuplicateFiles);
        return new InternalRow[]{this.newInternalRow(addedFilesCount)};
    }

    private boolean isFileIdentifier(Identifier ident) {
        String[] namespace = ident.namespace();
        return namespace.length == 1 && (namespace[0].equalsIgnoreCase("orc") || namespace[0].equalsIgnoreCase("parquet") || namespace[0].equalsIgnoreCase("avro"));
    }

    private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter, boolean checkDuplicateFiles) {
        return this.modifyIcebergTable(destIdent, table -> {
            this.validatePartitionSpec((Table)table, partitionFilter);
            AddFilesProcedure.ensureNameMappingPresent(table);
            if (this.isFileIdentifier(sourceIdent)) {
                Path sourcePath = new Path(sourceIdent.name());
                String format = sourceIdent.namespace()[0];
                this.importFileTable((Table)table, sourcePath, format, partitionFilter, checkDuplicateFiles);
            } else {
                this.importCatalogTable((Table)table, sourceIdent, partitionFilter, checkDuplicateFiles);
            }
            Snapshot snapshot = table.currentSnapshot();
            return Long.parseLong(snapshot.summary().getOrDefault("added-data-files", "0"));
        });
    }

    private static void ensureNameMappingPresent(Table table) {
        if (table.properties().get("schema.name-mapping.default") == null) {
            NameMapping mapping = MappingUtil.create((Schema)table.schema());
            String mappingJson = NameMappingParser.toJson((NameMapping)mapping);
            table.updateProperties().set("schema.name-mapping.default", mappingJson).commit();
        }
    }

    private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter, boolean checkDuplicateFiles) {
        List<SparkTableUtil.SparkPartition> partitions = Spark3Util.getPartitions(this.spark(), tableLocation, format, partitionFilter);
        if (table.spec().isUnpartitioned()) {
            Preconditions.checkArgument((boolean)partitions.isEmpty(), (Object)"Cannot add partitioned files to an unpartitioned table");
            Preconditions.checkArgument((boolean)partitionFilter.isEmpty(), (Object)"Cannot use a partition filter when importingto an unpartitioned table");
            SparkTableUtil.SparkPartition partition = new SparkTableUtil.SparkPartition(Collections.emptyMap(), tableLocation.toString(), format);
            this.importPartitions(table, (List<SparkTableUtil.SparkPartition>)ImmutableList.of((Object)partition), checkDuplicateFiles);
        } else {
            Preconditions.checkArgument((!partitions.isEmpty() ? 1 : 0) != 0, (String)"Cannot find any matching partitions in table %s", partitions);
            this.importPartitions(table, partitions, checkDuplicateFiles);
        }
    }

    private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter, boolean checkDuplicateFiles) {
        String stagingLocation = this.getMetadataLocation(table);
        TableIdentifier sourceTableIdentifier = Spark3Util.toV1TableIdentifier(sourceIdent);
        SparkTableUtil.importSparkTable(this.spark(), sourceTableIdentifier, table, stagingLocation, partitionFilter, checkDuplicateFiles);
    }

    private void importPartitions(Table table, List<SparkTableUtil.SparkPartition> partitions, boolean checkDuplicateFiles) {
        String stagingLocation = this.getMetadataLocation(table);
        SparkTableUtil.importSparkPartitions(this.spark(), partitions, table, table.spec(), stagingLocation, checkDuplicateFiles);
    }

    private String getMetadataLocation(Table table) {
        String defaultValue = LocationUtil.stripTrailingSlash((String)table.location()) + "/metadata";
        return LocationUtil.stripTrailingSlash((String)table.properties().getOrDefault("write.metadata.path", defaultValue));
    }

    @Override
    public String description() {
        return "AddFiles";
    }

    private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) {
        List partitionFields = table.spec().fields();
        Set partitionNames = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet());
        boolean tablePartitioned = !partitionFields.isEmpty();
        boolean partitionSpecPassed = !partitionFilter.isEmpty();
        List nonIdentityFields = partitionFields.stream().filter(x -> !x.transform().isIdentity()).collect(Collectors.toList());
        Preconditions.checkArgument((boolean)nonIdentityFields.isEmpty(), (String)"Cannot add data files to target table %s because that table is partitioned and contains non-identitypartition transforms which will not be compatible. Found non-identity fields %s", (Object)table.name(), nonIdentityFields);
        if (tablePartitioned && partitionSpecPassed) {
            Preconditions.checkArgument((partitionFields.size() >= partitionFilter.size() ? 1 : 0) != 0, (String)"Cannot add data files to target table %s because that table is partitioned, but the number of columns in the provided partition filter (%s) is greater than the number of partitioned columns in table (%s)", (Object)table.name(), (Object)partitionFilter.size(), (Object)partitionFields.size());
            List unMatchedFilters = partitionFilter.keySet().stream().filter(filterName -> !partitionNames.contains(filterName)).collect(Collectors.toList());
            Preconditions.checkArgument((boolean)unMatchedFilters.isEmpty(), (String)"Cannot add files to target table %s. %s is partitioned but the specified partition filter refers to columns that are not partitioned: '%s' . Valid partition columns %s", (Object)table.name(), (Object)table.name(), unMatchedFilters, (Object)String.join((CharSequence)",", partitionNames));
        } else {
            Preconditions.checkArgument((!partitionSpecPassed ? 1 : 0) != 0, (String)"Cannot use partition filter with an unpartitioned table %s", (Object)table.name());
        }
    }
}

