/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod.tx.table;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.xa.Xid;
import net.jcip.annotations.GuardedBy;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.api.Lifecycle;
import org.infinispan.commons.tx.XidImpl;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadWriteMapImpl;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.server.hotrod.tx.table.CacheNameCollector;
import org.infinispan.server.hotrod.tx.table.CacheXid;
import org.infinispan.server.hotrod.tx.table.PerCacheTxTable;
import org.infinispan.server.hotrod.tx.table.Status;
import org.infinispan.server.hotrod.tx.table.TxState;
import org.infinispan.server.hotrod.tx.table.functions.ConditionalMarkAsRollbackFunction;
import org.infinispan.server.hotrod.tx.table.functions.SetCompletedTransactionFunction;
import org.infinispan.server.hotrod.tx.table.functions.SetDecisionFunction;
import org.infinispan.server.hotrod.tx.table.functions.TxFunction;
import org.infinispan.server.hotrod.tx.table.functions.XidPredicate;
import org.infinispan.stream.CacheCollectors;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.tm.EmbeddedTransaction;
import org.infinispan.util.ByteString;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.LogFactory;

@Scope(value=Scopes.GLOBAL)
public class GlobalTxTable
implements Runnable,
Lifecycle {
    private static final Log log = (Log)LogFactory.getLog(GlobalTxTable.class, Log.class);
    private static final boolean trace = log.isTraceEnabled();
    private final Cache<CacheXid, TxState> storage;
    private final FunctionalMap.ReadWriteMap<CacheXid, TxState> rwMap;
    private final GlobalComponentRegistry gcr;
    @GuardedBy(value="this")
    private ScheduledFuture<?> scheduledFuture;
    @Inject
    private TimeService timeService;
    @Inject
    @ComponentName(value="org.infinispan.executors.async")
    private ExecutorService asyncExecutor;
    @Inject
    @ComponentName(value="org.infinispan.executors.expiration")
    private ScheduledExecutorService scheduledExecutor;

    public GlobalTxTable(Cache<CacheXid, TxState> storage, GlobalComponentRegistry gcr) {
        this.storage = storage;
        this.rwMap = ReadWriteMapImpl.create((FunctionalMapImpl)FunctionalMapImpl.create((AdvancedCache)storage.getAdvancedCache()));
        this.gcr = gcr;
    }

    @Start
    public synchronized void start() {
        if (this.scheduledFuture == null) {
            this.scheduledFuture = this.scheduledExecutor.scheduleWithFixedDelay(this, 60000L, 60000L, TimeUnit.MILLISECONDS);
        }
    }

    @Stop
    public synchronized void stop() {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
        }
    }

    public Status update(CacheXid key, TxFunction function, long timeoutMillis) {
        if (trace) {
            log.tracef("[%s] Updating with function: %s", key, function);
        }
        try {
            CompletableFuture cf = this.rwMap.eval((Object)key, (Function)function);
            Status status = Status.valueOf((Byte)cf.get(timeoutMillis, TimeUnit.MILLISECONDS));
            if (trace) {
                log.tracef("[%s] Return value is %s", key, (Object)status);
            }
            return status;
        }
        catch (InterruptedException e) {
            if (trace) {
                log.tracef("[%s] Interrupted!", key);
            }
            Thread.currentThread().interrupt();
            return Status.ERROR;
        }
        catch (ExecutionException | TimeoutException e) {
            if (trace) {
                log.tracef(e, "[%s] Error!", key);
            }
            return Status.ERROR;
        }
    }

    public void markToCommit(XidImpl xid, CacheNameCollector collector) {
        this.markTx(xid, true, collector);
    }

    public void markToRollback(XidImpl xid, CacheNameCollector collector) {
        this.markTx(xid, false, collector);
    }

    public TxState getState(CacheXid xid) {
        TxState state = (TxState)this.storage.get((Object)xid);
        if (trace) {
            log.tracef("[%s] Get TxState = %s", xid, state);
        }
        return state;
    }

    public void remove(CacheXid cacheXid) {
        if (trace) {
            log.tracef("[%s] Removed!", cacheXid);
        }
        this.storage.remove((Object)cacheXid);
    }

    public void forgetTransaction(XidImpl xid) {
        if (trace) {
            log.tracef("[%s] Forgetting transaction.", xid);
        }
        this.storage.keySet().parallelStream().filter((Predicate)new XidPredicate(xid)).forEach(BasicCache::remove);
    }

    @Override
    public void run() {
        long currentTimestamp = this.timeService.time();
        for (Map.Entry entry : this.storage.entrySet()) {
            TxState state = (TxState)entry.getValue();
            CacheXid cacheXid = (CacheXid)entry.getKey();
            if (!state.hasTimedOut(currentTimestamp) || this.skipReaper(state.getOriginator(), cacheXid.getCacheName())) continue;
            switch (state.getStatus()) {
                case ACTIVE: 
                case PREPARING: 
                case PREPARED: {
                    this.onOngoingTransaction(cacheXid, state);
                    break;
                }
                case MARK_ROLLBACK: {
                    this.onTransactionDecision(cacheXid, state, false);
                    break;
                }
                case MARK_COMMIT: {
                    this.onTransactionDecision(cacheXid, state, true);
                    break;
                }
                case COMMITTED: 
                case ROLLED_BACK: {
                    this.onTransactionCompleted(cacheXid);
                    break;
                }
            }
        }
    }

    public Collection<Xid> getPreparedTransactions() {
        long currentTimestamp = this.timeService.time();
        HashSet<Xid> preparedTx = new HashSet<Xid>();
        for (Map.Entry entry : this.storage.entrySet()) {
            XidImpl xid = ((CacheXid)entry.getKey()).getXid();
            TxState state = (TxState)entry.getValue();
            if (trace) {
                log.tracef("Checking transaction xid=%s for recovery. TimedOut?=%s, Recoverable?=%s, Status=%s", new Object[]{xid, state.hasTimedOut(currentTimestamp), state.isRecoverable(), state.getStatus()});
            }
            if (!state.hasTimedOut(currentTimestamp) || !state.isRecoverable() || state.getStatus() != Status.PREPARED) continue;
            preparedTx.add((Xid)xid);
        }
        return preparedTx;
    }

    public boolean isEmpty() {
        return this.storage.isEmpty();
    }

    private void onOngoingTransaction(CacheXid cacheXid, TxState state) {
        if (state.getStatus() == Status.PREPARED && state.isRecoverable()) {
            return;
        }
        ComponentRegistry cr = this.gcr.getNamedComponentRegistry(cacheXid.getCacheName());
        if (cr == null) {
            return;
        }
        RpcManager rpcManager = (RpcManager)cr.getComponent(RpcManager.class);
        if (this.isRemote(rpcManager, state.getOriginator())) {
            this.rollbackOldTransaction(cacheXid, state, () -> this.rollbackRemote(cr, cacheXid, state));
        } else {
            PerCacheTxTable txTable = (PerCacheTxTable)cr.getComponent(PerCacheTxTable.class);
            EmbeddedTransaction tx = txTable.getLocalTx((Xid)cacheXid.getXid());
            if (tx == null) {
                this.onTransactionCompleted(cacheXid);
            } else {
                this.asyncExecutor.execute(() -> this.rollbackOldTransaction(cacheXid, state, () -> this.completeLocal(txTable, cacheXid, tx, false)));
            }
        }
    }

    private void rollbackOldTransaction(CacheXid cacheXid, TxState state, Runnable onSuccessAction) {
        ConditionalMarkAsRollbackFunction txFunction = new ConditionalMarkAsRollbackFunction(state.getStatus());
        this.rwMap.eval((Object)cacheXid, (Function)txFunction).thenAccept(aByte -> {
            if (aByte == Status.OK.value) {
                onSuccessAction.run();
            }
        });
    }

    private void rollbackRemote(ComponentRegistry cr, CacheXid cacheXid, TxState state) {
        RollbackCommand rpcCommand = cr.getCommandsFactory().buildRollbackCommand(state.getGlobalTransaction());
        RpcManager rpcManager = (RpcManager)cr.getComponent(RpcManager.class);
        rpcCommand.setTopologyId(rpcManager.getTopologyId());
        rpcManager.invokeCommandOnAll((ReplicableCommand)rpcCommand, (ResponseCollector)VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).thenRun(() -> {
            SetCompletedTransactionFunction function = new SetCompletedTransactionFunction(false);
            this.rwMap.eval((Object)cacheXid, (Function)function);
        });
    }

    private void onTransactionDecision(CacheXid cacheXid, TxState state, boolean commit) {
        ComponentRegistry cr = this.gcr.getNamedComponentRegistry(cacheXid.getCacheName());
        if (cr == null) {
            return;
        }
        RpcManager rpcManager = (RpcManager)cr.getComponent(RpcManager.class);
        if (rpcManager == null || state.getOriginator().equals(rpcManager.getAddress())) {
            PerCacheTxTable txTable = (PerCacheTxTable)cr.getComponent(PerCacheTxTable.class);
            EmbeddedTransaction tx = txTable.getLocalTx((Xid)cacheXid.getXid());
            if (tx == null) {
                this.onTransactionCompleted(cacheXid);
            } else {
                this.asyncExecutor.execute(() -> this.completeLocal(txTable, cacheXid, tx, commit));
            }
        } else if (commit) {
            Object rpcCommand = ((Configuration)cr.getComponent(Configuration.class)).transaction().lockingMode() == LockingMode.PESSIMISTIC ? cr.getCommandsFactory().buildPrepareCommand(state.getGlobalTransaction(), state.getModifications(), true) : cr.getCommandsFactory().buildCommitCommand(state.getGlobalTransaction());
            rpcCommand.setTopologyId(rpcManager.getTopologyId());
            rpcManager.invokeCommandOnAll((ReplicableCommand)rpcCommand, (ResponseCollector)VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle((aVoid, throwable) -> {
                SetCompletedTransactionFunction function = new SetCompletedTransactionFunction(true);
                this.rwMap.eval((Object)cacheXid, (Function)function);
                return null;
            });
        } else {
            this.rollbackRemote(cr, cacheXid, state);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void completeLocal(PerCacheTxTable txTable, CacheXid cacheXid, EmbeddedTransaction tx, boolean commit) {
        try {
            tx.runCommit(!commit);
        }
        catch (HeuristicMixedException | HeuristicRollbackException | RollbackException throwable) {
        }
        finally {
            txTable.removeLocalTx((Xid)cacheXid.getXid());
        }
        this.onTransactionCompleted(cacheXid);
    }

    private void onTransactionCompleted(CacheXid cacheXid) {
        this.storage.removeAsync((Object)cacheXid);
    }

    private boolean skipReaper(Address originator, ByteString cacheName) {
        ComponentRegistry cr = this.gcr.getNamedComponentRegistry(cacheName);
        if (cr == null) {
            return false;
        }
        RpcManager rpcManager = (RpcManager)cr.getComponent(RpcManager.class);
        return this.isRemote(rpcManager, originator) && rpcManager.getMembers().contains(originator);
    }

    private boolean isRemote(RpcManager rpcManager, Address originator) {
        return rpcManager != null && !originator.equals(rpcManager.getAddress());
    }

    private List<CacheXid> getKeys(XidImpl xid) {
        return (List)this.storage.keySet().stream().filter((Predicate)new XidPredicate(xid)).collect(CacheCollectors.serializableCollector(Collectors::toList));
    }

    private void markTx(XidImpl xid, boolean commit, CacheNameCollector collector) {
        int size;
        if (trace) {
            log.tracef("[%s] Set Transaction Decision to %s", xid, commit ? "Commit" : "Rollback");
        }
        List<CacheXid> cacheXids = this.getKeys(xid);
        if (trace) {
            log.tracef("[%s] Fetched CacheXids=%s", xid, cacheXids);
        }
        if ((size = cacheXids.size()) == 0) {
            collector.noTransactionFound();
            return;
        }
        collector.expectedSize(size);
        SetDecisionFunction function = new SetDecisionFunction(commit);
        for (CacheXid cacheXid : cacheXids) {
            this.rwMap.eval((Object)cacheXid, (Function)function).handle((statusValue, throwable) -> {
                Status status = throwable == null ? Status.valueOf(statusValue) : Status.ERROR;
                collector.addCache(cacheXid.getCacheName(), status);
                return null;
            });
        }
    }
}

