/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake.transactionlog.writer;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer;
import io.trino.spi.connector.ConnectorSession;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class S3LockBasedTransactionLogSynchronizer
implements TransactionLogSynchronizer {
    public static final Logger LOG = Logger.get(S3LockBasedTransactionLogSynchronizer.class);
    private static final Duration EXPIRATION_DURATION = Duration.of(5L, ChronoUnit.MINUTES);
    private static final String LOCK_DIRECTORY = "_sb_lock";
    private static final String LOCK_INFIX = "sb-lock_";
    private static final Pattern LOCK_FILENAME_PATTERN = Pattern.compile("(.*)\\.sb-lock_.*");
    private final TrinoFileSystemFactory fileSystemFactory;
    private final JsonCodec<LockFileContents> lockFileContentsJsonCodec;

    @Inject
    S3LockBasedTransactionLogSynchronizer(TrinoFileSystemFactory fileSystemFactory, JsonCodec<LockFileContents> lockFileContentsJsonCodec) {
        this.fileSystemFactory = Objects.requireNonNull(fileSystemFactory, "fileSystemFactory is null");
        this.lockFileContentsJsonCodec = Objects.requireNonNull(lockFileContentsJsonCodec, "lockFileContentsJsonCodec is null");
    }

    @Override
    public boolean isUnsafe() {
        return true;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void write(ConnectorSession session, String clusterId, Location newLogEntryPath, byte[] entryContents) {
        TrinoFileSystem fileSystem = this.fileSystemFactory.create(session);
        Location locksDirectory = newLogEntryPath.sibling(LOCK_DIRECTORY);
        String newEntryFilename = newLogEntryPath.fileName();
        Optional<Object> myLockInfo = Optional.empty();
        try {
            if (fileSystem.newInputFile(newLogEntryPath).exists()) {
                throw new TransactionConflictException("Target file already exists: " + String.valueOf(newLogEntryPath));
            }
            List<LockInfo> lockInfos = this.listLockInfos(fileSystem, locksDirectory);
            Optional<Object> currentLock = Optional.empty();
            for (LockInfo lockInfo2 : lockInfos) {
                if (lockInfo2.getExpirationTime().isBefore(Instant.now())) {
                    S3LockBasedTransactionLogSynchronizer.deleteLock(fileSystem, locksDirectory, lockInfo2);
                    continue;
                }
                if (!lockInfo2.getEntryFilename().equals(newEntryFilename)) continue;
                if (currentLock.isPresent()) {
                    throw new TransactionConflictException(String.format("Multiple live locks found for: %s; lock1: %s; lock2: %s", newLogEntryPath, ((LockInfo)currentLock.get()).getLockFilename(), lockInfo2.getLockFilename()));
                }
                currentLock = Optional.of(lockInfo2);
            }
            currentLock.ifPresent(lock -> {
                throw new TransactionConflictException(String.format("Transaction log locked(1); lockingCluster=%s; lockingQuery=%s; expires=%s", lock.getClusterId(), lock.getOwningQuery(), lock.getExpirationTime()));
            });
            myLockInfo = Optional.of(this.writeNewLockInfo(fileSystem, locksDirectory, newEntryFilename, clusterId, session.getQueryId()));
            lockInfos = this.listLockInfos(fileSystem, locksDirectory);
            String myLockFilename = ((LockInfo)myLockInfo.get()).getLockFilename();
            currentLock = lockInfos.stream().filter(lockInfo -> lockInfo.getEntryFilename().equals(newEntryFilename)).filter(lockInfo -> !lockInfo.getLockFilename().equals(myLockFilename)).findFirst();
            if (currentLock.isPresent()) {
                throw new TransactionConflictException(String.format("Transaction log locked(2); lockingCluster=%s; lockingQuery=%s; expires=%s", ((LockInfo)currentLock.get()).getClusterId(), ((LockInfo)currentLock.get()).getOwningQuery(), ((LockInfo)currentLock.get()).getExpirationTime()));
            }
            if (fileSystem.newInputFile(newLogEntryPath).exists()) {
                throw new TransactionConflictException("Target file was created during locking: " + String.valueOf(newLogEntryPath));
            }
            fileSystem.newOutputFile(newLogEntryPath).createOrOverwrite(entryContents);
            if (!myLockInfo.isPresent()) return;
        }
        catch (IOException e) {
            try {
                throw new UncheckedIOException("Internal error while writing " + String.valueOf(newLogEntryPath), e);
            }
            catch (Throwable throwable) {
                if (!myLockInfo.isPresent()) throw throwable;
                try {
                    S3LockBasedTransactionLogSynchronizer.deleteLock(fileSystem, locksDirectory, (LockInfo)myLockInfo.get());
                    throw throwable;
                }
                catch (IOException e2) {
                    LOG.warn((Throwable)e2, "Could not delete lockfile %s", new Object[]{((LockInfo)myLockInfo.get()).lockFilename});
                }
                throw throwable;
            }
        }
        try {
            S3LockBasedTransactionLogSynchronizer.deleteLock(fileSystem, locksDirectory, (LockInfo)myLockInfo.get());
            return;
        }
        catch (IOException e) {
            LOG.warn((Throwable)e, "Could not delete lockfile %s", new Object[]{((LockInfo)myLockInfo.get()).lockFilename});
            return;
        }
    }

    private LockInfo writeNewLockInfo(TrinoFileSystem fileSystem, Location lockDirectory, String logEntryFilename, String clusterId, String queryId) throws IOException {
        String lockFilename = logEntryFilename + ".sb-lock_" + queryId;
        Instant expiration = Instant.now().plus(EXPIRATION_DURATION);
        LockFileContents contents = new LockFileContents(clusterId, queryId, expiration.toEpochMilli());
        Location lockPath = lockDirectory.appendPath(lockFilename);
        TrinoOutputFile lockFile = fileSystem.newOutputFile(lockPath);
        byte[] contentsBytes = this.lockFileContentsJsonCodec.toJsonBytes((Object)contents);
        try (OutputStream outputStream = lockFile.create();){
            outputStream.write(contentsBytes);
        }
        return new LockInfo(lockFilename, contents);
    }

    private static void deleteLock(TrinoFileSystem fileSystem, Location lockDirectoryPath, LockInfo lockInfo) throws IOException {
        fileSystem.deleteFile(lockDirectoryPath.appendPath(lockInfo.getLockFilename()));
    }

    private List<LockInfo> listLockInfos(TrinoFileSystem fileSystem, Location lockDirectoryPath) throws IOException {
        FileIterator files = fileSystem.listFiles(lockDirectoryPath);
        ImmutableList.Builder lockInfos = ImmutableList.builder();
        while (files.hasNext()) {
            FileEntry entry = files.next();
            String name = entry.location().fileName();
            if (!LOCK_FILENAME_PATTERN.matcher(name).matches()) continue;
            TrinoInputFile file = fileSystem.newInputFile(entry.location());
            this.parseLockFile(file, name).ifPresent(arg_0 -> ((ImmutableList.Builder)lockInfos).add(arg_0));
        }
        return lockInfos.build();
    }

    private Optional<LockInfo> parseLockFile(TrinoInputFile file, String name) throws IOException {
        Optional<LockInfo> optional;
        block10: {
            byte[] bytes = null;
            TrinoInputStream inputStream = file.newStream();
            try {
                bytes = inputStream.readAllBytes();
                LockFileContents lockFileContents = (LockFileContents)this.lockFileContentsJsonCodec.fromJson(bytes);
                optional = Optional.of(new LockInfo(name, lockFileContents));
                if (inputStream == null) break block10;
            }
            catch (Throwable lockFileContents) {
                try {
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        }
                        catch (Throwable throwable) {
                            lockFileContents.addSuppressed(throwable);
                        }
                    }
                    throw lockFileContents;
                }
                catch (IllegalArgumentException e) {
                    String content = null;
                    if (bytes != null) {
                        content = Base64.getEncoder().encodeToString(bytes);
                    }
                    LOG.warn((Throwable)e, "Could not parse lock file: %s; contents=%s", new Object[]{file.location(), content});
                    return Optional.empty();
                }
                catch (FileNotFoundException e) {
                    return Optional.empty();
                }
            }
            inputStream.close();
        }
        return optional;
    }

    public static String parseEntryFilename(String lockFilename) {
        Matcher matcher = LOCK_FILENAME_PATTERN.matcher(lockFilename);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Lock filename " + lockFilename + " does not match expected pattern");
        }
        return matcher.group(1);
    }

    private static class LockInfo {
        private final String lockFilename;
        private final String entryFilename;
        private final LockFileContents contents;

        public LockInfo(String lockFilename, LockFileContents contents) {
            this.lockFilename = Objects.requireNonNull(lockFilename, "lockFilename is null");
            this.entryFilename = S3LockBasedTransactionLogSynchronizer.parseEntryFilename(lockFilename);
            this.contents = Objects.requireNonNull(contents, "contents is null");
        }

        public String getLockFilename() {
            return this.lockFilename;
        }

        public String getEntryFilename() {
            return this.entryFilename;
        }

        public String getClusterId() {
            return this.contents.clusterId();
        }

        public String getOwningQuery() {
            return this.contents.owningQuery();
        }

        public Instant getExpirationTime() {
            return Instant.ofEpochMilli(this.contents.expirationEpochMillis());
        }
    }

    public record LockFileContents(String clusterId, String owningQuery, long expirationEpochMillis) {
        public LockFileContents {
            Objects.requireNonNull(clusterId, "clusterId is null");
            Objects.requireNonNull(owningQuery, "owningQuery is null");
        }
    }
}

