/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod.tx.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import javax.security.auth.Subject;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.tx.XidImpl;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.server.hotrod.HotRodHeader;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.server.hotrod.tx.operation.SecurityActions;
import org.infinispan.server.hotrod.tx.table.CacheNameCollector;
import org.infinispan.server.hotrod.tx.table.CacheXid;
import org.infinispan.server.hotrod.tx.table.GlobalTxTable;
import org.infinispan.server.hotrod.tx.table.TxState;
import org.infinispan.util.ByteString;

abstract class BaseCompleteTransactionOperation
implements CacheNameCollector,
Runnable {
    final XidImpl xid;
    final GlobalTxTable globalTxTable;
    final HotRodHeader header;
    final BiConsumer<HotRodHeader, Integer> reply;
    private final HotRodServer server;
    private final Subject subject;
    final Collection<ByteString> cacheNames = new ConcurrentLinkedQueue<ByteString>();
    final ExecutorService asyncExecutor;
    private final AtomicInteger expectedCaches = new AtomicInteger();
    volatile boolean hasErrors = false;
    volatile boolean hasCommits = false;
    volatile boolean hasRollbacks = false;

    BaseCompleteTransactionOperation(HotRodHeader header, HotRodServer server, Subject subject, XidImpl xid, BiConsumer<HotRodHeader, Integer> reply) {
        GlobalComponentRegistry gcr = SecurityActions.getGlobalComponentRegistry(server.getCacheManager());
        this.globalTxTable = (GlobalTxTable)gcr.getComponent(GlobalTxTable.class);
        this.asyncExecutor = (ExecutorService)gcr.getComponent(ExecutorService.class, "org.infinispan.executors.async");
        this.header = header;
        this.server = server;
        this.subject = subject;
        this.xid = xid;
        this.reply = reply;
    }

    @Override
    public final void expectedSize(int size) {
        this.expectedCaches.set(size);
    }

    @Override
    public final void noTransactionFound() {
        if (this.isTraceEnabled()) {
            this.log().tracef("[%s] No caches found.", this.xid);
        }
        this.reply.accept(this.header, -4);
    }

    abstract <T> BiFunction<T, Throwable, Void> handler();

    abstract void sendReply();

    abstract CacheRpcCommand buildRemoteCommand(Configuration var1, CommandsFactory var2, TxState var3);

    abstract CacheRpcCommand buildForwardCommand(ByteString var1, long var2);

    abstract CompletableFuture<Void> asyncCompleteLocalTransaction(AdvancedCache<?, ?> var1, long var2);

    abstract Log log();

    abstract boolean isTraceEnabled();

    void notifyCacheCollected() {
        int result = this.expectedCaches.decrementAndGet();
        if (this.isTraceEnabled()) {
            this.log().tracef("[%s] Cache collected. Missing=%s.", this.xid, result);
        }
        if (result == 0) {
            this.onCachesCollected();
        }
    }

    private void onCachesCollected() {
        int size;
        if (this.isTraceEnabled()) {
            this.log().tracef("[%s] All caches collected: %s", this.xid, this.cacheNames);
        }
        if ((size = this.cacheNames.size()) == 0) {
            this.sendReply();
            return;
        }
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(size);
        for (ByteString cacheName : this.cacheNames) {
            try {
                futures.add(this.completeCache(cacheName));
            }
            catch (Throwable t) {
                this.hasErrors = true;
            }
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(this::sendReply);
    }

    private CompletableFuture<Void> completeCache(ByteString cacheName) throws Throwable {
        TxState state = this.globalTxTable.getState(new CacheXid(cacheName, this.xid));
        HotRodServer.CacheInfo cacheInfo = this.server.getCacheInfo(cacheName.toString(), this.header.getVersion(), this.header.getMessageId(), true);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, this.header, this.subject);
        RpcManager rpcManager = cache.getRpcManager();
        if (rpcManager == null || rpcManager.getAddress().equals(state.getOriginator())) {
            if (this.isTraceEnabled()) {
                this.log().tracef("[%s] Completing local executed transaction.", this.xid);
            }
            return this.asyncCompleteLocalTransaction(cache, state.getTimeout());
        }
        if (rpcManager.getMembers().contains(state.getOriginator())) {
            if (this.isTraceEnabled()) {
                this.log().tracef("[%s] Forward remotely executed transaction to %s.", this.xid, state.getOriginator());
            }
            return this.forwardCompleteCommand(cacheName, rpcManager, state);
        }
        if (this.isTraceEnabled()) {
            this.log().tracef("[%s] Originator, %s, left the cluster.", this.xid, state.getOriginator());
        }
        return this.completeWithRemoteCommand(cache, rpcManager, state);
    }

    private CompletableFuture<Void> completeWithRemoteCommand(AdvancedCache<?, ?> cache, RpcManager rpcManager, TxState state) throws Throwable {
        CommandsFactory commandsFactory = SecurityActions.getComponentRegistry(cache).getCommandsFactory();
        CacheRpcCommand command = this.buildRemoteCommand(cache.getCacheConfiguration(), commandsFactory, state);
        CompletableFuture<Void> remote = rpcManager.invokeCommandOnAll((ReplicableCommand)command, (ResponseCollector)VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle(this.handler()).toCompletableFuture();
        commandsFactory.initializeReplicableCommand((ReplicableCommand)command, false);
        CompletionStage local = command.invokeAsync().handle(this.handler());
        return CompletableFuture.allOf(new CompletableFuture[]{remote, local});
    }

    private CompletableFuture<Void> forwardCompleteCommand(ByteString cacheName, RpcManager rpcManager, TxState state) {
        Address originator = state.getOriginator();
        CacheRpcCommand command = this.buildForwardCommand(cacheName, state.getTimeout());
        return rpcManager.invokeCommand(originator, (ReplicableCommand)command, (ResponseCollector)VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle(this.handler()).toCompletableFuture();
    }
}

