/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.group.GroupManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.InvocationFinallyFunction;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public abstract class BaseStateTransferInterceptor
extends DDAsyncInterceptor {
    private final boolean trace = this.getLog().isTraceEnabled();
    protected StateTransferManager stateTransferManager;
    protected StateTransferLock stateTransferLock;
    protected Executor remoteExecutor;
    private GroupManager groupManager;
    private ScheduledExecutorService timeoutExecutor;
    private long transactionDataTimeout;
    private final InvocationFinallyFunction handleLocalGetKeysInGroupReturn = this::handleLocalGetKeysInGroupReturn;

    @Inject
    public void init(StateTransferLock stateTransferLock, Configuration configuration, StateTransferManager stateTransferManager, GroupManager groupManager, @ComponentName(value="org.infinispan.executors.timeout") ScheduledExecutorService timeoutExecutor, @ComponentName(value="org.infinispan.executors.remote") Executor remoteExecutor) {
        this.stateTransferLock = stateTransferLock;
        this.stateTransferManager = stateTransferManager;
        this.groupManager = groupManager;
        this.timeoutExecutor = timeoutExecutor;
        this.remoteExecutor = remoteExecutor;
        this.transactionDataTimeout = configuration.clustering().remoteTimeout();
    }

    @Override
    public Object visitGetKeysInGroupCommand(InvocationContext ctx, GetKeysInGroupCommand command) throws Throwable {
        this.updateTopologyId(command);
        if (ctx.isOriginLocal()) {
            return this.invokeNextAndHandle(ctx, command, this.handleLocalGetKeysInGroupReturn);
        }
        return this.invokeNextThenAccept(ctx, command, (rCtx, rCommand, rv) -> {
            GetKeysInGroupCommand cmd = (GetKeysInGroupCommand)rCommand;
            int commandTopologyId = cmd.getTopologyId();
            String groupName = cmd.getGroupName();
            if (this.groupManager.isOwner(groupName) && this.currentTopologyId() != commandTopologyId) {
                throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + commandTopologyId + ", got " + this.currentTopologyId());
            }
        });
    }

    private Object handleLocalGetKeysInGroupReturn(InvocationContext ctx, VisitableCommand command, Object rv, Throwable throwable) throws Throwable {
        boolean shouldRetry;
        GetKeysInGroupCommand cmd = (GetKeysInGroupCommand)command;
        int commandTopologyId = cmd.getTopologyId();
        if (throwable != null) {
            Throwable ce = throwable;
            while (ce instanceof RemoteException) {
                ce = ce.getCause();
            }
            shouldRetry = ce instanceof OutdatedTopologyException || ce instanceof SuspectException;
        } else {
            boolean bl = shouldRetry = this.groupManager.isOwner(cmd.getGroupName()) && this.currentTopologyId() != commandTopologyId;
        }
        if (shouldRetry) {
            this.logRetry(cmd);
            int newTopologyId = Math.max(this.currentTopologyId(), commandTopologyId + 1);
            cmd.setTopologyId(newTopologyId);
            CompletableFuture<Void> transactionDataFuture = this.stateTransferLock.transactionDataFuture(newTopologyId);
            return this.retryWhenDone(transactionDataFuture, newTopologyId, ctx, command, this.handleLocalGetKeysInGroupReturn);
        }
        return BaseStateTransferInterceptor.valueOrException(rv, throwable);
    }

    protected final void logRetry(VisitableCommand command) {
        if (this.trace) {
            this.getLog().tracef("Retrying command because of topology change: %s", (Object)command);
        }
    }

    protected final int currentTopologyId() {
        CacheTopology cacheTopology = this.stateTransferManager.getCacheTopology();
        return cacheTopology == null ? -1 : cacheTopology.getTopologyId();
    }

    protected final void updateTopologyId(TopologyAffectedCommand command) throws InterruptedException {
        CacheTopology cacheTopology;
        if (command.getTopologyId() == -1 && (cacheTopology = this.stateTransferManager.getCacheTopology()) != null) {
            command.setTopologyId(cacheTopology.getTopologyId());
        }
    }

    protected <T extends VisitableCommand> Object retryWhenDone(CompletableFuture<Void> future, int topologyId, InvocationContext ctx, T command, InvocationFinallyFunction callback) throws Throwable {
        if (future.isDone()) {
            this.getLog().tracef("Retrying command %s for topology %d", (Object)command, (Object)topologyId);
            return this.invokeNextAndHandle(ctx, command, callback);
        }
        CancellableRetry<T> cancellableRetry = new CancellableRetry<T>(command, topologyId, this.stateTransferManager);
        CompletionStage retryFuture = future.handleAsync(cancellableRetry, this.remoteExecutor);
        cancellableRetry.setRetryFuture((CompletableFuture<Void>)retryFuture);
        ScheduledFuture<?> timeoutFuture = this.timeoutExecutor.schedule(cancellableRetry, this.transactionDataTimeout, TimeUnit.MILLISECONDS);
        cancellableRetry.setTimeoutFuture(timeoutFuture);
        return BaseStateTransferInterceptor.makeStage(this.asyncInvokeNext(ctx, command, (CompletableFuture<?>)retryFuture)).andHandle(ctx, command, callback);
    }

    protected abstract Log getLog();

    private static class CancellableRetry<T extends VisitableCommand>
    implements BiFunction<Void, Throwable, Void>,
    Runnable {
        private static final AtomicReferenceFieldUpdater<CancellableRetry, Throwable> cancellableRetryUpdater = AtomicReferenceFieldUpdater.newUpdater(CancellableRetry.class, Throwable.class, "cancelled");
        private static final AtomicReferenceFieldUpdater<CancellableRetry, Object> timeoutFutureUpdater = AtomicReferenceFieldUpdater.newUpdater(CancellableRetry.class, Object.class, "timeoutFuture");
        private static final Log log = LogFactory.getLog(CancellableRetry.class);
        private static final Throwable DUMMY = new Throwable("Command is retried");
        private final T command;
        private final int topologyId;
        private final StateTransferManager stateTransferManager;
        private volatile Throwable cancelled = null;
        private CompletableFuture<Void> retryFuture;
        private volatile Object timeoutFuture;

        public CancellableRetry(T command, int topologyId, StateTransferManager stateTransferManager) {
            this.command = command;
            this.topologyId = topologyId;
            this.stateTransferManager = stateTransferManager;
        }

        @Override
        public Void apply(Void nil, Throwable throwable) {
            if (!timeoutFutureUpdater.compareAndSet(this, null, DUMMY)) {
                ((ScheduledFuture)this.timeoutFuture).cancel(false);
            }
            if (throwable != null) {
                throw CompletableFutures.asCompletionException(throwable);
            }
            if (!cancellableRetryUpdater.compareAndSet(this, null, DUMMY)) {
                log.tracef("Not retrying command %s as it has been cancelled.", (Object)this.command);
                throw CompletableFutures.asCompletionException(this.cancelled);
            }
            log.tracef("Retrying command %s for topology %d", (Object)this.command, (Object)this.topologyId);
            return null;
        }

        @Override
        public void run() {
            TimeoutException timeoutException = new TimeoutException("Timed out waiting for topology " + this.topologyId);
            if (cancellableRetryUpdater.compareAndSet(this, null, timeoutException)) {
                this.retryFuture.completeExceptionally(timeoutException);
            }
        }

        void setRetryFuture(CompletableFuture<Void> retryFuture) {
            this.retryFuture = retryFuture;
        }

        void setTimeoutFuture(ScheduledFuture<?> timeoutFuture) {
            if (!timeoutFutureUpdater.compareAndSet(this, null, timeoutFuture)) {
                timeoutFuture.cancel(false);
            }
        }
    }
}

