/*
 * Decompiled with CFR 0.152.
 */
package io.trino.transaction;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.NotInTransactionException;
import io.trino.metadata.Catalog;
import io.trino.metadata.CatalogInfo;
import io.trino.metadata.CatalogManager;
import io.trino.metadata.CatalogMetadata;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.transaction.IsolationLevel;
import io.trino.transaction.TransactionId;
import io.trino.transaction.TransactionInfo;
import io.trino.transaction.TransactionManager;
import io.trino.transaction.TransactionManagerConfig;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.joda.time.DateTime;

/**
 * {@link TransactionManager} that keeps every open transaction in a process-local
 * {@link ConcurrentMap} keyed by {@link TransactionId}.
 *
 * <p>Each transaction is represented by a {@link TransactionMetadata} instance that tracks the
 * catalogs registered and activated within the transaction, the single catalog (if any) that has
 * been written to, and whether the transaction has been committed or aborted.
 *
 * <p>Instances created through {@link #create} periodically abort and remove transactions that
 * have been idle for longer than the configured idle timeout.
 */
@ThreadSafe
public class InMemoryTransactionManager
        implements TransactionManager {
    private static final Logger log = Logger.get(InMemoryTransactionManager.class);

    private final Duration idleTimeout;
    private final int maxFinishingConcurrency;
    private final ConcurrentMap<TransactionId, TransactionMetadata> transactions = new ConcurrentHashMap<>();
    private final CatalogManager catalogManager;
    private final Executor finishingExecutor;

    /**
     * @param idleTimeout idle duration after which a transaction is considered expired
     * @param maxFinishingConcurrency per-transaction bound on concurrent commit/abort tasks; must be positive
     * @param catalogManager source of catalogs that transactions may register
     * @param finishingExecutor executor on which connector commit/abort calls are run
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if {@code maxFinishingConcurrency} is not positive
     */
    private InMemoryTransactionManager(Duration idleTimeout, int maxFinishingConcurrency, CatalogManager catalogManager, Executor finishingExecutor) {
        // Fail fast on a missing catalog manager instead of deferring the failure
        // to the first beginTransaction() call (where TransactionMetadata checks it).
        this.catalogManager = Objects.requireNonNull(catalogManager, "catalogManager is null");
        this.idleTimeout = Objects.requireNonNull(idleTimeout, "idleTimeout is null");
        Preconditions.checkArgument(maxFinishingConcurrency > 0, "maxFinishingConcurrency must be at least 1");
        this.maxFinishingConcurrency = maxFinishingConcurrency;
        this.finishingExecutor = Objects.requireNonNull(finishingExecutor, "finishingExecutor is null");
    }

    /**
     * Creates a transaction manager configured from {@code config} and schedules the periodic
     * idle-transaction check on {@code idleCheckExecutor}.
     */
    public static TransactionManager create(TransactionManagerConfig config, ScheduledExecutorService idleCheckExecutor, CatalogManager catalogManager, Executor finishingExecutor) {
        InMemoryTransactionManager transactionManager = new InMemoryTransactionManager(
                config.getIdleTimeout(),
                config.getMaxFinishingConcurrency(),
                catalogManager,
                finishingExecutor);
        transactionManager.scheduleIdleChecks(config.getIdleCheckInterval(), idleCheckExecutor);
        return transactionManager;
    }

    /**
     * Creates a manager for tests: no catalogs, a one day idle timeout, a direct finishing
     * executor, and no scheduled idle checks.
     */
    public static TransactionManager createTestTransactionManager() {
        return new InMemoryTransactionManager(new Duration(1.0, TimeUnit.DAYS), 1, CatalogManager.NO_CATALOGS, MoreExecutors.directExecutor());
    }

    /**
     * Schedules {@link #cleanUpExpiredTransactions()} with a fixed delay of {@code idleCheckInterval}.
     */
    private void scheduleIdleChecks(Duration idleCheckInterval, ScheduledExecutorService idleCheckExecutor) {
        long intervalMillis = idleCheckInterval.toMillis();
        idleCheckExecutor.scheduleWithFixedDelay(() -> {
            try {
                cleanUpExpiredTransactions();
            }
            catch (Throwable t) {
                // An exception escaping a scheduled task suppresses all subsequent executions,
                // so everything is caught and logged to keep the check running.
                log.error(t, "Unexpected exception while cleaning up expired transactions");
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Removes every transaction whose idle time exceeds {@link #idleTimeout} and starts its abort.
     */
    private synchronized void cleanUpExpiredTransactions() {
        Iterator<Map.Entry<TransactionId, TransactionMetadata>> iterator = transactions.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<TransactionId, TransactionMetadata> entry = iterator.next();
            if (!entry.getValue().isExpired(idleTimeout)) {
                continue;
            }
            // Unregister first so the transaction can no longer be looked up, then abort it.
            iterator.remove();
            log.info("Removing expired transaction: %s", entry.getKey());
            entry.getValue().asyncAbort();
        }
    }

    @Override
    public boolean transactionExists(TransactionId transactionId) {
        return tryGetTransactionMetadata(transactionId).isPresent();
    }

    /**
     * @throws NotInTransactionException if the transaction is not registered
     */
    @Override
    public TransactionInfo getTransactionInfo(TransactionId transactionId) {
        return getTransactionMetadata(transactionId).getTransactionInfo();
    }

    @Override
    public Optional<TransactionInfo> getTransactionInfoIfExist(TransactionId transactionId) {
        return tryGetTransactionMetadata(transactionId).map(TransactionMetadata::getTransactionInfo);
    }

    @Override
    public List<TransactionInfo> getAllTransactionInfos() {
        return transactions.values().stream()
                .map(TransactionMetadata::getTransactionInfo)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Returns the IDs of all registered transactions that have registered a catalog with the
     * given handle.
     */
    @Override
    public Set<TransactionId> getTransactionsUsingCatalog(CatalogHandle catalogHandle) {
        return transactions.values().stream()
                .filter(transactionMetadata -> transactionMetadata.isUsingCatalog(catalogHandle))
                .map(TransactionMetadata::getTransactionId)
                .collect(ImmutableSet.toImmutableSet());
    }

    /**
     * Begins a read-write transaction at {@code DEFAULT_ISOLATION}.
     */
    @Override
    public TransactionId beginTransaction(boolean autoCommitContext) {
        return beginTransaction(DEFAULT_ISOLATION, false, autoCommitContext);
    }

    /**
     * Registers a new transaction under a freshly created ID and returns that ID.
     *
     * @throws IllegalStateException if the generated ID is already registered
     */
    @Override
    public TransactionId beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext) {
        TransactionId transactionId = TransactionId.create();
        // Each transaction gets its own bounded view of the shared finishing executor.
        Executor boundedFinishingExecutor = new BoundedExecutor(finishingExecutor, maxFinishingConcurrency);
        TransactionMetadata transactionMetadata = new TransactionMetadata(
                transactionId,
                isolationLevel,
                readOnly,
                autoCommitContext,
                catalogManager,
                boundedFinishingExecutor);
        // putIfAbsent (rather than put) so that an ID collision never replaces the
        // metadata of the transaction that already owns the ID.
        Preconditions.checkState(transactions.putIfAbsent(transactionId, transactionMetadata) == null, "Duplicate transaction ID: %s", transactionId);
        return transactionId;
    }

    @Override
    public List<CatalogInfo> getCatalogs(TransactionId transactionId) {
        return getTransactionMetadata(transactionId).listCatalogs();
    }

    @Override
    public List<CatalogInfo> getActiveCatalogs(TransactionId transactionId) {
        return getTransactionMetadata(transactionId).getActiveCatalogs();
    }

    /**
     * Registers the named catalog with the transaction (if not already registered) and returns
     * its handle, or empty when the catalog manager does not know the catalog.
     */
    @Override
    public Optional<CatalogHandle> getCatalogHandle(TransactionId transactionId, String catalogName) {
        return getTransactionMetadata(transactionId).tryRegisterCatalog(catalogName);
    }

    @Override
    public Optional<CatalogMetadata> getOptionalCatalogMetadata(TransactionId transactionId, String catalogName) {
        TransactionMetadata transactionMetadata = getTransactionMetadata(transactionId);
        return transactionMetadata.tryRegisterCatalog(catalogName)
                .map(transactionMetadata::getTransactionCatalogMetadata);
    }

    @Override
    public CatalogMetadata getCatalogMetadata(TransactionId transactionId, CatalogHandle catalogHandle) {
        return getTransactionMetadata(transactionId).getTransactionCatalogMetadata(catalogHandle);
    }

    /**
     * Returns the catalog metadata after verifying that the transaction may write to the catalog.
     */
    @Override
    public CatalogMetadata getCatalogMetadataForWrite(TransactionId transactionId, CatalogHandle catalogHandle) {
        // Look up the metadata first: this activates the catalog in the transaction,
        // which checkConnectorWrite requires.
        CatalogMetadata catalogMetadata = getCatalogMetadata(transactionId, catalogHandle);
        checkConnectorWrite(transactionId, catalogHandle);
        return catalogMetadata;
    }

    /**
     * @throws TrinoException with {@code CATALOG_NOT_FOUND} if the catalog cannot be registered
     */
    @Override
    public CatalogMetadata getCatalogMetadataForWrite(TransactionId transactionId, String catalogName) {
        CatalogHandle catalogHandle = registerCatalogOrThrow(getTransactionMetadata(transactionId), catalogName);
        return getCatalogMetadataForWrite(transactionId, catalogHandle);
    }

    /**
     * @throws TrinoException with {@code CATALOG_NOT_FOUND} if the catalog cannot be registered
     */
    @Override
    public ConnectorTransactionHandle getConnectorTransaction(TransactionId transactionId, String catalogName) {
        TransactionMetadata transactionMetadata = getTransactionMetadata(transactionId);
        CatalogHandle catalogHandle = registerCatalogOrThrow(transactionMetadata, catalogName);
        return transactionMetadata.getTransactionCatalogMetadata(catalogHandle).getTransactionHandleFor(catalogHandle);
    }

    @Override
    public ConnectorTransactionHandle getConnectorTransaction(TransactionId transactionId, CatalogHandle catalogHandle) {
        return getCatalogMetadata(transactionId, catalogHandle).getTransactionHandleFor(catalogHandle);
    }

    /**
     * Registers the named catalog with the transaction and returns its handle, failing with
     * {@code CATALOG_NOT_FOUND} when the catalog is unknown.
     */
    private static CatalogHandle registerCatalogOrThrow(TransactionMetadata transactionMetadata, String catalogName) {
        return transactionMetadata.tryRegisterCatalog(catalogName)
                .orElseThrow(() -> new TrinoException(StandardErrorCode.CATALOG_NOT_FOUND, "Catalog '%s' not found".formatted(catalogName)));
    }

    private void checkConnectorWrite(TransactionId transactionId, CatalogHandle catalogHandle) {
        getTransactionMetadata(transactionId).checkConnectorWrite(catalogHandle);
    }

    /**
     * Verifies that the transaction is still open and clears its idle timer.
     *
     * @throws NotInTransactionException if the transaction is not registered
     */
    @Override
    public void checkAndSetActive(TransactionId transactionId) {
        TransactionMetadata metadata = getTransactionMetadata(transactionId);
        metadata.checkOpenTransaction();
        metadata.setActive();
    }

    @Override
    public void trySetActive(TransactionId transactionId) {
        tryGetTransactionMetadata(transactionId).ifPresent(TransactionMetadata::setActive);
    }

    @Override
    public void trySetInactive(TransactionId transactionId) {
        tryGetTransactionMetadata(transactionId).ifPresent(TransactionMetadata::setInactive);
    }

    /**
     * @throws NotInTransactionException if the transaction is not registered
     */
    private TransactionMetadata getTransactionMetadata(TransactionId transactionId) {
        TransactionMetadata transactionMetadata = transactions.get(transactionId);
        if (transactionMetadata == null) {
            throw new NotInTransactionException(transactionId);
        }
        return transactionMetadata;
    }

    private Optional<TransactionMetadata> tryGetTransactionMetadata(TransactionId transactionId) {
        return Optional.ofNullable(transactions.get(transactionId));
    }

    /**
     * Unregisters the transaction and returns its metadata as an already completed future;
     * the future is failed with {@link NotInTransactionException} when nothing was registered.
     */
    private ListenableFuture<TransactionMetadata> removeTransactionMetadataAsFuture(TransactionId transactionId) {
        TransactionMetadata transactionMetadata = transactions.remove(transactionId);
        if (transactionMetadata == null) {
            return Futures.immediateFailedFuture(new NotInTransactionException(transactionId));
        }
        return Futures.immediateFuture(transactionMetadata);
    }

    /**
     * Unregisters the transaction and commits it. Cancelling the returned future does not
     * cancel the underlying commit.
     */
    @Override
    public ListenableFuture<Void> asyncCommit(TransactionId transactionId) {
        ListenableFuture<Void> commit = Futures.transformAsync(
                removeTransactionMetadataAsFuture(transactionId),
                TransactionMetadata::asyncCommit,
                MoreExecutors.directExecutor());
        return Futures.nonCancellationPropagating(commit);
    }

    /**
     * Unregisters the transaction and aborts it. Cancelling the returned future does not
     * cancel the underlying abort.
     */
    @Override
    public ListenableFuture<Void> asyncAbort(TransactionId transactionId) {
        ListenableFuture<Void> abort = Futures.transformAsync(
                removeTransactionMetadataAsFuture(transactionId),
                TransactionMetadata::asyncAbort,
                MoreExecutors.directExecutor());
        return Futures.nonCancellationPropagating(abort);
    }

    /**
     * Records a reason that will make a later commit of this transaction abort instead.
     */
    @Override
    public void blockCommit(TransactionId transactionId, String reason) {
        getTransactionMetadata(transactionId).blockCommit(reason);
    }

    /**
     * Aborts the transaction if it is registered. The transaction is intentionally left
     * registered, so later use of it reports that it has been aborted.
     */
    @Override
    public void fail(TransactionId transactionId) {
        tryGetTransactionMetadata(transactionId).ifPresent(TransactionMetadata::asyncAbort);
    }

    /**
     * Returns a future that completes with {@code null} when {@code future} succeeds.
     */
    private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future) {
        return Futures.transform(future, v -> null, MoreExecutors.directExecutor());
    }

    /**
     * State of a single transaction: its configuration, the catalogs it has registered and
     * activated, the catalog written to (at most one), and its completion status.
     */
    @ThreadSafe
    private static class TransactionMetadata {
        private final DateTime createTime = DateTime.now();
        private final CatalogManager catalogManager;
        private final TransactionId transactionId;
        private final IsolationLevel isolationLevel;
        private final boolean readOnly;
        private final boolean autoCommitContext;
        private final Executor finishingExecutor;
        // Non-null once a commit has been blocked; holds the reason.
        private final AtomicReference<String> commitBlocked = new AtomicReference<>();
        // null while open, TRUE once commit has started, FALSE once abort has started.
        private final AtomicReference<Boolean> completedSuccessfully = new AtomicReference<>();
        // System.nanoTime() at which the transaction became idle; null while active.
        private final AtomicReference<Long> idleStartTime = new AtomicReference<>();
        // Catalogs looked up by name in this transaction; an empty Optional caches "no such catalog".
        @GuardedBy(value="this")
        private final Map<String, Optional<Catalog>> registeredCatalogs = new ConcurrentHashMap<>();
        // Catalogs for which a connector transaction has been begun.
        @GuardedBy(value="this")
        private final Map<CatalogHandle, CatalogMetadata> activeCatalogs = new ConcurrentHashMap<>();
        @GuardedBy(value="this")
        private final AtomicReference<CatalogHandle> writtenCatalog = new AtomicReference<>();

        public TransactionMetadata(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext, CatalogManager catalogManager, Executor finishingExecutor) {
            this.transactionId = Objects.requireNonNull(transactionId, "transactionId is null");
            this.isolationLevel = Objects.requireNonNull(isolationLevel, "isolationLevel is null");
            this.readOnly = readOnly;
            this.autoCommitContext = autoCommitContext;
            this.catalogManager = Objects.requireNonNull(catalogManager, "catalogManager is null");
            this.finishingExecutor = Objects.requireNonNull(finishingExecutor, "finishingExecutor is null");
        }

        public TransactionId getTransactionId() {
            return transactionId;
        }

        /** Clears the idle timer. */
        public void setActive() {
            idleStartTime.set(null);
        }

        /** Starts the idle timer at the current {@link System#nanoTime()}. */
        public void setInactive() {
            idleStartTime.set(System.nanoTime());
        }

        /**
         * Returns true when the transaction is idle and has been idle for longer than
         * {@code idleTimeout}.
         */
        public boolean isExpired(Duration idleTimeout) {
            Long idleStart = idleStartTime.get();
            return idleStart != null && Duration.nanosSince(idleStart).compareTo(idleTimeout) > 0;
        }

        public void blockCommit(String reason) {
            commitBlocked.set(Objects.requireNonNull(reason, "reason is null"));
        }

        /**
         * @throws IllegalStateException if commit has already been started
         * @throws TrinoException with {@code TRANSACTION_ALREADY_ABORTED} if abort has already been started
         */
        public void checkOpenTransaction() {
            Boolean completedStatus = completedSuccessfully.get();
            if (completedStatus == null) {
                return;
            }
            if (completedStatus) {
                throw new IllegalStateException("Current transaction already committed");
            }
            throw new TrinoException(StandardErrorCode.TRANSACTION_ALREADY_ABORTED, "Current transaction is aborted, commands ignored until end of transaction block");
        }

        /**
         * Returns true when any catalog registered in this transaction has the given handle.
         */
        public synchronized boolean isUsingCatalog(CatalogHandle catalogHandle) {
            return registeredCatalogs.values().stream()
                    .flatMap(Optional::stream)
                    .map(Catalog::getCatalogHandle)
                    .anyMatch(handle -> catalogHandle.equals(handle));
        }

        /**
         * Returns info for the registered catalogs whose names match an active catalog handle.
         */
        private synchronized List<CatalogInfo> getActiveCatalogs() {
            return activeCatalogs.keySet().stream()
                    .map(CatalogHandle::getCatalogName)
                    .distinct()
                    .map(key -> registeredCatalogs.getOrDefault(key, Optional.empty()))
                    .flatMap(Optional::stream)
                    .map(TransactionMetadata::toCatalogInfo)
                    .collect(ImmutableList.toImmutableList());
        }

        /**
         * Registers every catalog currently known to the catalog manager and returns info for
         * all catalogs registered in this transaction.
         */
        private synchronized List<CatalogInfo> listCatalogs() {
            catalogManager.getCatalogNames()
                    .forEach(catalogName -> registeredCatalogs.computeIfAbsent(catalogName, catalogManager::getCatalog));
            return registeredCatalogs.values().stream()
                    .flatMap(Optional::stream)
                    .map(TransactionMetadata::toCatalogInfo)
                    .collect(ImmutableList.toImmutableList());
        }

        private static CatalogInfo toCatalogInfo(Catalog catalog) {
            return new CatalogInfo(catalog.getCatalogName(), catalog.getCatalogHandle(), catalog.getConnectorName());
        }

        /**
         * Looks the catalog up (caching the result, present or not, for the rest of the
         * transaction), verifies it when present, and returns its handle.
         */
        private synchronized Optional<CatalogHandle> tryRegisterCatalog(String catalogName) {
            Optional<Catalog> catalog = registeredCatalogs.computeIfAbsent(catalogName, catalogManager::getCatalog);
            catalog.ifPresent(Catalog::verify);
            return catalog.map(Catalog::getCatalogHandle);
        }

        /**
         * Returns the metadata of the catalog within this transaction, beginning the catalog's
         * transaction on first use.
         *
         * @throws IllegalArgumentException if the catalog is not yet active and the handle is
         *         internal or refers to a catalog that has not been registered
         */
        private synchronized CatalogMetadata getTransactionCatalogMetadata(CatalogHandle catalogHandle) {
            checkOpenTransaction();
            CatalogMetadata catalogMetadata = activeCatalogs.get(catalogHandle.getRootCatalogHandle());
            if (catalogMetadata != null) {
                return catalogMetadata;
            }
            // Only a non-internal handle may activate a catalog; an internal handle is
            // resolved through its root handle above and must already be active.
            Preconditions.checkArgument(!catalogHandle.getType().isInternal(), "Internal catalog handle not allowed: %s", catalogHandle);
            Catalog catalog = registeredCatalogs.getOrDefault(catalogHandle.getCatalogName(), Optional.empty())
                    .orElseThrow(() -> new IllegalArgumentException("No catalog registered for handle: " + catalogHandle));
            catalogMetadata = catalog.beginTransaction(transactionId, isolationLevel, readOnly, autoCommitContext);
            activeCatalogs.put(catalogHandle, catalogMetadata);
            return catalogMetadata;
        }

        /**
         * Records {@code catalogHandle} as the catalog written to by this transaction.
         *
         * @throws IllegalArgumentException if the catalog is not active in this transaction
         * @throws TrinoException if the transaction is read-only, a different catalog has already
         *         been written to, or the catalog only supports autocommit writes and this is
         *         not an autocommit context
         */
        public synchronized void checkConnectorWrite(CatalogHandle catalogHandle) {
            checkOpenTransaction();
            CatalogMetadata catalogMetadata = activeCatalogs.get(catalogHandle);
            Preconditions.checkArgument(catalogMetadata != null, "Cannot record write for catalog not part of transaction");
            if (readOnly) {
                throw new TrinoException(StandardErrorCode.READ_ONLY_VIOLATION, "Cannot execute write in a read-only transaction");
            }
            // The first writer claims the slot; any later write must target the same catalog.
            if (!writtenCatalog.compareAndSet(null, catalogHandle) && !writtenCatalog.get().equals(catalogHandle)) {
                String writtenCatalogName = activeCatalogs.get(writtenCatalog.get()).getCatalogName();
                throw new TrinoException(StandardErrorCode.MULTI_CATALOG_WRITE_CONFLICT, "Multi-catalog writes not supported in a single transaction. Already wrote to catalog " + writtenCatalogName);
            }
            if (catalogMetadata.isSingleStatementWritesOnly() && !autoCommitContext) {
                throw new TrinoException(StandardErrorCode.AUTOCOMMIT_WRITE_CONFLICT, "Catalog only supports writes using autocommit: " + catalogMetadata.getCatalogName());
            }
        }

        /**
         * Commits all active catalogs on the finishing executor.
         *
         * <p>When a catalog has been written to, it is committed first and the remaining
         * catalogs are committed only after that commit succeeds. Calling this again after a
         * commit has started returns a completed future; calling it after an abort has started
         * returns a future failed with {@code TRANSACTION_ALREADY_ABORTED}. If the commit has
         * been blocked, the transaction is aborted and the future fails with
         * {@code ADMINISTRATIVELY_KILLED}.
         */
        public synchronized ListenableFuture<Void> asyncCommit() {
            if (!completedSuccessfully.compareAndSet(null, true)) {
                if (completedSuccessfully.get()) {
                    // Commit already started
                    return Futures.immediateVoidFuture();
                }
                // Abort already started
                return Futures.immediateFailedFuture(new TrinoException(StandardErrorCode.TRANSACTION_ALREADY_ABORTED, "Current transaction has already been aborted"));
            }

            String commitBlockedReason = commitBlocked.get();
            if (commitBlockedReason != null) {
                // Abort first, then surface the blocking reason once the abort has succeeded.
                return Futures.transform(
                        abortInternal(),
                        ignored -> {
                            throw new TrinoException(StandardErrorCode.ADMINISTRATIVELY_KILLED, commitBlockedReason);
                        },
                        MoreExecutors.directExecutor());
            }

            CatalogHandle writeCatalogHandle = writtenCatalog.get();
            if (writeCatalogHandle == null) {
                // Nothing was written: commit every catalog without any ordering.
                List<ListenableFuture<Void>> commits = activeCatalogs.values().stream()
                        .map(catalog -> Futures.submit(catalog::commit, finishingExecutor))
                        .collect(Collectors.toList());
                ListenableFuture<Void> future = asVoid(Futures.allAsList(commits));
                MoreFutures.addExceptionCallback(future, throwable -> {
                    abortInternal();
                    log.error(throwable, "Read-only connector should not throw exception on commit");
                });
                return Futures.nonCancellationPropagating(future);
            }

            // Deferred so that the read-only catalogs are committed only after the write commit succeeds.
            Supplier<ListenableFuture<Void>> commitReadOnlyConnectors = () -> {
                List<ListenableFuture<Void>> futures = activeCatalogs.entrySet().stream()
                        .filter(entry -> !entry.getKey().equals(writeCatalogHandle))
                        .map(Map.Entry::getValue)
                        .map(catalog -> Futures.submit(catalog::commit, finishingExecutor))
                        .collect(Collectors.toList());
                ListenableFuture<Void> future = asVoid(Futures.allAsList(futures));
                MoreFutures.addExceptionCallback(future, throwable -> log.error(throwable, "Read-only connector should not throw exception on commit"));
                return future;
            };

            CatalogMetadata writeCatalog = activeCatalogs.get(writeCatalogHandle);
            ListenableFuture<Void> commitFuture = Futures.submit(writeCatalog::commit, finishingExecutor);
            ListenableFuture<Void> readOnlyCommitFuture = Futures.transformAsync(commitFuture, ignored -> commitReadOnlyConnectors.get(), MoreExecutors.directExecutor());
            MoreFutures.addExceptionCallback(readOnlyCommitFuture, this::abortInternal);
            return Futures.nonCancellationPropagating(readOnlyCommitFuture);
        }

        /**
         * Aborts all active catalogs on the finishing executor. Calling this again after an
         * abort has started returns a completed future; calling it after a commit has started
         * returns a future failed with {@link IllegalStateException}.
         */
        public synchronized ListenableFuture<Void> asyncAbort() {
            if (!completedSuccessfully.compareAndSet(null, false)) {
                if (completedSuccessfully.get()) {
                    return Futures.immediateFailedFuture(new IllegalStateException("Current transaction already committed"));
                }
                // Abort already started
                return Futures.immediateVoidFuture();
            }
            return abortInternal();
        }

        /**
         * Submits an abort for every active catalog without checking or updating the
         * completion status.
         */
        private synchronized ListenableFuture<Void> abortInternal() {
            List<ListenableFuture<Void>> futures = activeCatalogs.values().stream()
                    .map(catalog -> Futures.submit(catalog::abort, finishingExecutor))
                    .collect(Collectors.toList());
            ListenableFuture<Void> future = asVoid(Futures.allAsList(futures));
            return Futures.nonCancellationPropagating(future);
        }

        /**
         * Builds a snapshot of this transaction's state.
         *
         * <p>The fields are read without holding the monitor. Each individual read goes
         * through an atomic reference or a concurrent map, but the values are not captured
         * atomically with respect to each other.
         */
        public TransactionInfo getTransactionInfo() {
            Duration idleTime = Optional.ofNullable(idleStartTime.get())
                    .map(Duration::nanosSince)
                    .orElse(new Duration(0.0, TimeUnit.MILLISECONDS));
            Optional<String> writtenCatalogName = Optional.ofNullable(writtenCatalog.get())
                    .map(activeCatalogs::get)
                    .map(CatalogMetadata::getCatalogName);
            List<String> catalogNames = activeCatalogs.values().stream()
                    .map(CatalogMetadata::getCatalogName)
                    .sorted()
                    .collect(Collectors.toUnmodifiableList());
            return new TransactionInfo(
                    transactionId,
                    isolationLevel,
                    readOnly,
                    autoCommitContext,
                    createTime,
                    idleTime,
                    catalogNames,
                    writtenCatalogName,
                    ImmutableSet.copyOf(activeCatalogs.keySet()));
        }
    }
}

