/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.actions.BaseSparkAction;
import org.apache.iceberg.spark.actions.FileInfo;
import org.apache.iceberg.spark.actions.SetAccumulator;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark action that removes orphan files: files found under a location that are not referenced by
 * any content file, manifest, manifest list, or other metadata file reachable from the table.
 *
 * <p>Candidate files come either from recursively listing {@link #location(String)} (the table
 * location by default) or, when {@link #compareToFileList(Dataset)} is set, from the provided
 * dataset. Only candidates last modified strictly before {@link #olderThan(long)} are considered
 * (by default, three days before this action was created). Candidate and referenced URIs are
 * normalized with {@link #equalSchemes(Map)} / {@link #equalAuthorities(Map)} and joined on their
 * path; a path match whose scheme or authority still differs is handled according to
 * {@link #prefixMismatchMode(DeleteOrphanFiles.PrefixMismatchMode)}.
 */
public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFilesSparkAction>
        implements DeleteOrphanFiles {
    private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);

    // Keys may list several comma-separated schemes; flattenMap expands them into one entry each.
    private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");

    // The driver lists at most this many directory levels. When a directory has more than
    // MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub-directories, those sub-directories are handed
    // to executors instead of being listed on the driver.
    private static final int MAX_DRIVER_LISTING_DEPTH = 3;
    private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;

    // Executors descend at most this many levels; any directory left unlisted fails the job.
    private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000;
    private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE;

    // Columns required in a dataset passed to compareToFileList.
    private static final String FILE_PATH_COLUMN = "file_path";
    private static final String LAST_MODIFIED_COLUMN = "last_modified";

    private final SerializableConfiguration hadoopConf;
    private final int listingParallelism;
    private final Table table;

    private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
    private Map<String, String> equalAuthorities = Collections.emptyMap();
    private DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode =
            DeleteOrphanFiles.PrefixMismatchMode.ERROR;
    private String location = null;
    private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3L);
    private Dataset<Row> compareToFileList;
    private Consumer<String> deleteFunc = null;
    private ExecutorService deleteExecutorService = null;

    /**
     * Creates the action for {@code table}, defaulting the search location to the table location.
     *
     * @throws ValidationException if the table property {@code gc.enabled} is false
     */
    DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
        super(spark);
        this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
        this.listingParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
        this.table = table;
        this.location = table.location();

        ValidationException.check(
                PropertyUtil.propertyAsBoolean(table.properties(), "gc.enabled", true),
                "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
    }

    @Override
    protected DeleteOrphanFilesSparkAction self() {
        return this;
    }

    /**
     * Runs non-bulk deletes on the given executor service. Not used when the table's FileIO
     * supports bulk deletes and no custom delete function is set.
     */
    public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
        this.deleteExecutorService = executorService;
        return this;
    }

    /**
     * Sets how a candidate is handled when its path matches a referenced file but its scheme or
     * authority differs: {@code DELETE} treats it as orphan, any other mode keeps it, and
     * {@code ERROR} (the default) additionally fails the action listing the conflicts.
     */
    public DeleteOrphanFilesSparkAction prefixMismatchMode(
            DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) {
        this.prefixMismatchMode = newPrefixMismatchMode;
        return this;
    }

    /**
     * Declares schemes to treat as equal (key scheme is replaced by value scheme). Keys may hold
     * comma-separated schemes. The built-in defaults ({@code s3n,s3a -> s3}) are kept unless
     * overridden by an entry in {@code newEqualSchemes}; a null map keeps only the defaults.
     */
    public DeleteOrphanFilesSparkAction equalSchemes(Map<String, String> newEqualSchemes) {
        Map<String, String> schemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
        schemes.putAll(flattenMap(newEqualSchemes));
        this.equalSchemes = schemes;
        return this;
    }

    /**
     * Declares authorities to treat as equal (key authority is replaced by value authority),
     * replacing any previously configured mapping. Keys may hold comma-separated authorities.
     */
    public DeleteOrphanFilesSparkAction equalAuthorities(Map<String, String> newEqualAuthorities) {
        this.equalAuthorities = flattenMap(newEqualAuthorities);
        return this;
    }

    /** Sets the directory to search for orphan files (defaults to the table location). */
    public DeleteOrphanFilesSparkAction location(String newLocation) {
        this.location = newLocation;
        return this;
    }

    /**
     * Only files whose last modification time (milliseconds since epoch) is strictly before
     * {@code newOlderThanTimestamp} are considered.
     */
    public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
        this.olderThanTimestamp = newOlderThanTimestamp;
        return this;
    }

    /** Deletes files with {@code newDeleteFunc} instead of the table FileIO (disables bulk deletes). */
    public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
        this.deleteFunc = newDeleteFunc;
        return this;
    }

    /**
     * Uses {@code files} as the candidate list instead of listing the location.
     *
     * @param files dataset with a string {@code file_path} column and a timestamp
     *     {@code last_modified} column
     * @throws IllegalArgumentException if either column is missing or has the wrong type
     */
    public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
        StructType schema = files.schema();

        StructField filePathField = schema.apply(FILE_PATH_COLUMN);
        Preconditions.checkArgument(
                filePathField.dataType() == DataTypes.StringType,
                "Invalid %s column: %s is not a string",
                FILE_PATH_COLUMN,
                filePathField.dataType());

        StructField lastModifiedField = schema.apply(LAST_MODIFIED_COLUMN);
        Preconditions.checkArgument(
                lastModifiedField.dataType() == DataTypes.TimestampType,
                "Invalid %s column: %s is not a timestamp",
                LAST_MODIFIED_COLUMN,
                lastModifiedField.dataType());

        this.compareToFileList = files;
        return this;
    }

    /** Returns paths from the user-supplied list that are under the location and old enough. */
    private Dataset<String> filteredCompareToFileList() {
        Dataset<Row> files = compareToFileList;
        if (location != null) {
            // Match on a directory prefix: a bare string prefix would also accept sibling paths
            // such as ".../tbl_backup/..." for location ".../tbl", and those files would then be
            // deleted because this table's metadata never references them.
            String locationPrefix = location.endsWith("/") ? location : location + "/";
            files = files.filter(files.col(FILE_PATH_COLUMN).startsWith(locationPrefix));
        }
        return files
                .filter(files.col(LAST_MODIFIED_COLUMN).lt(new Timestamp(olderThanTimestamp)))
                .select(files.col(FILE_PATH_COLUMN))
                .as(Encoders.STRING());
    }

    @Override
    public DeleteOrphanFiles.Result execute() {
        JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
        return withJobGroupInfo(info, this::doExecute);
    }

    /** Builds the Spark job description from the configured options. */
    private String jobDesc() {
        List<String> options = Lists.newArrayList();
        options.add("older_than=" + olderThanTimestamp);
        if (location != null) {
            options.add("location=" + location);
        }
        String optionsAsString = COMMA_JOINER.join(options);
        return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
    }

    /** Bulk-deletes {@code paths}; a partial failure is logged as a warning, not rethrown. */
    private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
        try {
            io.deleteFiles(paths);
            LOG.info("Deleted {} files using bulk deletes", paths.size());
        } catch (BulkDeletionFailureException e) {
            int deletedFilesCount = paths.size() - e.numberFailedObjects();
            LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
        }
    }

    /** Finds orphan files, deletes them, and reports the orphan paths that were found. */
    private DeleteOrphanFiles.Result doExecute() {
        Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
        Dataset<FileURI> validFileIdentDS = validFileIdentDS();
        List<String> orphanFiles =
                findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

        if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
            deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
        } else {
            // Non-bulk deletes: each failure is logged and the remaining files are still attempted.
            Tasks.Builder<String> deleteTasks = Tasks.foreach(orphanFiles)
                    .noRetry()
                    .executeWith(deleteExecutorService)
                    .suppressFailureWhenFinished()
                    .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));

            if (deleteFunc == null) {
                LOG.info(
                        "Table IO {} does not support bulk operations. Using non-bulk deletes.",
                        table.io().getClass().getName());
                deleteTasks.run(table.io()::deleteFile);
            } else {
                LOG.info("Custom delete function provided. Using non-bulk deletes");
                deleteTasks.run(deleteFunc::accept);
            }
        }

        return new BaseDeleteOrphanFilesActionResult(orphanFiles);
    }

    /** All files reachable from table metadata, as normalized URIs. */
    private Dataset<FileURI> validFileIdentDS() {
        FileInfoToFileURI toFileURI = new FileInfoToFileURI(equalSchemes, equalAuthorities);

        Dataset<FileURI> contentFileIdentDS = toFileURI.apply(contentFileDS(table));
        Dataset<FileURI> manifestFileIdentDS = toFileURI.apply(manifestDS(table));
        Dataset<FileURI> manifestListIdentDS = toFileURI.apply(manifestListDS(table));
        Dataset<FileURI> otherMetadataFileIdentDS = toFileURI.apply(otherMetadataFileDS(table));

        return contentFileIdentDS
                .union(manifestFileIdentDS)
                .union(manifestListIdentDS)
                .union(otherMetadataFileIdentDS);
    }

    /** Candidate files (listed or user-supplied), as normalized URIs. */
    private Dataset<FileURI> actualFileIdentDS() {
        StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
        if (compareToFileList == null) {
            return toFileURI.apply(listedFileDS());
        }
        return toFileURI.apply(filteredCompareToFileList());
    }

    /**
     * Lists candidate files under the location: the top levels are listed on the driver and any
     * remaining sub-directories are listed in parallel on executors.
     */
    private Dataset<String> listedFileDS() {
        List<String> subDirs = Lists.newArrayList();
        List<String> matchingFiles = Lists.newArrayList();

        Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
        PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

        listDirRecursively(
                location,
                predicate,
                hadoopConf.value(),
                MAX_DRIVER_LISTING_DEPTH,
                MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
                subDirs,
                pathFilter,
                matchingFiles);

        JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

        if (subDirs.isEmpty()) {
            return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
        }

        int parallelism = Math.min(subDirs.size(), listingParallelism);
        JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

        Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
        ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
        JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

        JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
        return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
    }

    /**
     * Recursively lists {@code dir}, appending files accepted by {@code predicate} to
     * {@code matchingFiles}. Directories that are not descended into (depth exhausted, or a parent
     * with more than {@code maxDirectSubDirs} direct sub-directories) are appended to
     * {@code remainingSubDirs}.
     *
     * @throws UncheckedIOException if listing fails
     */
    private static void listDirRecursively(
            String dir,
            Predicate<FileStatus> predicate,
            Configuration conf,
            int maxDepth,
            int maxDirectSubDirs,
            List<String> remainingSubDirs,
            PathFilter pathFilter,
            List<String> matchingFiles) {

        // stop descending once the depth budget is spent
        if (maxDepth <= 0) {
            remainingSubDirs.add(dir);
            return;
        }

        try {
            Path path = new Path(dir);
            FileSystem fs = path.getFileSystem(conf);

            List<String> subDirs = Lists.newArrayList();
            for (FileStatus file : fs.listStatus(path, pathFilter)) {
                if (file.isDirectory()) {
                    subDirs.add(file.getPath().toString());
                } else if (file.isFile() && predicate.test(file)) {
                    matchingFiles.add(file.getPath().toString());
                }
            }

            // too many direct sub-directories: defer all of them instead of descending
            if (subDirs.size() > maxDirectSubDirs) {
                remainingSubDirs.addAll(subDirs);
                return;
            }

            for (String subDir : subDirs) {
                listDirRecursively(
                        subDir,
                        predicate,
                        conf,
                        maxDepth - 1,
                        maxDirectSubDirs,
                        remainingSubDirs,
                        pathFilter,
                        matchingFiles);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Left-joins candidate files with referenced files on URI path and collects the orphans.
     *
     * @throws ValidationException in {@code ERROR} mode when any path matched with a differing
     *     scheme or authority
     */
    @VisibleForTesting
    static List<String> findOrphanFiles(
            SparkSession spark,
            Dataset<FileURI> actualFileIdentDS,
            Dataset<FileURI> validFileIdentDS,
            DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode) {

        SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
        spark.sparkContext().register(conflicts);

        Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

        List<String> orphanFiles = actualFileIdentDS
                .joinWith(validFileIdentDS, joinCond, "leftouter")
                .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
                .collectAsList();

        if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.ERROR
                && !conflicts.value().isEmpty()) {
            throw new ValidationException(
                    "Unable to determine whether certain files are orphan. "
                            + "Metadata references files that match listed/provided files except for authority/scheme. "
                            + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
                            + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
                            + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
                            + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining "
                            + "conflicting authorities/schemes are different. It will be impossible to recover deleted files. "
                            + "Conflicting authorities/schemes: %s.",
                    conflicts.value());
        }

        return orphanFiles;
    }

    /**
     * Returns a new mutable map where each comma-separated key is split into trimmed keys that all
     * map to the trimmed value. A null map yields an empty map.
     *
     * @throws NullPointerException if any value is null
     */
    private static Map<String, String> flattenMap(Map<String, String> map) {
        Map<String, String> flattenedMap = Maps.newHashMap();
        if (map == null) {
            return flattenedMap;
        }
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String value = Preconditions.checkNotNull(
                    entry.getValue(), "Invalid value for key %s: null", entry.getKey());
            String trimmedValue = value.trim();
            for (String splitKey : COMMA_SPLITTER.split(entry.getKey())) {
                flattenedMap.put(splitKey.trim(), trimmedValue);
            }
        }
        return flattenedMap;
    }

    /**
     * Bean holding a file URI split into its (normalized) scheme, authority and path, plus the
     * original URI string. Public getters/setters are required by {@link Encoders#bean}.
     */
    public static class FileURI {
        public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);

        private String scheme;
        private String authority;
        private String path;
        private String uriAsString;

        public FileURI(String scheme, String authority, String path, String uriAsString) {
            this.scheme = scheme;
            this.authority = authority;
            this.path = path;
            this.uriAsString = uriAsString;
        }

        public FileURI() {
        }

        public void setScheme(String scheme) {
            this.scheme = scheme;
        }

        public void setAuthority(String authority) {
            this.authority = authority;
        }

        public void setPath(String path) {
            this.path = path;
        }

        public void setUriAsString(String uriAsString) {
            this.uriAsString = uriAsString;
        }

        public String getScheme() {
            return scheme;
        }

        public String getAuthority() {
            return authority;
        }

        public String getPath() {
            return path;
        }

        public String getUriAsString() {
            return uriAsString;
        }
    }

    /**
     * Path filter that accepts non-hidden paths, plus paths whose name starts with
     * {@code <name>=} for a partition field whose name begins with {@code _} or {@code .}; such
     * partition directories would otherwise be rejected as hidden.
     */
    @VisibleForTesting
    static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
        private final Set<String> hiddenPathPartitionNames;

        PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
            this.hiddenPathPartitionNames = hiddenPathPartitionNames;
        }

        @Override
        public boolean accept(Path path) {
            return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
        }

        private boolean isHiddenPartitionPath(Path path) {
            return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
        }

        /** Returns the plain hidden-path filter unless some spec has a hidden-looking field name. */
        static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
            if (specs == null) {
                return HiddenPathFilter.get();
            }

            Set<String> partitionNames = specs.values().stream()
                    .map(PartitionSpec::fields)
                    .flatMap(Collection::stream)
                    .filter(field -> field.name().startsWith("_") || field.name().startsWith("."))
                    .map(field -> field.name() + "=")
                    .collect(Collectors.toSet());

            return partitionNames.isEmpty()
                    ? HiddenPathFilter.get()
                    : new PartitionAwareHiddenPathFilter(partitionNames);
        }
    }

    /**
     * Converts rows to {@link FileURI}, replacing scheme and authority through the configured
     * equality maps so equivalent prefixes compare equal.
     */
    private abstract static class ToFileURI<I> implements MapPartitionsFunction<I, FileURI> {
        private final Map<String, String> equalSchemes;
        private final Map<String, String> equalAuthorities;

        ToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
            this.equalSchemes = equalSchemes;
            this.equalAuthorities = equalAuthorities;
        }

        /** Extracts the URI string from one input row. */
        protected abstract String uriAsString(I input);

        Dataset<FileURI> apply(Dataset<I> ds) {
            return ds.mapPartitions(this, FileURI.ENCODER);
        }

        @Override
        public Iterator<FileURI> call(Iterator<I> rows) throws Exception {
            return Iterators.transform(rows, this::toFileURI);
        }

        private FileURI toFileURI(I input) {
            String uriAsString = uriAsString(input);
            URI uri = new Path(uriAsString).toUri();
            String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
            String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
            return new FileURI(scheme, authority, uri.getPath(), uriAsString);
        }
    }

    /** {@link ToFileURI} over {@link FileInfo} rows (files referenced by metadata). */
    @VisibleForTesting
    static class FileInfoToFileURI extends ToFileURI<FileInfo> {
        FileInfoToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
            super(equalSchemes, equalAuthorities);
        }

        @Override
        protected String uriAsString(FileInfo fileInfo) {
            return fileInfo.getPath();
        }
    }

    /** {@link ToFileURI} over plain path strings (candidate files). */
    @VisibleForTesting
    static class StringToFileURI extends ToFileURI<String> {
        StringToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
            super(equalSchemes, equalAuthorities);
        }

        @Override
        protected String uriAsString(String input) {
            return input;
        }
    }

    /**
     * Maps (candidate, referenced-or-null) join pairs to the orphan candidate URI, or drops them.
     * Scheme/authority mismatches on a path match are recorded in {@code conflicts}.
     *
     * <p>NOTE: each joined pair is evaluated independently, so when several referenced files share
     * a candidate's path, the candidate is judged once per referenced file.
     */
    private static class FindOrphanFiles
            implements MapPartitionsFunction<Tuple2<FileURI, FileURI>, String> {
        private final DeleteOrphanFiles.PrefixMismatchMode mode;
        private final SetAccumulator<Pair<String, String>> conflicts;

        FindOrphanFiles(
                DeleteOrphanFiles.PrefixMismatchMode mode,
                SetAccumulator<Pair<String, String>> conflicts) {
            this.mode = mode;
            this.conflicts = conflicts;
        }

        @Override
        public Iterator<String> call(Iterator<Tuple2<FileURI, FileURI>> rows) throws Exception {
            Iterator<String> orphanFiles = Iterators.transform(rows, this::toOrphanFile);
            return Iterators.filter(orphanFiles, Objects::nonNull);
        }

        /** Returns the candidate URI if it is an orphan, otherwise {@code null}. */
        private String toOrphanFile(Tuple2<FileURI, FileURI> row) {
            FileURI actual = row._1;
            FileURI valid = row._2;

            // no referenced file has this path
            if (valid == null) {
                return actual.uriAsString;
            }

            boolean schemeMatch = uriComponentMatch(valid.scheme, actual.scheme);
            boolean authorityMatch = uriComponentMatch(valid.authority, actual.authority);

            if ((!schemeMatch || !authorityMatch)
                    && mode == DeleteOrphanFiles.PrefixMismatchMode.DELETE) {
                return actual.uriAsString;
            }

            if (!schemeMatch) {
                conflicts.add(Pair.of(valid.scheme, actual.scheme));
            }
            if (!authorityMatch) {
                conflicts.add(Pair.of(valid.authority, actual.authority));
            }
            return null;
        }

        /** A missing/empty referenced component matches anything; otherwise compare ignoring case. */
        private boolean uriComponentMatch(String valid, String actual) {
            return Strings.isNullOrEmpty(valid) || valid.equalsIgnoreCase(actual);
        }
    }

    /**
     * Executor-side listing of directories the driver deferred; returns files accepted by the
     * age filter.
     */
    private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {
        private final Broadcast<SerializableConfiguration> hadoopConf;
        private final long olderThanTimestamp;
        private final PathFilter pathFilter;

        ListDirsRecursively(
                Broadcast<SerializableConfiguration> hadoopConf,
                long olderThanTimestamp,
                PathFilter pathFilter) {
            this.hadoopConf = hadoopConf;
            this.olderThanTimestamp = olderThanTimestamp;
            this.pathFilter = pathFilter;
        }

        /**
         * @throws RuntimeException if some directory could not be fully listed within
         *     {@code MAX_EXECUTOR_LISTING_DEPTH} levels
         */
        @Override
        public Iterator<String> call(Iterator<String> dirs) throws Exception {
            List<String> subDirs = Lists.newArrayList();
            List<String> files = Lists.newArrayList();
            Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
            Configuration conf = hadoopConf.value().value();

            while (dirs.hasNext()) {
                listDirRecursively(
                        dirs.next(),
                        predicate,
                        conf,
                        MAX_EXECUTOR_LISTING_DEPTH,
                        MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
                        subDirs,
                        pathFilter,
                        files);
            }

            if (!subDirs.isEmpty()) {
                throw new RuntimeException(
                        "Could not list sub directories, reached maximum depth: " + MAX_EXECUTOR_LISTING_DEPTH);
            }

            return files.iterator();
        }
    }
}

