/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrphanFilesClean {
    private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesClean.class);
    private static final int READ_FILE_RETRY_NUM = 3;
    private static final int READ_FILE_RETRY_INTERVAL = 5;
    private final SnapshotManager snapshotManager;
    private final TagManager tagManager;
    private final FileIO fileIO;
    private final Path location;
    private final int partitionKeysNum;
    private final ManifestList manifestList;
    private final ManifestFile manifestFile;
    private final IndexFileHandler indexFileHandler;
    private int deletedFilesNum = 0;
    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1L);

    public OrphanFilesClean(FileStoreTable table) {
        this.snapshotManager = table.snapshotManager();
        this.tagManager = table.tagManager();
        this.fileIO = table.fileIO();
        this.location = table.location();
        this.partitionKeysNum = table.partitionKeys().size();
        FileStore<?> store = table.store();
        this.manifestList = store.manifestListFactory().create();
        this.manifestFile = store.manifestFileFactory().create();
        this.indexFileHandler = store.newIndexFileHandler();
    }

    public OrphanFilesClean olderThan(String timestamp) {
        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3, DateTimeUtils.LOCAL_TZ).getMillisecond();
        return this;
    }

    public int clean() throws IOException, ExecutionException, InterruptedException {
        if (this.snapshotManager.earliestSnapshotId() == null) {
            LOG.info("No snapshot found, skip removing.");
            return 0;
        }
        List<Path> nonSnapshotFiles = this.snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
        nonSnapshotFiles.forEach(this::deleteFileOrDirQuietly);
        this.deletedFilesNum += nonSnapshotFiles.size();
        Set<String> usedFiles = this.getUsedFiles();
        Map<String, Path> candidates = this.getCandidateDeletingFiles();
        HashSet<String> deleted = new HashSet<String>(candidates.keySet());
        deleted.removeAll(usedFiles);
        for (String file : deleted) {
            Path path = candidates.get(file);
            this.deleteFileOrDirQuietly(path);
        }
        this.deletedFilesNum += deleted.size();
        return this.deletedFilesNum;
    }

    private Set<String> getUsedFiles() throws IOException, ExecutionException, InterruptedException {
        HashSet<Snapshot> readSnapshots = new HashSet<Snapshot>(this.snapshotManager.safelyGetAllSnapshots());
        List<Snapshot> taggedSnapshots = this.tagManager.taggedSnapshots();
        readSnapshots.addAll(taggedSnapshots);
        return (Set)((ForkJoinTask)FileUtils.COMMON_IO_FORK_JOIN_POOL.submit(() -> readSnapshots.parallelStream().flatMap(snapshot -> this.getUsedFilesForSnapshot((Snapshot)snapshot).stream()).collect(Collectors.toSet()))).get();
    }

    private Map<String, Path> getCandidateDeletingFiles() {
        List<Path> fileDirs = this.listPaimonFileDirs();
        try {
            return (Map)((ForkJoinTask)FileUtils.COMMON_IO_FORK_JOIN_POOL.submit(() -> fileDirs.parallelStream().flatMap(p -> this.tryBestListingDirs((Path)p).stream()).filter(this::oldEnough).map(FileStatus::getPath).collect(Collectors.toMap(Path::getName, Function.identity())))).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.debug("Failed to get candidate deleting files.", (Throwable)e);
            return Collections.emptyMap();
        }
    }

    private List<String> getUsedFilesForSnapshot(Snapshot snapshot) {
        ArrayList<String> files = new ArrayList<String>();
        this.addManifestList(files, snapshot);
        try {
            List manifestFileMetas = this.retryReadingFiles(() -> this.readAllManifestsWithIOException(snapshot));
            if (manifestFileMetas == null) {
                return Collections.emptyList();
            }
            List<String> manifestFileName = manifestFileMetas.stream().map(ManifestFileMeta::fileName).collect(Collectors.toList());
            files.addAll(manifestFileName);
            List<String> dataFiles = this.retryReadingDataFiles(manifestFileName);
            if (dataFiles == null) {
                return Collections.emptyList();
            }
            files.addAll(dataFiles);
            String indexManifest = snapshot.indexManifest();
            if (indexManifest != null && this.indexFileHandler.existsManifest(indexManifest)) {
                files.add(indexManifest);
                List indexManifestEntries = this.retryReadingFiles(() -> this.indexFileHandler.readManifestWithIOException(indexManifest));
                if (indexManifestEntries == null) {
                    return Collections.emptyList();
                }
                indexManifestEntries.stream().map(IndexManifestEntry::indexFile).map(IndexFileMeta::fileName).forEach(files::add);
            }
            if (snapshot.statistics() != null) {
                files.add(snapshot.statistics());
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return files;
    }

    private void addManifestList(List<String> used, Snapshot snapshot) {
        used.add(snapshot.baseManifestList());
        used.add(snapshot.deltaManifestList());
        String changelogManifestList = snapshot.changelogManifestList();
        if (changelogManifestList != null) {
            used.add(changelogManifestList);
        }
    }

    @Nullable
    private <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException {
        int retryNumber = 0;
        IOException caught = null;
        while (retryNumber++ < 3) {
            try {
                return reader.read();
            }
            catch (FileNotFoundException e) {
                return null;
            }
            catch (IOException e) {
                caught = e;
                try {
                    TimeUnit.MILLISECONDS.sleep(5L);
                }
                catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e2);
                }
            }
        }
        throw caught;
    }

    private List<ManifestFileMeta> readAllManifestsWithIOException(Snapshot snapshot) throws IOException {
        ArrayList<ManifestFileMeta> result = new ArrayList<ManifestFileMeta>();
        result.addAll(this.manifestList.readWithIOException(snapshot.baseManifestList()));
        result.addAll(this.manifestList.readWithIOException(snapshot.deltaManifestList()));
        String changelogManifestList = snapshot.changelogManifestList();
        if (changelogManifestList != null) {
            result.addAll(this.manifestList.readWithIOException(changelogManifestList));
        }
        return result;
    }

    @Nullable
    private List<String> retryReadingDataFiles(List<String> manifestNames) throws IOException {
        ArrayList<String> dataFiles = new ArrayList<String>();
        for (String manifestName : manifestNames) {
            List manifestEntries = this.retryReadingFiles(() -> this.manifestFile.readWithIOException(manifestName));
            if (manifestEntries == null) {
                return null;
            }
            manifestEntries.stream().map(ManifestEntry::file).forEach(f -> {
                dataFiles.add(f.fileName());
                dataFiles.addAll(f.extraFiles());
            });
        }
        return dataFiles;
    }

    private List<Path> listPaimonFileDirs() {
        ArrayList<Path> paimonFileDirs = new ArrayList<Path>();
        paimonFileDirs.add(new Path(this.location, "manifest"));
        paimonFileDirs.add(new Path(this.location, "index"));
        paimonFileDirs.add(new Path(this.location, "statistics"));
        paimonFileDirs.addAll(this.listAndCleanDataDirs(this.location, this.partitionKeysNum));
        return paimonFileDirs;
    }

    private List<FileStatus> tryBestListingDirs(Path dir) {
        try {
            if (!this.fileIO.exists(dir)) {
                return Collections.emptyList();
            }
            List status = this.retryReadingFiles(() -> {
                FileStatus[] s = this.fileIO.listStatus(dir);
                return s == null ? Collections.emptyList() : Arrays.asList(s);
            });
            return status == null ? Collections.emptyList() : status;
        }
        catch (IOException e) {
            LOG.debug("Failed to list directory {}, skip it.", (Object)dir, (Object)e);
            return Collections.emptyList();
        }
    }

    private boolean oldEnough(FileStatus status) {
        return status.getModificationTime() < this.olderThanMillis;
    }

    private List<Path> listAndCleanDataDirs(Path dir, int level) {
        List<FileStatus> dirs = this.tryBestListingDirs(dir);
        if (level == 0) {
            return this.filterAndCleanDataDirs(dirs, p -> p.getName().startsWith("bucket-"), partitionKeysNum -> partitionKeysNum != 0);
        }
        List<Path> partitionPaths = this.filterAndCleanDataDirs(dirs, p -> p.getName().contains("="), partitionKeysNum -> level != partitionKeysNum);
        try {
            return (List)((ForkJoinTask)FileUtils.COMMON_IO_FORK_JOIN_POOL.submit(() -> partitionPaths.parallelStream().flatMap(p -> this.listAndCleanDataDirs((Path)p, level - 1).stream()).collect(Collectors.toList()))).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.debug("Failed to list partition directory {}", (Object)dir, (Object)e);
            return Collections.emptyList();
        }
    }

    private List<Path> filterAndCleanDataDirs(List<FileStatus> statuses, Predicate<Path> filter, Predicate<Integer> cleanCondition) {
        ArrayList<Path> filtered = new ArrayList<Path>();
        ArrayList<FileStatus> mayBeClean = new ArrayList<FileStatus>();
        for (FileStatus status : statuses) {
            Path path = status.getPath();
            if (filter.test(path)) {
                filtered.add(path);
                continue;
            }
            mayBeClean.add(status);
        }
        if (cleanCondition.test(this.partitionKeysNum)) {
            mayBeClean.stream().filter(this::oldEnough).map(FileStatus::getPath).forEach(p -> {
                this.deleteFileOrDirQuietly((Path)p);
                ++this.deletedFilesNum;
            });
        }
        return filtered;
    }

    private void deleteFileOrDirQuietly(Path path) {
        try {
            if (this.fileIO.isDir(path)) {
                this.fileIO.deleteDirectoryQuietly(path);
            } else {
                this.fileIO.deleteQuietly(path);
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @FunctionalInterface
    private static interface ReaderWithIOException<T> {
        public T read() throws IOException;
    }
}

