/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod.tx.operation;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import javax.security.auth.Subject;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.TransactionBoundaryCommand;
import org.infinispan.commons.tx.XidImpl;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.security.actions.SecurityActions;
import org.infinispan.server.hotrod.HotRodHeader;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.server.hotrod.tx.table.CacheNameCollector;
import org.infinispan.server.hotrod.tx.table.CacheXid;
import org.infinispan.server.hotrod.tx.table.GlobalTxTable;
import org.infinispan.server.hotrod.tx.table.TxState;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.logging.LogFactory;

abstract class BaseCompleteTransactionOperation<C1 extends TransactionBoundaryCommand, C2 extends CacheRpcCommand>
implements CacheNameCollector,
Runnable {
    private static final Log log = (Log)LogFactory.getLog(BaseCompleteTransactionOperation.class, Log.class);
    final XidImpl xid;
    final GlobalTxTable globalTxTable;
    final HotRodHeader header;
    final BiConsumer<HotRodHeader, Integer> reply;
    private final HotRodServer server;
    private final Subject subject;
    final Collection<ByteString> cacheNames = new ConcurrentLinkedQueue<ByteString>();
    final BlockingManager blockingManager;
    private final AtomicInteger expectedCaches = new AtomicInteger();
    volatile boolean hasErrors = false;
    volatile boolean hasCommits = false;
    volatile boolean hasRollbacks = false;

    BaseCompleteTransactionOperation(HotRodHeader header, HotRodServer server, Subject subject, XidImpl xid, BiConsumer<HotRodHeader, Integer> reply) {
        GlobalComponentRegistry gcr = SecurityActions.getGlobalComponentRegistry((EmbeddedCacheManager)server.getCacheManager());
        this.globalTxTable = (GlobalTxTable)gcr.getComponent(GlobalTxTable.class);
        this.blockingManager = (BlockingManager)gcr.getComponent(BlockingManager.class);
        this.header = header;
        this.server = server;
        this.subject = subject;
        this.xid = xid;
        this.reply = reply;
    }

    @Override
    public final void expectedSize(int size) {
        this.expectedCaches.set(size);
    }

    @Override
    public final void noTransactionFound() {
        if (log.isTraceEnabled()) {
            log.tracef("[%s] No caches found.", this.xid);
        }
        this.reply.accept(this.header, -4);
    }

    abstract <T> BiFunction<T, Throwable, Void> handler();

    abstract void sendReply();

    abstract C1 buildRemoteCommand(Configuration var1, CommandsFactory var2, TxState var3);

    abstract C2 buildForwardCommand(ByteString var1, long var2);

    abstract void asyncCompleteLocalTransaction(AdvancedCache<?, ?> var1, long var2, AggregateCompletionStage<Void> var4);

    void notifyCacheCollected() {
        int result = this.expectedCaches.decrementAndGet();
        if (log.isTraceEnabled()) {
            log.tracef("[%s] Cache collected. Missing=%s.", this.xid, result);
        }
        if (result == 0) {
            this.onCachesCollected();
        }
    }

    private void onCachesCollected() {
        int size;
        if (log.isTraceEnabled()) {
            log.tracef("[%s] All caches collected: %s", this.xid, this.cacheNames);
        }
        if ((size = this.cacheNames.size()) == 0) {
            this.sendReply();
            return;
        }
        AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        for (ByteString cacheName : this.cacheNames) {
            try {
                this.completeCache(cacheName, (AggregateCompletionStage<Void>)aggregateCompletionStage);
            }
            catch (Throwable t) {
                if (log.isTraceEnabled()) {
                    log.tracef(t, "[%s] Error while trying to complete transaction for cache %s", this.xid, cacheName);
                }
                this.hasErrors = true;
            }
        }
        aggregateCompletionStage.freeze().thenRun(this::sendReply);
    }

    private void completeCache(ByteString cacheName, AggregateCompletionStage<Void> stageCollector) throws Throwable {
        LocalizedCacheTopology topology;
        TxState state = this.globalTxTable.getState(new CacheXid(cacheName, this.xid));
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(cacheName.toString(), this.header.getVersion(), this.header.getMessageId(), true);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, this.header, this.subject);
        DistributionManager distributionManager = cache.getDistributionManager();
        LocalizedCacheTopology localizedCacheTopology = topology = distributionManager == null ? null : distributionManager.getCacheTopology();
        if (topology == null || topology.getLocalAddress().equals((Object)state.getOriginator())) {
            if (log.isTraceEnabled()) {
                log.tracef("[%s] Completing local executed transaction.", this.xid);
            }
            this.asyncCompleteLocalTransaction(cache, state.getTimeout(), stageCollector);
        } else if (topology.getMembers().contains(state.getOriginator())) {
            if (log.isTraceEnabled()) {
                log.tracef("[%s] Forward remotely executed transaction to %s.", this.xid, state.getOriginator());
            }
            this.forwardCompleteCommand(cacheName, state, cache.getRpcManager(), stageCollector);
        } else {
            if (log.isTraceEnabled()) {
                log.tracef("[%s] Originator, %s, left the cluster.", this.xid, state.getOriginator());
            }
            this.completeWithRemoteCommand(cache, state, cache.getRpcManager(), topology.getTopologyId(), stageCollector);
        }
    }

    private void completeWithRemoteCommand(AdvancedCache<?, ?> cache, TxState state, RpcManager rpcManager, int topologyId, AggregateCompletionStage<Void> stageCollector) throws Throwable {
        ComponentRegistry registry = SecurityActions.getCacheComponentRegistry(cache);
        CommandsFactory commandsFactory = registry.getCommandsFactory();
        C1 command = this.buildRemoteCommand(cache.getCacheConfiguration(), commandsFactory, state);
        command.setTopologyId(topologyId);
        stageCollector.dependsOn(rpcManager.invokeCommandOnAll(command, (ResponseCollector)VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle(this.handler()));
        stageCollector.dependsOn(command.invokeAsync(registry).handle(this.handler()));
    }

    private void forwardCompleteCommand(ByteString cacheName, TxState state, RpcManager rpcManager, AggregateCompletionStage<Void> stageCollector) {
        Address originator = state.getOriginator();
        C2 command = this.buildForwardCommand(cacheName, state.getTimeout());
        stageCollector.dependsOn(rpcManager.invokeCommand(originator, command, (ResponseCollector)VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle(this.handler()));
    }
}

