/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.utils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.table.Instant;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.HintFileUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotLoader;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotManager
implements Serializable {
    /** Serialization version of this {@link Serializable} class. */
    private static final long serialVersionUID = 1L;
    /** Logger shared by all instances. */
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
    /** File-name prefix of a snapshot file; the snapshot id is appended to it to form the file name. */
    public static final String SNAPSHOT_PREFIX = "snapshot-";
    /**
     * NOTE(review): appears to be the number of additional ids probed by
     * {@code earliestSnapshot(Long)} when no stop id is given - confirm against that method.
     */
    public static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3;
    /** File system access used for snapshot reads, existence checks and deletes. */
    private final FileIO fileIO;
    /** Table path from which the branch path (and thus the snapshot directory) is derived. */
    private final Path tablePath;
    /** Branch name as returned by {@code BranchManager.normalizeBranch}. */
    private final String branch;
    /** Optional loader asked first for the latest snapshot and for rollback; may be {@code null}. */
    @Nullable
    private final SnapshotLoader snapshotLoader;
    /** Optional cache of loaded snapshots keyed by snapshot file path; may be {@code null}. */
    @Nullable
    private final Cache<Path, Snapshot> cache;

    /**
     * Creates a manager for the snapshots of one branch of a table.
     *
     * @param fileIO file system access used for every snapshot file operation
     * @param tablePath path of the table
     * @param branchName branch to operate on; passed through {@code BranchManager.normalizeBranch}
     * @param snapshotLoader optional loader for the latest snapshot and rollback, or {@code null}
     * @param cache optional snapshot cache keyed by snapshot file path, or {@code null}
     */
    public SnapshotManager(
            FileIO fileIO,
            Path tablePath,
            @Nullable String branchName,
            @Nullable SnapshotLoader snapshotLoader,
            @Nullable Cache<Path, Snapshot> cache) {
        this.branch = BranchManager.normalizeBranch(branchName);
        this.tablePath = tablePath;
        this.fileIO = fileIO;
        this.cache = cache;
        this.snapshotLoader = snapshotLoader;
    }

    /**
     * Creates a manager for another branch of the same table. The file IO, table path and cache
     * are reused; the snapshot loader, when present, is copied for the new branch.
     *
     * @param branchName branch the new manager should operate on
     * @return a new manager bound to {@code branchName}
     */
    public SnapshotManager copyWithBranch(String branchName) {
        SnapshotLoader branchLoader =
                snapshotLoader == null ? null : snapshotLoader.copyWithBranch(branchName);
        return new SnapshotManager(fileIO, tablePath, branchName, branchLoader, cache);
    }

    /** Returns the {@code FileIO} this manager was constructed with. */
    public FileIO fileIO() {
        return fileIO;
    }

    /** Returns the table path this manager was constructed with. */
    public Path tablePath() {
        return tablePath;
    }

    /** Returns the normalized branch name this manager operates on. */
    public String branch() {
        return branch;
    }

    /**
     * Builds the path of the snapshot file for the given id:
     * {@code <branch path>/snapshot/snapshot-<id>}.
     *
     * @param snapshotId id of the snapshot
     * @return path of the snapshot file (the file is not checked for existence)
     */
    public Path snapshotPath(long snapshotId) {
        String location =
                BranchManager.branchPath(tablePath, branch)
                        + "/snapshot/"
                        + SNAPSHOT_PREFIX
                        + snapshotId;
        return new Path(location);
    }

    /** Returns the directory holding the snapshot files: {@code <branch path>/snapshot}. */
    public Path snapshotDirectory() {
        String location = BranchManager.branchPath(tablePath, branch) + "/snapshot";
        return new Path(location);
    }

    /** Drops every entry of the snapshot cache; does nothing when no cache is configured. */
    public void invalidateCache() {
        if (cache == null) {
            return;
        }
        cache.invalidateAll();
    }

    /**
     * Returns the snapshot with the given id. When a cache is configured it is consulted first,
     * and a snapshot read from the file system is stored in it before being returned.
     *
     * @param snapshotId id of the snapshot to load
     * @return the snapshot
     * @throws RuntimeException if the snapshot file is missing or cannot be read (see
     *     {@link #fromPath(FileIO, Path)})
     */
    public Snapshot snapshot(long snapshotId) {
        Path path = snapshotPath(snapshotId);
        if (cache == null) {
            return SnapshotManager.fromPath(fileIO, path);
        }
        Snapshot cached = cache.getIfPresent(path);
        if (cached != null) {
            return cached;
        }
        Snapshot loaded = SnapshotManager.fromPath(fileIO, path);
        cache.put(path, loaded);
        return loaded;
    }

    /**
     * Same as {@link #snapshot(long)} but reports a missing snapshot file with the checked
     * {@link FileNotFoundException} so callers can react to it.
     *
     * @param snapshotId id of the snapshot to load
     * @return the snapshot, from the cache when present there
     * @throws FileNotFoundException if the snapshot file does not exist
     */
    public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException {
        Path path = snapshotPath(snapshotId);
        if (cache == null) {
            return SnapshotManager.tryFromPath(fileIO, path);
        }
        Snapshot cached = cache.getIfPresent(path);
        if (cached != null) {
            return cached;
        }
        Snapshot loaded = SnapshotManager.tryFromPath(fileIO, path);
        cache.put(path, loaded);
        return loaded;
    }

    /**
     * Checks whether the snapshot file for the given id exists on the file system. The cache is
     * not consulted.
     *
     * @param snapshotId id of the snapshot
     * @return {@code true} if the snapshot file exists
     * @throws RuntimeException wrapping the {@link IOException} if the check itself fails
     */
    public boolean snapshotExists(long snapshotId) {
        Path path = snapshotPath(snapshotId);
        boolean exists;
        try {
            exists = fileIO.exists(path);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to determine if snapshot #" + snapshotId + " exists in path " + path,
                    e);
        }
        return exists;
    }

    /**
     * Removes the snapshot with the given id: first evicts it from the cache (if any), then
     * deletes the snapshot file via {@code deleteQuietly}.
     *
     * @param snapshotId id of the snapshot to delete
     */
    public void deleteSnapshot(long snapshotId) {
        Path target = snapshotPath(snapshotId);
        if (cache != null) {
            // Evict before deleting so the cache never serves a snapshot whose file is gone.
            cache.invalidate(target);
        }
        fileIO().deleteQuietly(target);
    }

    /**
     * Returns the latest snapshot. The snapshot loader is asked first when one is configured;
     * if it does not support loading, or none is configured, the file system is used.
     *
     * @return the latest snapshot, or {@code null} if there is none
     * @throws UncheckedIOException if the snapshot loader fails with an {@link IOException}
     */
    @Nullable
    public Snapshot latestSnapshot() {
        if (snapshotLoader == null) {
            return latestSnapshotFromFileSystem();
        }
        try {
            Snapshot loaded = snapshotLoader.load().orElse(null);
            if (loaded != null && cache != null) {
                cache.put(snapshotPath(loaded.id()), loaded);
            }
            return loaded;
        } catch (UnsupportedOperationException ignored) {
            // The loader cannot load snapshots; fall back to the file system below.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return latestSnapshotFromFileSystem();
    }

    /**
     * Returns the latest snapshot as determined from the file system only, bypassing the
     * snapshot loader.
     *
     * @return the latest snapshot, or {@code null} if no latest snapshot id was found
     */
    @Nullable
    public Snapshot latestSnapshotFromFileSystem() {
        Long latestId = latestSnapshotIdFromFileSystem();
        if (latestId == null) {
            return null;
        }
        return snapshot(latestId);
    }

    /**
     * Returns the id of the latest snapshot. The snapshot loader is asked first when one is
     * configured; if it does not support loading, or none is configured, the file system is used.
     *
     * @return the latest snapshot id, or {@code null} if there is none
     * @throws RuntimeException wrapping the {@link IOException} if the snapshot loader fails
     */
    @Nullable
    public Long latestSnapshotId() {
        if (snapshotLoader != null) {
            try {
                return snapshotLoader.load().map(Snapshot::id).orElse(null);
            } catch (UnsupportedOperationException ignored) {
                // The loader cannot load snapshots; fall back to the file system below.
            } catch (IOException e) {
                throw new RuntimeException("Failed to find latest snapshot id", e);
            }
        }
        return latestSnapshotIdFromFileSystem();
    }

    /**
     * Looks up the latest snapshot id in the snapshot directory, bypassing the snapshot loader.
     *
     * @return the latest snapshot id, or {@code null} if none was found
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long latestSnapshotIdFromFileSystem() {
        Path directory = snapshotDirectory();
        try {
            return findLatest(directory, SNAPSHOT_PREFIX, this::snapshotPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find latest snapshot id", e);
        }
    }

    /**
     * Returns the earliest snapshot, using the default stop id of
     * {@link #earliestSnapshot(Long)}.
     *
     * @return the earliest snapshot, or {@code null} if none could be loaded
     */
    @Nullable
    public Snapshot earliestSnapshot() {
        Long noStopId = null;
        return earliestSnapshot(noStopId);
    }

    /**
     * Rolls back to the given instant by delegating to the snapshot loader.
     *
     * @param instant target of the rollback
     * @throws UnsupportedOperationException if no snapshot loader is configured
     * @throws UncheckedIOException if the snapshot loader fails with an {@link IOException}
     */
    public void rollback(Instant instant) {
        if (snapshotLoader == null) {
            throw new UnsupportedOperationException("rollback is not supported");
        }
        try {
            snapshotLoader.rollback(instant);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Loads the earliest snapshot, tolerating concurrent expiration. Starting from the earliest
     * known id, each missing snapshot file makes the search move on to the next id until
     * {@code stopSnapshotId} has been passed.
     *
     * @param stopSnapshotId last id worth probing; when {@code null}, the earliest id plus
     *     {@link #EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM} is used
     * @return the first snapshot that could be loaded, or {@code null} if there is no earliest id
     *     or every probed id up to the stop id is missing
     */
    @Nullable
    private Snapshot earliestSnapshot(@Nullable Long stopSnapshotId) {
        Long earliestId = earliestSnapshotId();
        if (earliestId == null) {
            return null;
        }
        long lastCandidate =
                stopSnapshotId != null
                        ? stopSnapshotId
                        : earliestId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
        long candidate = earliestId;
        // The earliest id is always tried once, even if it already lies beyond lastCandidate.
        while (true) {
            try {
                return tryGetSnapshot(candidate);
            } catch (FileNotFoundException e) {
                candidate++;
                if (candidate > lastCandidate) {
                    return null;
                }
                LOG.warn("The earliest snapshot or changelog was once identified but disappeared. It might have been expired by other jobs operating on this table. Searching for the second earliest snapshot or changelog instead. ");
            }
        }
    }

    /**
     * Tells whether the {@code EARLIEST} hint is unavailable, i.e. whether
     * {@code HintFileUtils.readHint} returns {@code null} for it in the snapshot directory.
     *
     * @return {@code true} if no {@code EARLIEST} hint could be read
     */
    public boolean earliestFileNotExists() {
        Path directory = snapshotDirectory();
        return null == HintFileUtils.readHint(fileIO, "EARLIEST", directory);
    }

    /**
     * Looks up the earliest snapshot id in the snapshot directory.
     *
     * @return the earliest snapshot id, or {@code null} if none was found
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long earliestSnapshotId() {
        Path directory = snapshotDirectory();
        try {
            return findEarliest(directory, SNAPSHOT_PREFIX, this::snapshotPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find earliest snapshot id", e);
        }
    }

    /**
     * Walks from the latest snapshot down to the earliest one and returns the id of the first
     * existing snapshot accepted by {@code predicate}. Ids whose snapshot file does not exist are
     * skipped.
     *
     * @param predicate test applied to each existing snapshot, newest first
     * @return the id of the first matching snapshot, the latest id if none matches, or
     *     {@code null} if the latest or earliest id is unknown
     */
    @Nullable
    public Long pickOrLatest(Predicate<Snapshot> predicate) {
        Long latestId = latestSnapshotId();
        Long earliestId = earliestSnapshotId();
        if (latestId == null || earliestId == null) {
            return null;
        }
        long lowest = earliestId;
        for (long id = latestId; id >= lowest; id--) {
            if (!snapshotExists(id)) {
                continue;
            }
            Snapshot candidate = snapshot(id);
            if (predicate.test(candidate)) {
                return candidate.id();
            }
        }
        return latestId;
    }

    /**
     * Binary-searches the snapshots between the earliest and the latest one for the last
     * snapshot whose commit time is earlier than or equal to {@code timestampMills}.
     *
     * @param timestampMills upper bound (inclusive) for the commit time, in milliseconds
     * @return the matching snapshot, or {@code null} if there is no latest snapshot id, the
     *     earliest snapshot cannot be loaded, or the earliest snapshot is already later than
     *     {@code timestampMills}
     */
    @Nullable
    public Snapshot earlierOrEqualTimeMills(long timestampMills) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        Snapshot first = earliestSnapshot(latestId);
        if (first == null || first.timeMillis() > timestampMills) {
            return null;
        }
        long low = first.id();
        long high = latestId;
        Snapshot result = null;
        while (low <= high) {
            // Midpoint computed this way to avoid overflow of low + high.
            long mid = low + (high - low) / 2L;
            Snapshot probed = snapshot(mid);
            long commitTime = probed.timeMillis();
            if (commitTime > timestampMills) {
                high = mid - 1L;
            } else {
                // Not later than the target: remember it and look for a later candidate.
                result = probed;
                if (commitTime == timestampMills) {
                    break;
                }
                low = mid + 1L;
            }
        }
        return result;
    }

    /**
     * Binary-searches the snapshots between the earliest and the latest id for the first
     * snapshot whose commit time is later than or equal to {@code timestampMills}.
     *
     * @param timestampMills lower bound (inclusive) for the commit time, in milliseconds
     * @return the matching snapshot, or {@code null} if the earliest or latest id is unknown or
     *     the latest snapshot is already earlier than {@code timestampMills}
     */
    @Nullable
    public Snapshot laterOrEqualTimeMills(long timestampMills) {
        Long earliestId = earliestSnapshotId();
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }
        if (snapshot(latestId).timeMillis() < timestampMills) {
            return null;
        }
        long low = earliestId;
        long high = latestId;
        Snapshot result = null;
        while (low <= high) {
            // Midpoint computed this way to avoid overflow of low + high.
            long mid = low + (high - low) / 2L;
            Snapshot probed = snapshot(mid);
            long commitTime = probed.timeMillis();
            if (commitTime < timestampMills) {
                low = mid + 1L;
                continue;
            }
            // Not earlier than the target: remember it and look for an earlier candidate.
            result = probed;
            if (commitTime == timestampMills) {
                break;
            }
            high = mid - 1L;
        }
        return result;
    }

    /**
     * Binary-searches the snapshots between the earliest and the latest one by watermark and
     * returns the last probed snapshot whose watermark is earlier than or equal to
     * {@code watermark}. Snapshots without a watermark are resolved to the nearest earlier
     * snapshot of the current search range that has one.
     *
     * <p>If the first snapshot that carries a watermark is already at or beyond
     * {@code watermark}, that snapshot is returned.
     *
     * @param watermark upper bound (inclusive) for the snapshot watermark
     * @return the matching snapshot, or {@code null} if there is no latest snapshot id, the
     *     latest watermark is {@link Long#MIN_VALUE}, the earliest snapshot cannot be loaded, or
     *     no snapshot in range carries a watermark
     */
    @Nullable
    public Snapshot earlierOrEqualWatermark(long watermark) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        // watermark() may be null; go through a local so that comparing against the primitive
        // Long.MIN_VALUE cannot fail with a NullPointerException on unboxing.
        Long latestWatermark = snapshot(latestId).watermark();
        if (latestWatermark != null && latestWatermark == Long.MIN_VALUE) {
            return null;
        }
        Snapshot first = earliestSnapshot(latestId);
        if (first == null) {
            return null;
        }
        long latest = latestId;
        long earliest = first.id();
        Long earliestWatermark = first.watermark();
        // Advance to the first snapshot that carries a watermark.
        while (earliestWatermark == null && earliest < latest) {
            earliestWatermark = snapshot(++earliest).watermark();
        }
        if (earliestWatermark == null) {
            return null;
        }
        if (earliestWatermark >= watermark) {
            return snapshot(earliest);
        }
        Snapshot finalSnapshot = null;
        while (earliest <= latest) {
            // Midpoint computed this way to avoid overflow of earliest + latest.
            long mid = earliest + (latest - earliest) / 2L;
            Snapshot probed = snapshot(mid);
            Long commitWatermark = probed.watermark();
            // Resolve a missing watermark by walking backwards, but never below the current
            // lower bound of the search range.
            long resolved = mid;
            while (commitWatermark == null && resolved > earliest) {
                commitWatermark = snapshot(--resolved).watermark();
            }
            if (commitWatermark == null) {
                // Nothing in [earliest, mid] carries a watermark; skip the whole stretch.
                earliest = mid + 1L;
                continue;
            }
            if (commitWatermark > watermark) {
                latest = resolved - 1L;
                continue;
            }
            if (commitWatermark < watermark) {
                // Everything in (resolved, mid] has no watermark, so continue after mid; this
                // guarantees the range shrinks on every iteration.
                earliest = mid + 1L;
                finalSnapshot = probed;
                continue;
            }
            finalSnapshot = probed;
            break;
        }
        return finalSnapshot;
    }

    /**
     * Binary-searches the snapshots between the earliest and the latest one by watermark and
     * returns the last probed snapshot whose watermark is later than or equal to
     * {@code watermark}. Snapshots without a watermark are resolved to the nearest earlier
     * snapshot of the current search range that has one.
     *
     * <p>If the first snapshot that carries a watermark is already at or beyond
     * {@code watermark}, that snapshot is returned.
     *
     * @param watermark lower bound (inclusive) for the snapshot watermark
     * @return the matching snapshot, or {@code null} if there is no latest snapshot id, the
     *     latest watermark is {@link Long#MIN_VALUE}, the earliest snapshot cannot be loaded, no
     *     snapshot in range carries a watermark, or no probed watermark reaches
     *     {@code watermark}
     */
    @Nullable
    public Snapshot laterOrEqualWatermark(long watermark) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        // watermark() may be null; go through a local so that comparing against the primitive
        // Long.MIN_VALUE cannot fail with a NullPointerException on unboxing.
        Long latestWatermark = snapshot(latestId).watermark();
        if (latestWatermark != null && latestWatermark == Long.MIN_VALUE) {
            return null;
        }
        Snapshot first = earliestSnapshot(latestId);
        if (first == null) {
            return null;
        }
        long latest = latestId;
        long earliest = first.id();
        Long earliestWatermark = first.watermark();
        // Advance to the first snapshot that carries a watermark.
        while (earliestWatermark == null && earliest < latest) {
            earliestWatermark = snapshot(++earliest).watermark();
        }
        if (earliestWatermark == null) {
            return null;
        }
        if (earliestWatermark >= watermark) {
            return snapshot(earliest);
        }
        Snapshot finalSnapshot = null;
        while (earliest <= latest) {
            // Midpoint computed this way to avoid overflow of earliest + latest.
            long mid = earliest + (latest - earliest) / 2L;
            Snapshot probed = snapshot(mid);
            Long commitWatermark = probed.watermark();
            // Resolve a missing watermark by walking backwards, but never below the current
            // lower bound of the search range.
            long resolved = mid;
            while (commitWatermark == null && resolved > earliest) {
                commitWatermark = snapshot(--resolved).watermark();
            }
            if (commitWatermark == null) {
                // Nothing in [earliest, mid] carries a watermark; skip the whole stretch.
                earliest = mid + 1L;
                continue;
            }
            if (commitWatermark > watermark) {
                latest = resolved - 1L;
                // NOTE(review): as before, the probed snapshot (not the resolved one) is kept
                // as the candidate - confirm this is the intended result for watermark gaps.
                finalSnapshot = probed;
                continue;
            }
            if (commitWatermark < watermark) {
                // Everything in (resolved, mid] has no watermark, so continue after mid; this
                // guarantees the range shrinks on every iteration.
                earliest = mid + 1L;
                continue;
            }
            finalSnapshot = probed;
            break;
        }
        return finalSnapshot;
    }

    /**
     * Counts the snapshot ids listed in the snapshot directory.
     *
     * @return number of listed snapshot ids
     * @throws IOException if listing the snapshot directory fails
     */
    public long snapshotCount() throws IOException {
        Stream<Long> ids = snapshotIdStream();
        return ids.count();
    }

    /**
     * Loads every snapshot listed in the snapshot directory.
     *
     * @return iterator over the snapshots in ascending id order
     * @throws IOException if listing the snapshot directory fails
     */
    public Iterator<Snapshot> snapshots() throws IOException {
        Comparator<Snapshot> byId = Comparator.comparingLong(Snapshot::id);
        Stream<Long> ids = snapshotIdStream();
        return ids.map(this::snapshot).sorted(byId).iterator();
    }

    /**
     * Returns the snapshot file paths of all listed snapshot ids accepted by {@code predicate}.
     *
     * @param predicate filter applied to each listed snapshot id
     * @return paths of the accepted snapshots
     * @throws IOException if listing the snapshot directory fails
     */
    public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
        Stream<Long> acceptedIds = snapshotIdStream().filter(predicate);
        return acceptedIds.map(this::snapshotPath).collect(Collectors.toList());
    }

    /**
     * Lists the ids of the files in the snapshot directory that carry the snapshot prefix, as
     * reported by {@code FileUtils.listVersionedFiles}.
     *
     * @return stream of snapshot ids
     * @throws IOException if listing the snapshot directory fails
     */
    public Stream<Long> snapshotIdStream() throws IOException {
        Path directory = snapshotDirectory();
        return FileUtils.listVersionedFiles(fileIO, directory, SNAPSHOT_PREFIX);
    }

    /**
     * Loads the snapshots with the given ids.
     *
     * @param snapshotIds ids of the snapshots to load
     * @return iterator over the loaded snapshots in ascending id order
     */
    public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
        Comparator<Snapshot> byId = Comparator.comparingLong(Snapshot::id);
        Stream<Snapshot> loaded = snapshotIds.stream().map(this::snapshot);
        return loaded.sorted(byId).iterator();
    }

    /**
     * Loads the snapshots whose ids lie in the requested range, clamped to the range between the
     * earliest and the latest snapshot id.
     *
     * @param optionalMaxSnapshotId upper bound (inclusive) of the requested ids, if any
     * @param optionalMinSnapshotId lower bound (inclusive) of the requested ids, if any
     * @return iterator over the snapshots in ascending id order; empty if the earliest or latest
     *     snapshot id is unknown
     * @throws RuntimeException if the requested upper bound is below the earliest snapshot id, or
     *     the requested lower bound is above the effective upper bound
     */
    public Iterator<Snapshot> snapshotsWithinRange(Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
        Long earliestId = earliestSnapshotId();
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return Collections.emptyIterator();
        }
        long lowerBound = earliestId;
        long upperBound = latestId;
        if (optionalMaxSnapshotId.isPresent()) {
            long upperId = optionalMaxSnapshotId.get();
            if (upperId < lowerBound) {
                throw new RuntimeException(
                        String.format(
                                "snapshot upper id:%s should not be less than earliestSnapshotId:%s",
                                upperId, lowerBound));
            }
            upperBound = Math.min(upperId, upperBound);
        }
        if (optionalMinSnapshotId.isPresent()) {
            long lowerId = optionalMinSnapshotId.get();
            // upperBound may already have been lowered to the requested maximum above.
            if (lowerId > upperBound) {
                throw new RuntimeException(
                        String.format(
                                "snapshot lower id:%s should not be greater than upper bound snapshot id:%s",
                                lowerId, upperBound));
            }
            lowerBound = Math.max(lowerId, lowerBound);
        }
        return LongStream.range(lowerBound, upperBound + 1L)
                .mapToObj(this::snapshot)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /**
     * Reads every listed snapshot directly from the file system (the cache is not used),
     * silently skipping snapshots whose file has disappeared in the meantime. The reads are
     * handed to {@link #collectSnapshots}, so the order of the result is not defined here.
     *
     * @return the snapshots that could be read
     * @throws IOException if listing the snapshot directory or collecting the snapshots fails
     */
    public List<Snapshot> safelyGetAllSnapshots() throws IOException {
        List<Path> snapshotFiles =
                snapshotIdStream().map(this::snapshotPath).collect(Collectors.toList());
        // Synchronized because the consumer below is run through a thread pool.
        List<Snapshot> collected =
                Collections.synchronizedList(new ArrayList<>(snapshotFiles.size()));
        Consumer<Path> reader =
                file -> {
                    try {
                        collected.add(SnapshotManager.tryFromPath(fileIO, file));
                    } catch (FileNotFoundException ignored) {
                        // The snapshot vanished after it was listed; skip it.
                    }
                };
        SnapshotManager.collectSnapshots(reader, snapshotFiles);
        return collected;
    }

    /**
     * Applies {@code pathConsumer} to the given paths through a thread pool created for this
     * call and sized by the number of available processors. The pool is shut down afterwards.
     *
     * @param pathConsumer action to run for each path
     * @param paths paths to process
     * @throws IOException wrapping any {@link RuntimeException} raised while executing
     */
    private static void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths) throws IOException {
        int parallelism = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool =
                ThreadPoolUtils.createCachedThreadPool(parallelism, "SNAPSHOT_COLLECTOR");
        try {
            ThreadPoolUtils.randomlyOnlyExecute(pool, pathConsumer, paths);
        } catch (RuntimeException failure) {
            throw new IOException(failure);
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Finds the most recent snapshot committed by {@code user}, starting from
     * {@link #latestSnapshotId()}.
     *
     * @param user commit user to look for
     * @return the latest snapshot of that user, if any was found
     */
    public Optional<Snapshot> latestSnapshotOfUser(String user) {
        Long startId = latestSnapshotId();
        return latestSnapshotOfUser(user, startId);
    }

    /**
     * Finds the most recent snapshot committed by {@code user}, starting from
     * {@link #latestSnapshotIdFromFileSystem()}.
     *
     * @param user commit user to look for
     * @return the latest snapshot of that user, if any was found
     */
    public Optional<Snapshot> latestSnapshotOfUserFromFilesystem(String user) {
        Long startId = latestSnapshotIdFromFileSystem();
        return latestSnapshotOfUser(user, startId);
    }

    /**
     * Walks from {@code latestId} down to the earliest snapshot id and returns the first
     * snapshot committed by {@code user}.
     *
     * <p>If a snapshot cannot be loaded, the earliest id is looked up again: when the failing id
     * is now below it, the snapshot is treated as expired and the search ends without a result;
     * otherwise the failure is rethrown.
     *
     * @param user commit user to look for
     * @param latestId id to start from; {@code null} yields an empty result
     * @return the latest snapshot of that user, if any was found
     */
    private Optional<Snapshot> latestSnapshotOfUser(String user, Long latestId) {
        if (latestId == null) {
            return Optional.empty();
        }
        long earliestId = Preconditions.checkNotNull(this.earliestSnapshotId(), "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
        long id = latestId;
        while (id >= earliestId) {
            Snapshot candidate;
            try {
                candidate = snapshot(id);
            } catch (Exception e) {
                long currentEarliestId = Preconditions.checkNotNull(this.earliestSnapshotId(), "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
                if (id >= currentEarliestId) {
                    throw e;
                }
                LOG.warn("Snapshot #" + id + " is expired. The latest snapshot of current user(" + user + ") is not found.");
                return Optional.empty();
            }
            if (user.equals(candidate.commitUser())) {
                return Optional.of(candidate);
            }
            id--;
        }
        return Optional.empty();
    }

    /**
     * Collects the snapshots committed by {@code user} whose commit identifier is one of
     * {@code identifiers}, walking from the latest snapshot towards the earliest one. The walk
     * stops once every identifier has been matched, or once a snapshot of that user has a commit
     * identifier at or below the smallest requested one.
     *
     * @param user commit user to look for
     * @param identifiers commit identifiers to match
     * @return the matching snapshots, newest first; empty if {@code identifiers} is empty or
     *     there is no latest snapshot
     */
    public List<Snapshot> findSnapshotsForIdentifiers(@Nonnull String user, List<Long> identifiers) {
        if (identifiers.isEmpty()) {
            return Collections.emptyList();
        }
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return Collections.emptyList();
        }
        long earliestId = Preconditions.checkNotNull(this.earliestSnapshotId(), "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
        long smallestWanted = Collections.min(identifiers);
        List<Snapshot> matches = new ArrayList<>();
        HashSet<Long> pending = new HashSet<>(identifiers);
        long id = latestId;
        while (id >= earliestId && !pending.isEmpty()) {
            Snapshot candidate = snapshot(id);
            id--;
            if (!user.equals(candidate.commitUser())) {
                continue;
            }
            if (pending.remove(candidate.commitIdentifier())) {
                matches.add(candidate);
            }
            if (candidate.commitIdentifier() <= smallestWanted) {
                break;
            }
        }
        return matches;
    }

    /**
     * Walks from the latest snapshot down to the earliest one and returns the first snapshot
     * accepted by {@code checker}.
     *
     * <p>If a snapshot cannot be loaded, the earliest id is looked up again: when it is gone or
     * the failing id is now below it, the search ends with {@code null}; otherwise the failure
     * is rethrown.
     *
     * @param checker test applied to each snapshot, newest first
     * @return the first accepted snapshot, or {@code null} if none was found
     */
    @Nullable
    public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        Long earliestId = earliestSnapshotId();
        if (earliestId == null) {
            return null;
        }
        long lowest = earliestId;
        long id = latestId;
        while (id >= lowest) {
            Snapshot candidate;
            try {
                candidate = snapshot(id);
            } catch (Exception e) {
                Long currentEarliestId = earliestSnapshotId();
                if (currentEarliestId != null && id >= currentEarliestId) {
                    throw e;
                }
                return null;
            }
            if (checker.test(candidate)) {
                return candidate;
            }
            id--;
        }
        return null;
    }

    /**
     * Delegates to {@code HintFileUtils.findLatest} using this manager's file IO.
     *
     * @param dir directory to search
     * @param prefix file-name prefix of the versioned files
     * @param file maps an id to the path of its file
     * @return the latest id, or {@code null} if none was found
     * @throws IOException if the lookup fails
     */
    @Nullable
    private Long findLatest(Path dir, String prefix, Function<Long, Path> file) throws IOException {
        Long latestId = HintFileUtils.findLatest(fileIO, dir, prefix, file);
        return latestId;
    }

    /**
     * Delegates to {@code HintFileUtils.findEarliest} using this manager's file IO.
     *
     * @param dir directory to search
     * @param prefix file-name prefix of the versioned files
     * @param file maps an id to the path of its file
     * @return the earliest id, or {@code null} if none was found
     * @throws IOException if the lookup fails
     */
    @Nullable
    private Long findEarliest(Path dir, String prefix, Function<Long, Path> file) throws IOException {
        Long earliestId = HintFileUtils.findEarliest(fileIO, dir, prefix, file);
        return earliestId;
    }

    /**
     * Scans {@code sortedSnapshots} from the end and returns the index of the first snapshot
     * whose id is strictly smaller than {@code targetSnapshotId}.
     *
     * @param sortedSnapshots snapshots to scan; expected to be sorted by ascending id
     * @param targetSnapshotId exclusive upper bound for the id
     * @return index of the found snapshot, or {@code -1} if there is none
     */
    public static int findPreviousSnapshot(List<Snapshot> sortedSnapshots, long targetSnapshotId) {
        int index = sortedSnapshots.size() - 1;
        while (index >= 0 && sortedSnapshots.get(index).id() >= targetSnapshotId) {
            index--;
        }
        // index is -1 here when every snapshot id is >= the target (or the list is empty).
        return index;
    }

    /**
     * Scans {@code sortedSnapshots} from the end and returns the index of the first snapshot
     * whose id is smaller than or equal to {@code targetSnapshotId}.
     *
     * @param sortedSnapshots snapshots to scan; expected to be sorted by ascending id
     * @param targetSnapshotId inclusive upper bound for the id
     * @return index of the found snapshot, or {@code -1} if there is none
     */
    public static int findPreviousOrEqualSnapshot(List<Snapshot> sortedSnapshots, long targetSnapshotId) {
        int index = sortedSnapshots.size() - 1;
        while (index >= 0 && sortedSnapshots.get(index).id() > targetSnapshotId) {
            index--;
        }
        // index is -1 here when every snapshot id is > the target (or the list is empty).
        return index;
    }

    /**
     * Deletes the latest hint of the snapshot directory via {@code HintFileUtils}.
     *
     * @throws IOException if the deletion fails
     */
    public void deleteLatestHint() throws IOException {
        Path directory = snapshotDirectory();
        HintFileUtils.deleteLatestHint(fileIO, directory);
    }

    /**
     * Records {@code snapshotId} as the latest hint of the snapshot directory via
     * {@code HintFileUtils}.
     *
     * @param snapshotId id to record
     * @throws IOException if writing the hint fails
     */
    public void commitLatestHint(long snapshotId) throws IOException {
        Path directory = snapshotDirectory();
        HintFileUtils.commitLatestHint(fileIO, snapshotId, directory);
    }

    /**
     * Records {@code snapshotId} as the earliest hint of the snapshot directory via
     * {@code HintFileUtils}.
     *
     * @param snapshotId id to record
     * @throws IOException if writing the hint fails
     */
    public void commitEarliestHint(long snapshotId) throws IOException {
        Path directory = snapshotDirectory();
        HintFileUtils.commitEarliestHint(fileIO, snapshotId, directory);
    }

    /**
     * Reads a snapshot from {@code path}, turning a missing file into an unchecked exception
     * with a hint about concurrent expiration.
     *
     * @param fileIO file system access
     * @param path path of the snapshot file
     * @return the parsed snapshot
     * @throws RuntimeException if the file does not exist or cannot be read
     */
    public static Snapshot fromPath(FileIO fileIO, Path path) {
        Snapshot loaded;
        try {
            loaded = SnapshotManager.tryFromPath(fileIO, path);
        } catch (FileNotFoundException missing) {
            throw new RuntimeException(
                    String.format("Snapshot file %s does not exist. It might have been expired by other jobs operating on this table. In this case, you can avoid concurrent modification issues by configuring write-only = true and use a dedicated compaction job, or configuring different expiration thresholds for different jobs.", path),
                    missing);
        }
        return loaded;
    }

    /**
     * Reads the file at {@code path} as UTF-8 and parses it with {@code Snapshot.fromJson}.
     *
     * @param fileIO file system access
     * @param path path of the snapshot file
     * @return the parsed snapshot
     * @throws FileNotFoundException if the file does not exist (passed through unchanged)
     * @throws RuntimeException wrapping any other {@link IOException}
     */
    public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
        try {
            String json = fileIO.readFileUtf8(path);
            return Snapshot.fromJson(json);
        } catch (FileNotFoundException missing) {
            // Kept checked so callers can tell "expired" apart from a genuine read failure.
            throw missing;
        } catch (IOException failure) {
            throw new RuntimeException("Fails to read snapshot from path " + path, failure);
        }
    }
}

