/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MoreCollectors;
import io.airlift.units.DataSize;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.base.util.ExecutorUtil;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser;
import io.trino.plugin.deltalake.transactionlog.TransactionLogUtil;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class TemporalTimeTravelUtil {
    private static final Pattern TRANSACTION_LOG_PATTERN = Pattern.compile("^(\\d{20})\\.json$");
    private static final int VERSION_NOT_FOUND = -1;

    private TemporalTimeTravelUtil() {
    }

    public static long findLatestVersionUsingTemporal(TrinoFileSystem fileSystem, String tableLocation, long epochMillis, Executor executor, int maxLinearSearchSize) throws IOException {
        Preconditions.checkArgument((maxLinearSearchSize > 0 ? 1 : 0) != 0, (Object)"maxLinearSearchSize must be greater than 0");
        long version = TemporalTimeTravelUtil.findLatestVersionUsingTemporalInternal(fileSystem, tableLocation, epochMillis, executor, maxLinearSearchSize);
        if (version >= 0L) {
            return version;
        }
        throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.INVALID_ARGUMENTS, String.format("No temporal version history at or before %s", Instant.ofEpochMilli(epochMillis)));
    }

    private static long findLatestVersionUsingTemporalInternal(TrinoFileSystem fileSystem, String tableLocation, long epochMillis, Executor executor, int maxLinearSearchSize) throws IOException {
        long entryNumber;
        Optional commitInfo;
        String transactionLogDir = TransactionLogUtil.getTransactionLogDir(tableLocation);
        Optional<LastCheckpoint> lastCheckpoint = TransactionLogParser.readLastCheckpoint(fileSystem, tableLocation);
        if (lastCheckpoint.isPresent() && (commitInfo = (Optional)TransactionLogTail.getEntriesFromJson(entryNumber = lastCheckpoint.map(LastCheckpoint::version).orElseThrow().longValue(), transactionLogDir, fileSystem, DataSize.ofBytes((long)0L)).orElseThrow(() -> new MissingTransactionLogException(TransactionLogUtil.getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString())).getEntries(fileSystem).map(DeltaLakeTransactionLogEntry::getCommitInfo).filter(Objects::nonNull).collect(MoreCollectors.toOptional())).isPresent()) {
            return TemporalTimeTravelUtil.searchBasedOnLatestCheckPoint(fileSystem, transactionLogDir, epochMillis, (CommitInfoEntry)commitInfo.orElseThrow(), executor, maxLinearSearchSize);
        }
        if (fileSystem.newInputFile(TransactionLogUtil.getTransactionLogJsonEntryPath(transactionLogDir, 0L)).exists()) {
            return TemporalTimeTravelUtil.searchTowardsTailLinear(fileSystem, 0L, transactionLogDir, epochMillis);
        }
        return TemporalTimeTravelUtil.findLatestVersionFromWholeTransactions(fileSystem, transactionLogDir, epochMillis);
    }

    private static long searchBasedOnLatestCheckPoint(TrinoFileSystem fileSystem, String transactionLogDir, long epochMillis, CommitInfoEntry commitInfo, Executor executor, int maxLinearSearchSize) throws IOException {
        long commitTime = TemporalTimeTravelUtil.getCommitInfoTimestamp(commitInfo);
        if (commitTime == epochMillis) {
            return commitInfo.version();
        }
        if (commitTime < epochMillis) {
            long tail = TemporalTimeTravelUtil.searchTowardsTailLinear(fileSystem, commitInfo.version() + 1L, transactionLogDir, epochMillis);
            if (tail >= 0L) {
                return tail;
            }
            return commitInfo.version();
        }
        return TemporalTimeTravelUtil.searchTowardsHead(fileSystem, commitInfo.version() - 1L, transactionLogDir, epochMillis, executor, maxLinearSearchSize);
    }

    private static long searchTowardsHead(TrinoFileSystem fileSystem, long entryNumber, String transactionLogDir, long epochMillis, Executor executor, int maxLinearSearchSize) throws IOException {
        if (entryNumber >= (long)maxLinearSearchSize) {
            return TemporalTimeTravelUtil.searchTowardsHeadParallel(fileSystem, entryNumber, transactionLogDir, epochMillis, executor, maxLinearSearchSize);
        }
        return TemporalTimeTravelUtil.searchTowardsHeadLinear(fileSystem, 0L, entryNumber, transactionLogDir, epochMillis);
    }

    private static long searchTowardsHeadLinear(TrinoFileSystem fileSystem, long start, long end, String transactionLogDir, long epochMillis) throws IOException {
        long entryNumber = end;
        Optional<TransactionLogEntries> entries = TransactionLogTail.getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.ofBytes((long)0L));
        while (start <= entryNumber && entries.isPresent()) {
            Optional<CommitInfoEntry> commitInfo = entries.get().getEntries(fileSystem).map(DeltaLakeTransactionLogEntry::getCommitInfo).filter(Objects::nonNull).findFirst();
            if (commitInfo.isPresent() && commitInfo.get().timestamp() <= epochMillis) {
                return commitInfo.get().version();
            }
            if (--entryNumber < 0L) break;
            entries = TransactionLogTail.getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.ofBytes((long)0L));
        }
        return -1L;
    }

    private static long searchTowardsHeadParallel(TrinoFileSystem fileSystem, long end, String transactionLogDir, long epochMillis, Executor executor, int maxLinearSearchSize) {
        ImmutableList.Builder versionSearchTasks = ImmutableList.builder();
        for (long start = 0L; start <= end; start += (long)maxLinearSearchSize) {
            long head = start;
            long tail = start + (long)maxLinearSearchSize - 1L;
            versionSearchTasks.add(() -> TemporalTimeTravelUtil.searchTowardsHeadLinear(fileSystem, head, tail, transactionLogDir, epochMillis));
        }
        try {
            return ExecutorUtil.processWithAdditionalThreads((Collection)versionSearchTasks.build(), (Executor)executor).stream().max(Long::compare).orElse(-1L);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    private static long searchTowardsTailLinear(TrinoFileSystem fileSystem, long start, String transactionLogDir, long epochMillis) throws IOException {
        long entryNumber = start;
        long version = -1L;
        Optional<TransactionLogEntries> entries = TransactionLogTail.getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.ofBytes((long)0L));
        while (entries.isPresent()) {
            Optional<CommitInfoEntry> commitInfo = entries.get().getEntries(fileSystem).map(DeltaLakeTransactionLogEntry::getCommitInfo).filter(Objects::nonNull).findFirst();
            if (commitInfo.isEmpty()) {
                entries = TransactionLogTail.getEntriesFromJson(++entryNumber, transactionLogDir, fileSystem, DataSize.ofBytes((long)0L));
                continue;
            }
            if (commitInfo.map(TemporalTimeTravelUtil::getCommitInfoTimestamp).orElseThrow() > epochMillis) break;
            version = entryNumber++;
            entries = TransactionLogTail.getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.ofBytes((long)0L));
        }
        return version;
    }

    private static long findLatestVersionFromWholeTransactions(TrinoFileSystem fileSystem, String transactionLogDir, long epochMillis) throws IOException {
        FileIterator fileIterator = fileSystem.listFiles(Location.of((String)transactionLogDir));
        if (!fileIterator.hasNext()) {
            return -1L;
        }
        long version = -1L;
        while (fileIterator.hasNext()) {
            long entryNumber;
            Optional<CommitInfoEntry> commitInfo;
            Location location = fileIterator.next().location();
            Matcher matcher = TRANSACTION_LOG_PATTERN.matcher(location.fileName());
            if (!matcher.matches() || (commitInfo = TransactionLogTail.getEntriesFromJson(entryNumber = Long.parseLong(matcher.group(1)), fileSystem.newInputFile(location), DataSize.ofBytes((long)0L)).map(entry -> entry.getEntries(fileSystem)).orElseThrow().map(DeltaLakeTransactionLogEntry::getCommitInfo).filter(Objects::nonNull).filter(commitInfoEntry -> TemporalTimeTravelUtil.getCommitInfoTimestamp(commitInfoEntry) <= epochMillis).findFirst()).isEmpty()) continue;
            version = Math.max(version, commitInfo.get().version());
        }
        return version;
    }

    private static long getCommitInfoTimestamp(CommitInfoEntry commitInfoEntry) {
        return commitInfoEntry.inCommitTimestamp().orElse(commitInfoEntry.timestamp());
    }
}

