/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.actions.BaseSparkAction;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark action that finds and deletes orphan files: files present under a location that are not
 * referenced by the table's valid content files or valid metadata files.
 *
 * <p>Candidate files come from one of two sources:
 * <ul>
 *   <li>a recursive listing of {@link #location(String)}, which defaults to the table location, or</li>
 *   <li>a caller-supplied dataset passed to {@link #compareToFileList(Dataset)}.</li>
 * </ul>
 *
 * <p>Only candidates whose modification time is strictly before {@link #olderThan(long)} are
 * considered. That cut-off defaults to three days before this action instance was created.
 */
public class DeleteOrphanFilesSparkAction
extends BaseSparkAction<DeleteOrphanFilesSparkAction>
implements DeleteOrphanFiles {
    private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);

    /**
     * Spark UDF returning the last path component (the file name) of a path string, or the whole
     * string when it contains no separator.
     *
     * <p>Hadoop/Iceberg paths and URIs always use {@code '/'}, independent of the JVM's platform. The
     * platform-dependent {@code File.separator} (a backslash on Windows) would never match, which
     * would make every listed file look like an orphan.
     */
    private static final UserDefinedFunction filenameUDF = functions.udf(
        (UDF1<String, String>) path -> {
            int lastIndex = path.lastIndexOf(Path.SEPARATOR_CHAR);
            return lastIndex == -1 ? path : path.substring(lastIndex + 1);
        },
        DataTypes.StringType);

    private final SerializableConfiguration hadoopConf;
    private final int partitionDiscoveryParallelism;
    private final Table table;

    /** Default delete function: removes the file through the table's {@code FileIO}. */
    private final Consumer<String> defaultDelete = new Consumer<String>() {
        @Override
        public void accept(String file) {
            table.io().deleteFile(file);
        }
    };

    private String location = null;
    private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3L);
    private Dataset<Row> compareToFileList;
    private Consumer<String> deleteFunc = defaultDelete;
    private ExecutorService deleteExecutorService = null;

    /**
     * Creates the action for {@code table}, scanning the table location by default.
     *
     * @throws ValidationException if the table property {@code gc.enabled} is {@code false}
     */
    DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
        super(spark);
        this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
        this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
        this.table = table;
        this.location = table.location();
        ValidationException.check(
            PropertyUtil.propertyAsBoolean(table.properties(), "gc.enabled", true),
            "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
    }

    @Override
    protected DeleteOrphanFilesSparkAction self() {
        return this;
    }

    /**
     * Sets the executor that runs the delete tasks. The value is passed straight to
     * {@code Tasks.executeWith}. If this is never called, {@code null} is passed.
     */
    public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
        this.deleteExecutorService = executorService;
        return this;
    }

    /** Sets the directory to scan for orphan files (defaults to the table location). */
    public DeleteOrphanFilesSparkAction location(String newLocation) {
        this.location = newLocation;
        return this;
    }

    /** Only files modified strictly before this timestamp (epoch millis) are considered. */
    public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
        this.olderThanTimestamp = newOlderThanTimestamp;
        return this;
    }

    /** Replaces the function used to delete each orphan file (defaults to the table's FileIO). */
    public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
        this.deleteFunc = newDeleteFunc;
        return this;
    }

    /**
     * Uses {@code files} as the set of candidate files instead of listing {@link #location(String)}.
     *
     * @param files dataset with a string column {@code file_path} and a timestamp column
     *     {@code last_modified}
     * @throws IllegalArgumentException if either column has the wrong data type
     */
    public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
        StructType schema = files.schema();

        StructField filePathField = schema.apply("file_path");
        Preconditions.checkArgument(
            filePathField.dataType() == DataTypes.StringType,
            "Invalid %s column: %s is not a string", "file_path", filePathField.dataType());

        StructField lastModifiedField = schema.apply("last_modified");
        Preconditions.checkArgument(
            lastModifiedField.dataType() == DataTypes.TimestampType,
            "Invalid %s column: %s is not a timestamp", "last_modified", lastModifiedField.dataType());

        this.compareToFileList = files;
        return this;
    }

    /**
     * Restricts the caller-supplied file list to files under {@link #location} that are older than
     * the cut-off, projected to the {@code file_path} column.
     */
    private Dataset<Row> filteredCompareToFileList() {
        Dataset<Row> files = compareToFileList;
        if (location != null) {
            // Match on the directory prefix, not the raw string prefix. Otherwise files of a sibling
            // location such as "<location>_backup/..." would be treated as orphans of this location
            // and deleted.
            String dirPrefix = location.endsWith(Path.SEPARATOR) ? location : location + Path.SEPARATOR;
            files = files.filter(files.col("file_path").startsWith(dirPrefix));
        }
        return files
            .filter(files.col("last_modified").lt(new Timestamp(olderThanTimestamp)))
            .select(files.col("file_path"));
    }

    /** Runs the action inside a Spark job group named {@code DELETE-ORPHAN-FILES}. */
    public DeleteOrphanFiles.Result execute() {
        JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
        return withJobGroupInfo(info, this::doExecute);
    }

    /** Human-readable job description listing the effective options. */
    private String jobDesc() {
        List<String> options = Lists.newArrayList();
        options.add("older_than=" + olderThanTimestamp);
        if (location != null) {
            options.add("location=" + location);
        }
        return String.format("Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
    }

    /**
     * Computes the orphan files and deletes them.
     *
     * <p>Delete failures are logged and do not abort the action; the returned result lists every
     * orphan file that was found, including any whose deletion failed.
     */
    private DeleteOrphanFiles.Result doExecute() {
        Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
        Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
        Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
        Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();

        // A candidate is valid when its file name equals a referenced file's name AND its full path
        // contains the referenced path. NOTE(review): the containment check presumably tolerates
        // scheme/authority prefixes that differ between listing and metadata — confirm.
        Column actualFileName = filenameUDF.apply(actualFileDF.col("file_path"));
        Column validFileName = filenameUDF.apply(validFileDF.col("file_path"));
        Column nameEqual = actualFileName.equalTo(validFileName);
        Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
        Column joinCond = nameEqual.and(actualContains);

        // Left anti join keeps only candidates with no matching valid file, i.e. the orphans.
        List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
            .as(Encoders.STRING())
            .collectAsList();

        Tasks.foreach(orphanFiles)
            .noRetry()
            .executeWith(deleteExecutorService)
            .suppressFailureWhenFinished()
            .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
            .run(deleteFunc::accept);

        return new BaseDeleteOrphanFilesActionResult(orphanFiles);
    }

    /**
     * Lists old-enough files under {@link #location}, returned as a single {@code file_path} column.
     *
     * <p>The driver lists at most 3 levels deep and only descends into directories with at most 10
     * direct subdirectories. Any directories left over are listed in parallel on executors.
     */
    private Dataset<Row> buildActualFileDF() {
        List<String> subDirs = Lists.newArrayList();
        List<String> matchingFiles = Lists.newArrayList();

        Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
        PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

        listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);

        JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

        if (subDirs.isEmpty()) {
            return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path");
        }

        int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
        JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

        Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
        JavaRDD<String> matchingLeafFileRDD =
            subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp, pathFilter));

        JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
        return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path");
    }

    /**
     * Recursively lists {@code dir}. Regular files that pass {@code predicate} are added to
     * {@code matchingFiles}.
     *
     * <p>When {@code maxDepth} is exhausted, or a directory has more than {@code maxDirectSubDirs}
     * direct subdirectories, the unlisted directories are appended to {@code remainingSubDirs}
     * instead of being descended into.
     *
     * @throws RuntimeIOException wrapping any {@link IOException} from the file system
     */
    private static void listDirRecursively(
            String dir,
            Predicate<FileStatus> predicate,
            Configuration conf,
            int maxDepth,
            int maxDirectSubDirs,
            List<String> remainingSubDirs,
            PathFilter pathFilter,
            List<String> matchingFiles) {

        if (maxDepth <= 0) {
            remainingSubDirs.add(dir);
            return;
        }

        try {
            Path path = new Path(dir);
            FileSystem fs = path.getFileSystem(conf);

            List<String> subDirs = Lists.newArrayList();
            for (FileStatus file : fs.listStatus(path, pathFilter)) {
                if (file.isDirectory()) {
                    subDirs.add(file.getPath().toString());
                } else if (file.isFile() && predicate.test(file)) {
                    matchingFiles.add(file.getPath().toString());
                }
            }

            if (subDirs.size() > maxDirectSubDirs) {
                remainingSubDirs.addAll(subDirs);
                return;
            }

            for (String subDir : subDirs) {
                listDirRecursively(
                    subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter, matchingFiles);
            }
        } catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    /**
     * Builds the executor-side listing function. Each partition of directory paths is listed up to
     * 2000 levels deep with no fan-out limit.
     *
     * <p>The function fails if any directory is still unlisted after that depth.
     */
    private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
            Broadcast<SerializableConfiguration> conf, long olderThanTimestamp, PathFilter pathFilter) {

        return dirs -> {
            List<String> subDirs = Lists.newArrayList();
            List<String> files = Lists.newArrayList();

            Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

            int maxDepth = 2000;
            int maxDirectSubDirs = Integer.MAX_VALUE;

            dirs.forEachRemaining(dir -> listDirRecursively(
                dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files));

            if (!subDirs.isEmpty()) {
                throw new RuntimeException("Could not list subdirectories, reached maximum subdirectory depth: " + maxDepth);
            }

            return files.iterator();
        };
    }

    /**
     * Path filter for table listings that accepts the same paths as {@link HiddenPathFilter}.
     *
     * <p>It additionally accepts partition directories whose partition field name starts with
     * {@code '_'} or {@code '.'} (directory names beginning with {@code "<name>="}). Those directories
     * would otherwise look hidden.
     */
    @VisibleForTesting
    static class PartitionAwareHiddenPathFilter
    implements PathFilter,
    Serializable {
        private final Set<String> hiddenPathPartitionNames;

        PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
            this.hiddenPathPartitionNames = hiddenPathPartitionNames;
        }

        @Override
        public boolean accept(Path path) {
            boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
            return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
        }

        /**
         * Returns a filter for the given partition specs.
         *
         * <p>This is the plain {@link HiddenPathFilter} when {@code specs} is null or when no partition
         * field name starts with {@code '_'} or {@code '.'}.
         */
        static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
            if (specs == null) {
                return HiddenPathFilter.get();
            }

            Set<String> partitionNames = specs.values().stream()
                .map(PartitionSpec::fields)
                .flatMap(Collection::stream)
                .filter(partitionField -> partitionField.name().startsWith("_") || partitionField.name().startsWith("."))
                .map(partitionField -> partitionField.name() + "=")
                .collect(Collectors.toSet());

            return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
        }
    }
}

