/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.utils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;

/**
 * Locates, reads and enumerates the snapshot and long-lived changelog files kept under a
 * table path.
 *
 * <p>Layout used by this class (all relative to {@code tablePath}):
 *
 * <ul>
 *   <li>{@code snapshot/snapshot-<id>} - snapshot files of the main branch;
 *   <li>{@code changelog/changelog-<id>} - long-lived changelog files;
 *   <li>{@code <branch path>/snapshot/snapshot-<id>} - snapshot files of any other branch, where
 *       the branch path is resolved by {@link BranchManager#getBranchPath};
 *   <li>{@code EARLIEST} / {@code LATEST} - hint files inside each of those directories holding
 *       an id as text.
 * </ul>
 *
 * <p>Hint files are only treated as hints: their content is verified against the file system and
 * the directory is listed when the hint is missing or stale.
 */
public class SnapshotManager
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final String SNAPSHOT_PREFIX = "snapshot-";
    private static final String CHANGELOG_PREFIX = "changelog-";
    public static final String EARLIEST = "EARLIEST";
    public static final String LATEST = "LATEST";
    /** Maximum number of attempts made by {@link #readHint(String, Path)}. */
    private static final int READ_HINT_RETRY_NUM = 3;
    /** Pause, in milliseconds, after a failed attempt to read a hint file. */
    private static final int READ_HINT_RETRY_INTERVAL = 1;
    /** Branch name that maps to the table's own snapshot directory. */
    private static final String MAIN_BRANCH = "main";
    private final FileIO fileIO;
    private final Path tablePath;

    /**
     * @param fileIO file system access used for every read, write and existence check
     * @param tablePath root path of the table
     */
    public SnapshotManager(FileIO fileIO, Path tablePath) {
        this.fileIO = fileIO;
        this.tablePath = tablePath;
    }

    /** Returns the {@link FileIO} this manager operates on. */
    public FileIO fileIO() {
        return fileIO;
    }

    /** Returns the root path of the table. */
    public Path tablePath() {
        return tablePath;
    }

    /** Returns the snapshot directory of the main branch. */
    public Path snapshotDirectory() {
        return new Path(tablePath + "/snapshot");
    }

    /** Returns the directory holding the long-lived changelog files. */
    public Path changelogDirectory() {
        return new Path(tablePath + "/changelog");
    }

    /** Returns the path of the long-lived changelog file with the given id. */
    public Path longLivedChangelogPath(long snapshotId) {
        return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId);
    }

    /** Returns the path of the main-branch snapshot file with the given id. */
    public Path snapshotPath(long snapshotId) {
        return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
    }

    /** Returns the snapshot directory located under the path of the given branch. */
    public Path branchSnapshotDirectory(String branchName) {
        return new Path(BranchManager.getBranchPath(tablePath, branchName) + "/snapshot");
    }

    /** Returns the path of a snapshot file located under the path of the given branch. */
    public Path branchSnapshotPath(String branchName, long snapshotId) {
        return new Path(
                BranchManager.getBranchPath(tablePath, branchName)
                        + "/snapshot/"
                        + SNAPSHOT_PREFIX
                        + snapshotId);
    }

    /**
     * Returns the snapshot file path for the given branch; {@code "main"} maps to {@link
     * #snapshotPath(long)}, any other name to {@link #branchSnapshotPath(String, long)}.
     */
    public Path snapshotPathByBranch(String branchName, long snapshotId) {
        return branchName.equals(MAIN_BRANCH)
                ? snapshotPath(snapshotId)
                : branchSnapshotPath(branchName, snapshotId);
    }

    /**
     * Returns the snapshot directory for the given branch; {@code "main"} maps to {@link
     * #snapshotDirectory()}, any other name to {@link #branchSnapshotDirectory(String)}.
     */
    public Path snapshotDirByBranch(String branchName) {
        return branchName.equals(MAIN_BRANCH)
                ? snapshotDirectory()
                : branchSnapshotDirectory(branchName);
    }

    /** Reads the main-branch snapshot with the given id. */
    public Snapshot snapshot(long snapshotId) {
        return snapshot(MAIN_BRANCH, snapshotId);
    }

    /** Reads the long-lived changelog with the given id. */
    public Changelog changelog(long snapshotId) {
        return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
    }

    /** Reads the long-lived changelog with the given id. */
    public Changelog longLivedChangelog(long snapshotId) {
        return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
    }

    /** Reads the snapshot with the given id from the given branch. */
    public Snapshot snapshot(String branchName, long snapshotId) {
        return Snapshot.fromPath(fileIO, snapshotPathByBranch(branchName, snapshotId));
    }

    /**
     * Tells whether the main-branch snapshot file with the given id exists.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the check itself fails
     */
    public boolean snapshotExists(long snapshotId) {
        Path path = snapshotPath(snapshotId);
        try {
            return fileIO.exists(path);
        } catch (IOException e) {
            throw new RuntimeException("Failed to determine if snapshot #" + snapshotId + " exists in path " + path, e);
        }
    }

    /**
     * Tells whether the long-lived changelog file with the given id exists.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the check itself fails
     */
    public boolean longLivedChangelogExists(long snapshotId) {
        Path path = longLivedChangelogPath(snapshotId);
        try {
            return fileIO.exists(path);
        } catch (IOException e) {
            throw new RuntimeException("Failed to determine if changelog #" + snapshotId + " exists in path " + path, e);
        }
    }

    /** Returns the latest snapshot of the main branch, or {@code null} if there is none. */
    @Nullable
    public Snapshot latestSnapshot() {
        return latestSnapshot(MAIN_BRANCH);
    }

    /** Returns the latest snapshot of the given branch, or {@code null} if there is none. */
    @Nullable
    public Snapshot latestSnapshot(String branchName) {
        Long snapshotId = latestSnapshotId(branchName);
        return snapshotId == null ? null : snapshot(branchName, snapshotId);
    }

    /** Returns the latest snapshot id of the main branch, or {@code null} if there is none. */
    @Nullable
    public Long latestSnapshotId() {
        return latestSnapshotId(MAIN_BRANCH);
    }

    /**
     * Returns the latest snapshot id of the given branch, or {@code null} if there is none.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long latestSnapshotId(String branchName) {
        try {
            // The hint must be validated against files of the SAME branch whose directory is
            // searched, so the path function has to be branch-aware.
            return findLatest(
                    snapshotDirByBranch(branchName),
                    SNAPSHOT_PREFIX,
                    id -> snapshotPathByBranch(branchName, id));
        } catch (IOException e) {
            throw new RuntimeException("Failed to find latest snapshot id", e);
        }
    }

    /** Returns the earliest snapshot of the main branch, or {@code null} if there is none. */
    @Nullable
    public Snapshot earliestSnapshot() {
        Long snapshotId = earliestSnapshotId();
        return snapshotId == null ? null : snapshot(snapshotId);
    }

    /** Returns the earliest snapshot id of the main branch, or {@code null} if there is none. */
    @Nullable
    public Long earliestSnapshotId() {
        return earliestSnapshotId(MAIN_BRANCH);
    }

    /**
     * Returns the earliest long-lived changelog id, or {@code null} if there is none.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long earliestLongLivedChangelogId() {
        try {
            return findEarliest(changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find earliest changelog id", e);
        }
    }

    /**
     * Returns the latest long-lived changelog id, or {@code null} if there is none.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long latestLongLivedChangelogId() {
        try {
            return findLatest(changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find latest changelog id", e);
        }
    }

    /** Returns the latest changelog id, which is the latest snapshot id of the main branch. */
    @Nullable
    public Long latestChangelogId() {
        return latestSnapshotId();
    }

    /**
     * Returns the earliest snapshot id of the given branch, or {@code null} if there is none.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long earliestSnapshotId(String branchName) {
        try {
            // Branch-aware path function, for the same reason as in latestSnapshotId(String).
            return findEarliest(
                    snapshotDirByBranch(branchName),
                    SNAPSHOT_PREFIX,
                    id -> snapshotPathByBranch(branchName, id));
        } catch (IOException e) {
            throw new RuntimeException("Failed to find earliest snapshot id", e);
        }
    }

    /**
     * Walks the main-branch snapshots from latest to earliest and returns the id of the first
     * existing snapshot accepted by {@code predicate}.
     *
     * @return the id of the matching snapshot, the latest snapshot id if nothing matches, or
     *     {@code null} if the table has no snapshot
     */
    @Nullable
    public Long pickOrLatest(Predicate<Snapshot> predicate) {
        Long latestId = latestSnapshotId();
        Long earliestId = earliestSnapshotId();
        if (latestId == null || earliestId == null) {
            return null;
        }
        long lowest = earliestId;
        for (long snapshotId = latestId; snapshotId >= lowest; snapshotId--) {
            // Ids inside the range may be missing; skip them instead of failing.
            if (!snapshotExists(snapshotId)) {
                continue;
            }
            Snapshot candidate = snapshot(snapshotId);
            if (predicate.test(candidate)) {
                return candidate.id();
            }
        }
        return latestId;
    }

    /** Reads the long-lived changelog for the id when it exists, the snapshot otherwise. */
    private Snapshot changelogOrSnapshot(long snapshotId) {
        return longLivedChangelogExists(snapshotId) ? changelog(snapshotId) : snapshot(snapshotId);
    }

    /**
     * Binary-searches for the greatest id whose commit time is strictly earlier than {@code
     * timestampMills}. The search presumes commit times do not decrease as ids grow.
     *
     * @param timestampMills upper bound (exclusive) for the commit time, in milliseconds
     * @param startFromChangelog when {@code true} the search starts at the earliest long-lived
     *     changelog, if one exists, instead of the earliest snapshot
     * @return the id found; {@code earliest - 1} when even the earliest entry is not earlier
     *     than the timestamp; {@code null} when there is nothing to search
     */
    @Nullable
    public Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
        Long earliest = earliestSnapshotId();
        if (startFromChangelog) {
            Long earliestChangelog = earliestLongLivedChangelogId();
            if (earliestChangelog != null) {
                earliest = earliestChangelog;
            }
        }
        Long latest = latestSnapshotId();
        if (earliest == null || latest == null) {
            return null;
        }
        long low = earliest;
        long high = latest;
        if (changelogOrSnapshot(low).timeMillis() >= timestampMills) {
            return low - 1L;
        }
        // Invariant: the entry at 'low' is earlier than the timestamp.
        while (low < high) {
            // Upper midpoint, written so that the addition cannot overflow.
            long mid = low + (high - low + 1L) / 2L;
            if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
                low = mid;
            } else {
                high = mid - 1L;
            }
        }
        return low;
    }

    /**
     * Binary-searches the main-branch snapshots for one committed at or before {@code
     * timestampMills}. The search presumes commit times do not decrease as ids grow.
     *
     * @return a snapshot committed exactly at the timestamp if the search hits one, otherwise the
     *     last probed snapshot committed before it; {@code null} when there is no snapshot or
     *     the earliest one is already later than the timestamp
     */
    @Nullable
    public Snapshot earlierOrEqualTimeMills(long timestampMills) {
        Long earliest = earliestSnapshotId();
        Long latest = latestSnapshotId();
        if (earliest == null || latest == null) {
            return null;
        }
        long low = earliest;
        long high = latest;
        if (snapshot(low).timeMillis() > timestampMills) {
            return null;
        }
        Snapshot result = null;
        while (low <= high) {
            long mid = low + (high - low) / 2L;
            Snapshot probed = snapshot(mid);
            long commitTime = probed.timeMillis();
            if (commitTime > timestampMills) {
                high = mid - 1L;
            } else if (commitTime < timestampMills) {
                result = probed;
                low = mid + 1L;
            } else {
                return probed;
            }
        }
        return result;
    }

    /**
     * Searches the main-branch snapshots for one whose watermark is greater than or equal to
     * {@code watermark}. Snapshots may carry no watermark ({@code null}); those are never
     * returned. The binary search presumes that the non-null watermarks do not decrease as ids
     * grow.
     *
     * @return the first snapshot carrying a watermark if that watermark already satisfies the
     *     bound; otherwise a snapshot with an equal watermark if the search hits one, else the
     *     lowest-id snapshot found with a greater watermark; {@code null} when there is no
     *     snapshot, no snapshot carries a watermark, or every watermark is smaller
     */
    @Nullable
    public Snapshot laterOrEqualWatermark(long watermark) {
        Long earliestId = earliestSnapshotId();
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }
        long low = earliestId;
        long high = latestId;

        // Advance 'low' to the first snapshot that carries a watermark, stopping right there.
        Long lowWatermark = snapshot(low).watermark();
        while (lowWatermark == null && low < high) {
            low++;
            lowWatermark = snapshot(low).watermark();
        }
        if (lowWatermark == null) {
            return null;
        }
        if (lowWatermark >= watermark) {
            return snapshot(low);
        }

        Snapshot result = null;
        while (low <= high) {
            long probe = low + (high - low) / 2L;
            // 'carrier' is the nearest snapshot at or before 'probe' that has a watermark. The
            // walk back never leaves [low, probe], so it cannot read an id outside the range.
            long carrierId = probe;
            Snapshot carrier = snapshot(carrierId);
            Long carrierWatermark = carrier.watermark();
            while (carrierWatermark == null && carrierId > low) {
                carrierId--;
                carrier = snapshot(carrierId);
                carrierWatermark = carrier.watermark();
            }
            if (carrierWatermark == null || carrierWatermark < watermark) {
                // Nothing in [low, probe] qualifies. Moving past 'probe' (not past the carrier)
                // guarantees progress even when the ids after the carrier have no watermark.
                low = probe + 1L;
            } else if (carrierWatermark > watermark) {
                result = carrier;
                high = carrierId - 1L;
            } else {
                result = carrier;
                break;
            }
        }
        return result;
    }

    /** Counts the entries listed for the main-branch snapshot directory. */
    public long snapshotCount() throws IOException {
        return FileUtils.listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count();
    }

    /** Reads every listed main-branch snapshot and iterates them ordered by ascending id. */
    public Iterator<Snapshot> snapshots() throws IOException {
        return FileUtils.listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
                .map(this::snapshot)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /**
     * Reads the main-branch snapshots of an inclusive id range, ordered by ascending id.
     *
     * @param optionalMaxSnapshotId upper bound; defaults to the latest snapshot id when empty
     * @param optionalMinSnapshotId lower bound; defaults to the earliest snapshot id when empty
     * @return the snapshots of the range, or an empty iterator when the table has no snapshot
     */
    public Iterator<Snapshot> snapshotsWithinRange(Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) throws IOException {
        Long earliestId = earliestSnapshotId();
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return Collections.emptyIterator();
        }
        long lowerBound = optionalMinSnapshotId.orElse(earliestId);
        long upperBound = optionalMaxSnapshotId.orElse(latestId);
        return LongStream.rangeClosed(lowerBound, upperBound)
                .mapToObj(this::snapshot)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /** Reads every listed long-lived changelog and iterates them ordered by ascending id. */
    public Iterator<Changelog> changelogs() throws IOException {
        return FileUtils.listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
                .map(this::changelog)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /**
     * Reads all listed main-branch snapshots through {@code Snapshot.safelyFromPath}, leaving out
     * the entries for which that call returns {@code null}.
     */
    public List<Snapshot> safelyGetAllSnapshots() throws IOException {
        List<Path> paths =
                FileUtils.listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
                        .map(this::snapshotPath)
                        .collect(Collectors.toList());
        List<Snapshot> snapshots = new ArrayList<>();
        for (Path path : paths) {
            Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
            if (snapshot != null) {
                snapshots.add(snapshot);
            }
        }
        return snapshots;
    }

    /**
     * Reads all listed long-lived changelogs, leaving out files that are no longer found when
     * they are read.
     */
    public List<Changelog> safelyGetAllChangelogs() throws IOException {
        List<Path> paths =
                FileUtils.listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
                        .map(this::longLivedChangelogPath)
                        .collect(Collectors.toList());
        List<Changelog> changelogs = new ArrayList<>();
        for (Path path : paths) {
            try {
                changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path)));
            } catch (FileNotFoundException ignored) {
                // The file disappeared between listing and reading; skipping it is the "safe"
                // part of this method.
            }
        }
        return changelogs;
    }

    /**
     * Lists the files of the main-branch snapshot directory that are neither snapshot files nor
     * hint files and whose status passes {@code fileStatusFilter}. Best effort: a failed listing
     * yields an empty list.
     */
    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
        return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
    }

    /**
     * Lists the files of the changelog directory that are neither changelog files nor hint files
     * and whose status passes {@code fileStatusFilter}. Best effort: a failed listing yields an
     * empty list.
     */
    public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
        return listPathWithFilter(changelogDirectory(), fileStatusFilter, nonChangelogFileFilter());
    }

    /**
     * Lists {@code directory} and keeps the paths accepted by both filters; returns an empty list
     * when the listing is {@code null} or throws an {@link IOException}.
     */
    private List<Path> listPathWithFilter(Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
        try {
            FileStatus[] statuses = fileIO.listStatus(directory);
            if (statuses == null) {
                return Collections.emptyList();
            }
            return Arrays.stream(statuses)
                    .filter(fileStatusFilter)
                    .map(FileStatus::getPath)
                    .filter(fileFilter)
                    .collect(Collectors.toList());
        } catch (IOException ignored) {
            // Deliberately best effort, as the "try" in the callers' names announces.
            return Collections.emptyList();
        }
    }

    /** Accepts paths whose name is neither snapshot-prefixed nor one of the hint file names. */
    private Predicate<Path> nonSnapshotFileFilter() {
        return path -> {
            String name = path.getName();
            return !name.startsWith(SNAPSHOT_PREFIX) && !name.equals(EARLIEST) && !name.equals(LATEST);
        };
    }

    /** Accepts paths whose name is neither changelog-prefixed nor one of the hint file names. */
    private Predicate<Path> nonChangelogFileFilter() {
        return path -> {
            String name = path.getName();
            return !name.startsWith(CHANGELOG_PREFIX) && !name.equals(EARLIEST) && !name.equals(LATEST);
        };
    }

    /**
     * Returns the most recent main-branch snapshot committed by {@code user}, scanning from the
     * latest snapshot down to the earliest one.
     */
    public Optional<Snapshot> latestSnapshotOfUser(String user) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return Optional.empty();
        }
        long earliestId = Preconditions.checkNotNull(earliestSnapshotId(), "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
        for (long id = latestId; id >= earliestId; id--) {
            Snapshot snapshot = snapshot(id);
            if (user.equals(snapshot.commitUser())) {
                return Optional.of(snapshot);
            }
        }
        return Optional.empty();
    }

    /**
     * Collects the main-branch snapshots committed by {@code user} whose commit identifier is one
     * of {@code identifiers}, scanning from the latest snapshot downwards.
     *
     * <p>The scan ends as soon as every identifier has been matched, or once a snapshot of the
     * user with a commit identifier not greater than the smallest requested one has been seen.
     *
     * @return the matched snapshots in the order they were found (descending snapshot id)
     */
    public List<Snapshot> findSnapshotsForIdentifiers(@Nonnull String user, List<Long> identifiers) {
        if (identifiers.isEmpty()) {
            return Collections.emptyList();
        }
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return Collections.emptyList();
        }
        long earliestId = Preconditions.checkNotNull(earliestSnapshotId(), "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
        long minSearchedIdentifier = Collections.min(identifiers);
        List<Snapshot> matchedSnapshots = new ArrayList<>();
        HashSet<Long> remainingIdentifiers = new HashSet<>(identifiers);
        for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) {
            Snapshot snapshot = snapshot(id);
            if (!user.equals(snapshot.commitUser())) {
                continue;
            }
            if (remainingIdentifiers.remove(snapshot.commitIdentifier())) {
                matchedSnapshots.add(snapshot);
            }
            if (snapshot.commitIdentifier() <= minSearchedIdentifier) {
                break;
            }
        }
        return matchedSnapshots;
    }

    /** Writes {@code changelog} as JSON to the long-lived changelog file of the given id. */
    public void commitChangelog(Changelog changelog, long id) throws IOException {
        fileIO.writeFileUtf8(longLivedChangelogPath(id), changelog.toJson());
    }

    /**
     * Walks the main-branch snapshots from latest to earliest and returns the first one accepted
     * by {@code checker}.
     *
     * <p>If reading a snapshot fails, the earliest snapshot id is looked up again: the failure is
     * rethrown only when the id is still inside the valid range; otherwise the walk stops and
     * {@code null} is returned.
     *
     * @return the first accepted snapshot, or {@code null} if none is found
     */
    @Nullable
    public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        Long earliestId = earliestSnapshotId();
        if (earliestId == null) {
            return null;
        }
        for (long id = latestId; id >= earliestId; id--) {
            Snapshot snapshot;
            try {
                snapshot = snapshot(id);
            } catch (Exception e) {
                Long newEarliestId = earliestSnapshotId();
                if (newEarliestId != null && id >= newEarliestId) {
                    throw e;
                }
                // The id has fallen below the refreshed earliest id (or no snapshot is left).
                return null;
            }
            if (checker.test(snapshot)) {
                return snapshot;
            }
        }
        return null;
    }

    /**
     * Finds the greatest id in {@code dir}. The {@code LATEST} hint is trusted only when it is
     * positive and the file of the following id does not exist; otherwise the directory is
     * listed.
     *
     * @param file maps an id to the path of its file inside {@code dir}
     * @return the id, or {@code null} when {@code dir} does not exist or lists no id
     */
    @Nullable
    private Long findLatest(Path dir, String prefix, Function<Long, Path> file) throws IOException {
        if (!fileIO.exists(dir)) {
            return null;
        }
        Long hintedId = readHint(LATEST, dir);
        if (hintedId != null && hintedId > 0L) {
            long nextId = hintedId + 1L;
            if (!fileIO.exists(file.apply(nextId))) {
                return hintedId;
            }
        }
        return findByListFiles(Math::max, dir, prefix);
    }

    /**
     * Finds the smallest id in {@code dir}. The {@code EARLIEST} hint is trusted only when the
     * file it points to exists; otherwise the directory is listed.
     *
     * @param file maps an id to the path of its file inside {@code dir}
     * @return the id, or {@code null} when {@code dir} does not exist or lists no id
     */
    @Nullable
    private Long findEarliest(Path dir, String prefix, Function<Long, Path> file) throws IOException {
        if (!fileIO.exists(dir)) {
            return null;
        }
        Long hintedId = readHint(EARLIEST, dir);
        if (hintedId != null && fileIO.exists(file.apply(hintedId))) {
            return hintedId;
        }
        return findByListFiles(Math::min, dir, prefix);
    }

    /** Reads the hint file {@code fileName} of the main-branch snapshot directory. */
    public Long readHint(String fileName) {
        return readHint(fileName, snapshotDirByBranch(MAIN_BRANCH));
    }

    /**
     * Reads the id stored in the hint file {@code fileName} inside {@code dir}.
     *
     * <p>Any failure (including unparsable content) is retried, up to {@link
     * #READ_HINT_RETRY_NUM} attempts with a pause of {@link #READ_HINT_RETRY_INTERVAL}
     * milliseconds after each failed attempt.
     *
     * @return the id, or {@code null} when the file yields no content or every attempt failed
     * @throws RuntimeException if the thread is interrupted while pausing; the interrupt flag is
     *     restored first
     */
    public Long readHint(String fileName, Path dir) {
        Path path = new Path(dir, fileName);
        for (int attempt = 0; attempt < READ_HINT_RETRY_NUM; attempt++) {
            try {
                return fileIO.readOverwrittenFileUtf8(path).map(Long::parseLong).orElse(null);
            } catch (Exception ignored) {
                // A hint is optional: callers fall back to listing the directory, so the
                // failure is not propagated.
                try {
                    TimeUnit.MILLISECONDS.sleep(READ_HINT_RETRY_INTERVAL);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        return null;
    }

    /** Reduces the ids listed for {@code dir} with {@code reducer2}; {@code null} if none. */
    private Long findByListFiles(BinaryOperator<Long> reducer2, Path dir, String prefix) throws IOException {
        return FileUtils.listVersionedFiles(fileIO, dir, prefix).reduce(reducer2).orElse(null);
    }

    /** Overwrites the {@code LATEST} hint of the main-branch snapshot directory. */
    public void commitLatestHint(long snapshotId) throws IOException {
        commitLatestHint(snapshotId, MAIN_BRANCH);
    }

    /** Overwrites the {@code LATEST} hint of the given branch's snapshot directory. */
    public void commitLatestHint(long snapshotId, String branchName) throws IOException {
        commitHint(snapshotId, LATEST, snapshotDirByBranch(branchName));
    }

    /** Overwrites the {@code LATEST} hint of the changelog directory. */
    public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException {
        commitHint(snapshotId, LATEST, changelogDirectory());
    }

    /** Overwrites the {@code EARLIEST} hint of the changelog directory. */
    public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOException {
        commitHint(snapshotId, EARLIEST, changelogDirectory());
    }

    /** Overwrites the {@code EARLIEST} hint of the main-branch snapshot directory. */
    public void commitEarliestHint(long snapshotId) throws IOException {
        commitEarliestHint(snapshotId, MAIN_BRANCH);
    }

    /** Overwrites the {@code EARLIEST} hint of the given branch's snapshot directory. */
    public void commitEarliestHint(long snapshotId, String branchName) throws IOException {
        commitHint(snapshotId, EARLIEST, snapshotDirByBranch(branchName));
    }

    /** Overwrites the hint file {@code fileName} in {@code dir} with the id as decimal text. */
    private void commitHint(long snapshotId, String fileName, Path dir) throws IOException {
        Path hintFile = new Path(dir, fileName);
        fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
    }
}

