/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.infinispan.commands.AbstractTopologyAffectedCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commons.CacheException;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.Ownership;
import org.infinispan.distribution.RemoteValueRetrievedListener;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.group.GroupManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.BasicInvocationStage;
import org.infinispan.interceptors.InvocationStage;
import org.infinispan.interceptors.impl.ClusteringInterceptor;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.RpcException;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public abstract class BaseDistributionInterceptor
extends ClusteringInterceptor {
    protected DistributionManager dm;
    protected ClusteringDependentLogic cdl;
    protected RemoteValueRetrievedListener rvrl;
    protected boolean isL1Enabled;
    private GroupManager groupManager;
    private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class);
    private static final boolean trace = log.isTraceEnabled();

    @Override
    protected Log getLog() {
        return log;
    }

    @Inject
    public void injectDependencies(DistributionManager distributionManager, ClusteringDependentLogic cdl, RemoteValueRetrievedListener rvrl, GroupManager groupManager) {
        this.dm = distributionManager;
        this.cdl = cdl;
        this.rvrl = rvrl;
        this.groupManager = groupManager;
    }

    @Start
    public void configure() {
        this.isL1Enabled = this.cacheConfiguration.clustering().l1().enabled();
    }

    @Override
    public final BasicInvocationStage visitGetKeysInGroupCommand(InvocationContext ctx, GetKeysInGroupCommand command) throws Throwable {
        String groupName = command.getGroupName();
        if (command.isGroupOwner()) {
            return this.invokeNext(ctx, command);
        }
        CompletableFuture<Map<Address, Response>> future = this.rpcManager.invokeRemotelyAsync(Collections.singleton(this.groupManager.getPrimaryOwner(groupName)), command, this.rpcManager.getDefaultRpcOptions(true));
        return this.invokeNextAsync(ctx, command, (CompletableFuture<?>)future.thenAccept(responses -> {
            Response response;
            if (!responses.isEmpty() && (response = (Response)responses.values().iterator().next()) instanceof SuccessfulResponse) {
                List cacheEntries = (List)((SuccessfulResponse)response).getResponseValue();
                for (CacheEntry entry : cacheEntries) {
                    this.entryFactory.wrapExternalEntry(ctx, entry.getKey(), entry, false);
                }
            }
        }));
    }

    @Override
    public final BasicInvocationStage visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
        if (ctx.isOriginLocal() && !this.isLocalModeForced(command)) {
            RpcOptions rpcOptions = this.rpcManager.getRpcOptionsBuilder(this.isSynchronous(command) ? ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS : ResponseMode.ASYNCHRONOUS).build();
            return this.invokeNextAsync(ctx, command, this.rpcManager.invokeRemotelyAsync(null, command, rpcOptions));
        }
        return this.invokeNext(ctx, command);
    }

    protected final CompletableFuture<Void> remoteGet(InvocationContext ctx, AbstractTopologyAffectedCommand command, Object key, boolean isWrite) {
        CacheTopology cacheTopology = this.checkTopologyId(command);
        int topologyId = cacheTopology.getTopologyId();
        ConsistentHash readCH = cacheTopology.getReadConsistentHash();
        DistributionInfo info = new DistributionInfo(key, readCH, this.rpcManager.getAddress());
        if (info.ownership() != Ownership.NON_OWNER) {
            if (trace) {
                log.tracef("Key %s is local, skipping remote get. Command topology is %d, current topology is %d", key, (Object)command.getTopologyId(), (Object)topologyId);
            }
            if (command.getTopologyId() == topologyId) {
                throw new IllegalStateException();
            }
            throw new OutdatedTopologyException(topologyId);
        }
        if (trace) {
            log.tracef("Perform remote get for key %s. currentTopologyId=%s, owners=%s", key, (Object)topologyId, (Object)info.owners());
        }
        ClusteredGetCommand getCommand = this.cf.buildClusteredGetCommand(key, command.getFlagsBitSet());
        getCommand.setTopologyId(topologyId);
        getCommand.setWrite(isWrite);
        return this.rpcManager.invokeRemotelyAsync(info.owners(), getCommand, this.staggeredOptions).thenAccept(responses -> {
            for (Response r : responses.values()) {
                if (!(r instanceof SuccessfulResponse)) continue;
                SuccessfulResponse response = (SuccessfulResponse)r;
                Object responseValue = response.getResponseValue();
                if (responseValue == null) {
                    if (this.rvrl != null) {
                        this.rvrl.remoteValueNotFound(key);
                    }
                    this.entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), isWrite);
                    return;
                }
                InternalCacheEntry ice = ((InternalCacheValue)responseValue).toInternalCacheEntry(key);
                if (this.rvrl != null) {
                    this.rvrl.remoteValueFound(ice);
                }
                this.entryFactory.wrapExternalEntry(ctx, key, ice, isWrite);
                return;
            }
            throw new OutdatedTopologyException("Did not get any successful response, got " + responses);
        });
    }

    protected final BasicInvocationStage handleNonTxWriteCommand(InvocationContext ctx, AbstractDataWriteCommand command) throws Throwable {
        Object key = command.getKey();
        CacheEntry entry = ctx.lookupEntry(key);
        if (this.isLocalModeForced(command)) {
            if (entry == null) {
                this.entryFactory.wrapExternalEntry(ctx, key, null, true);
            }
            return this.invokeNext(ctx, command);
        }
        if (entry == null) {
            DistributionInfo info = new DistributionInfo(key, this.checkTopologyId(command).getWriteConsistentHash(), this.rpcManager.getAddress());
            boolean load = this.shouldLoad(ctx, command, info);
            if (info.isPrimary()) {
                throw new IllegalStateException("Primary owner in writeCH should always be an owner in readCH as well.");
            }
            if (ctx.isOriginLocal()) {
                return this.invokeRemotely(command, info.primary());
            }
            if (load) {
                CompletableFuture<Void> getFuture = this.remoteGet(ctx, command, command.getKey(), true);
                return this.invokeNextAsync(ctx, command, getFuture);
            }
            this.entryFactory.wrapExternalEntry(ctx, key, null, true);
            return this.invokeNext(ctx, command);
        }
        DistributionInfo info = new DistributionInfo(key, this.checkTopologyId(command).getWriteConsistentHash(), this.rpcManager.getAddress());
        if (info.isPrimary()) {
            return this.invokeNext(ctx, command).thenCompose(this::primaryReturnHandler);
        }
        if (ctx.isOriginLocal()) {
            return this.invokeRemotely(command, info.primary());
        }
        return this.invokeNext(ctx, command);
    }

    private boolean shouldLoad(InvocationContext ctx, AbstractDataWriteCommand command, DistributionInfo info) {
        if (!command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)) {
            VisitableCommand.LoadType loadType = command.loadType();
            switch (loadType) {
                case DONT_LOAD: {
                    return false;
                }
                case OWNER: {
                    switch (info.ownership()) {
                        case PRIMARY: {
                            return true;
                        }
                        case BACKUP: {
                            return !ctx.isOriginLocal();
                        }
                        case NON_OWNER: {
                            return false;
                        }
                    }
                    throw new IllegalStateException();
                }
                case PRIMARY: {
                    return info.isPrimary();
                }
            }
            throw new IllegalStateException();
        }
        return false;
    }

    private BasicInvocationStage invokeRemotely(DataWriteCommand command, Address primaryOwner) {
        CompletableFuture<Map<Address, Response>> remoteInvocation;
        if (trace) {
            log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", (Object)primaryOwner);
        }
        boolean isSyncForwarding = this.isSynchronous(command) || command.isReturnValueExpected();
        try {
            remoteInvocation = this.rpcManager.invokeRemotelyAsync(Collections.singletonList(primaryOwner), command, this.rpcManager.getDefaultRpcOptions(isSyncForwarding));
        }
        catch (Throwable t2) {
            command.setValueMatcher(command.getValueMatcher().matcherForRetry());
            throw t2;
        }
        if (isSyncForwarding) {
            return this.returnWithAsync((CompletableFuture<Object>)remoteInvocation.handle((responses, t) -> {
                command.setValueMatcher(command.getValueMatcher().matcherForRetry());
                CompletableFutures.rethrowException(t);
                Object primaryResult = this.getResponseFromPrimaryOwner(primaryOwner, (Map<Address, Response>)responses);
                command.updateStatusFromRemoteResponse(primaryResult);
                return primaryResult;
            }));
        }
        return this.returnWith(null);
    }

    private BasicInvocationStage primaryReturnHandler(BasicInvocationStage ignored, InvocationContext ctx, VisitableCommand visitableCommand, Object localResult) {
        boolean isSingleOwnerAndLocal;
        DataWriteCommand command = (DataWriteCommand)visitableCommand;
        if (!command.isSuccessful()) {
            if (trace) {
                log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", (Object)command);
            }
            return this.returnWith(localResult);
        }
        boolean bl = isSingleOwnerAndLocal = this.cacheConfiguration.clustering().hash().numOwners() == 1;
        if (isSingleOwnerAndLocal) {
            return this.returnWith(localResult);
        }
        ConsistentHash writeCH = this.checkTopologyId(command).getWriteConsistentHash();
        List<Address> recipients = writeCH.isReplicated() ? null : writeCH.locateOwners(command.getKey());
        ValueMatcher originalMatcher = command.getValueMatcher();
        command.setValueMatcher(ValueMatcher.MATCH_ALWAYS);
        RpcOptions rpcOptions = this.determineRpcOptionsForBackupReplication(this.rpcManager, this.isSynchronous(command), recipients);
        CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, rpcOptions);
        return this.returnWithAsync((CompletableFuture<Object>)remoteInvocation.handle((responses, t) -> {
            command.setValueMatcher(originalMatcher.matcherForRetry());
            CompletableFutures.rethrowException(t instanceof RemoteException ? t.getCause() : t);
            return localResult;
        }));
    }

    private RpcOptions determineRpcOptionsForBackupReplication(RpcManager rpc, boolean isSync, List<Address> recipients) {
        RpcOptions options = isSync ? (recipients == null ? rpc.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS).build() : rpc.getDefaultRpcOptions(true)) : rpc.getDefaultRpcOptions(false);
        return options;
    }

    private Object getResponseFromPrimaryOwner(Address primaryOwner, Map<Address, Response> addressResponseMap) {
        Response fromPrimaryOwner = addressResponseMap.get(primaryOwner);
        if (fromPrimaryOwner == null) {
            if (trace) {
                log.tracef("Primary owner %s returned null", (Object)primaryOwner);
            }
            return null;
        }
        if (fromPrimaryOwner.isSuccessful()) {
            return ((SuccessfulResponse)fromPrimaryOwner).getResponseValue();
        }
        if (addressResponseMap.get(primaryOwner) instanceof CacheNotFoundResponse) {
            throw new OutdatedTopologyException("Cache is no longer running on primary owner " + primaryOwner);
        }
        Exception cause = fromPrimaryOwner instanceof ExceptionResponse ? ((ExceptionResponse)fromPrimaryOwner).getException() : null;
        throw new CacheException("Got unsuccessful response from primary owner: " + fromPrimaryOwner, cause);
    }

    @Override
    public BasicInvocationStage visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
        if (command.hasFlag(Flag.CACHE_MODE_LOCAL) || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)) {
            return this.invokeNext(ctx, command);
        }
        if (!ctx.isOriginLocal()) {
            for (Object key : command.getKeys()) {
                if (ctx.lookupEntry(key) != null) continue;
                return this.handleMissingEntryOnRead(command);
            }
            return this.invokeNext(ctx, command);
        }
        CacheTopology cacheTopology = this.checkTopologyId(command);
        ConsistentHash ch = cacheTopology.getReadConsistentHash();
        Map<Address, List<Object>> requestedKeys = this.getKeysByOwner(ctx, command.getKeys(), ch, null);
        if (requestedKeys.isEmpty()) {
            return this.invokeNext(ctx, command);
        }
        GlobalTransaction gtx = ctx.isInTxScope() ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
        CompletableFutureWithCounter allFuture = new CompletableFutureWithCounter(requestedKeys.size());
        for (Map.Entry<Address, List<Object>> pair : requestedKeys.entrySet()) {
            List<Object> keys = pair.getValue();
            ClusteredGetAllCommand clusteredGetAllCommand = this.cf.buildClusteredGetAllCommand(keys, command.getFlagsBitSet(), gtx);
            this.rpcManager.invokeRemotelyAsync(Collections.singleton(pair.getKey()), clusteredGetAllCommand, this.defaultSyncOptions).whenComplete((responseMap, throwable) -> {
                Response response;
                if (throwable != null) {
                    allFuture.completeExceptionally((Throwable)throwable);
                }
                if ((response = this.getSingleSuccessfulResponseOrFail((Map<Address, Response>)responseMap, allFuture)) == null) {
                    return;
                }
                Object responseValue = ((SuccessfulResponse)response).getResponseValue();
                if (responseValue instanceof InternalCacheValue[]) {
                    int counterValue;
                    InternalCacheValue[] values = (InternalCacheValue[])responseValue;
                    CompletableFutureWithCounter completableFutureWithCounter = allFuture;
                    synchronized (completableFutureWithCounter) {
                        for (int i = 0; i < keys.size(); ++i) {
                            Object key = keys.get(i);
                            InternalCacheValue value = values[i];
                            NullCacheEntry entry = value == null ? NullCacheEntry.getInstance() : value.toInternalCacheEntry(key);
                            this.entryFactory.wrapExternalEntry(ctx, key, entry, false);
                        }
                        counterValue = --allFuture.counter;
                    }
                    if (counterValue == 0) {
                        allFuture.complete(null);
                    }
                } else {
                    allFuture.completeExceptionally(new IllegalStateException("Unexpected response value: " + responseValue));
                }
            });
        }
        return this.invokeNextAsync(ctx, command, allFuture);
    }

    @Override
    public BasicInvocationStage visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable {
        if (command.hasFlag(Flag.CACHE_MODE_LOCAL) || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)) {
            return this.handleLocalOnlyReadOnlyManyCommand(ctx, command);
        }
        CacheTopology cacheTopology = this.checkTopologyId(command);
        if (!ctx.isOriginLocal()) {
            return this.handleRemoteReadOnlyManyCommand(ctx, command);
        }
        if (command.getKeys().isEmpty()) {
            return this.returnWith(Stream.empty());
        }
        ConsistentHash ch = cacheTopology.getReadConsistentHash();
        int estimateForOneNode = 2 * command.getKeys().size() / ch.getMembers().size();
        ArrayList<Object> availableKeys = new ArrayList<Object>(estimateForOneNode);
        Map<Address, List<Object>> requestedKeys = this.getKeysByOwner(ctx, command.getKeys(), ch, availableKeys);
        MergingCompletableFuture<Object> allFuture = new MergingCompletableFuture<Object>(ctx, requestedKeys.size() + (availableKeys.isEmpty() ? 0 : 1), new Object[command.getKeys().size()], Arrays::stream);
        this.handleLocallyAvailableKeys(ctx, command, availableKeys, allFuture);
        int pos = availableKeys.size();
        for (Map.Entry<Address, List<Object>> addressKeys : requestedKeys.entrySet()) {
            List<Object> keys = addressKeys.getValue();
            this.remoteReadOnlyMany(addressKeys.getKey(), keys, command.getFunction(), allFuture, pos);
            pos += keys.size();
        }
        return this.returnWithAsync(allFuture);
    }

    private InvocationStage handleLocalOnlyReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) {
        for (Object key : command.getKeys()) {
            if (ctx.lookupEntry(key) != null) continue;
            this.entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), false);
        }
        return this.invokeNext(ctx, command);
    }

    private BasicInvocationStage handleRemoteReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) {
        for (Object key : command.getKeys()) {
            if (ctx.lookupEntry(key) != null) continue;
            return this.handleMissingEntryOnRead(command);
        }
        return this.invokeNext(ctx, command).thenApply((rCtx, rCommand, rv) -> ((Stream)rv).toArray());
    }

    private void remoteReadOnlyMany(Address owner, List<Object> keys, Function function, MergingCompletableFuture<Object> allFuture, int destinationIndex) {
        ReadOnlyManyCommand remoteCommand = this.cf.buildReadOnlyManyCommand(keys, function);
        this.rpcManager.invokeRemotelyAsync(Collections.singleton(owner), remoteCommand, this.defaultSyncOptions).whenComplete((responseMap, throwable) -> {
            Response response;
            if (throwable != null) {
                allFuture.completeExceptionally((Throwable)throwable);
            }
            if ((response = this.getSingleSuccessfulResponseOrFail((Map<Address, Response>)responseMap, (CompletableFuture<?>)allFuture)) == null) {
                return;
            }
            Object responseValue = ((SuccessfulResponse)response).getResponseValue();
            if (responseValue instanceof Object[]) {
                Object[] values = (Object[])responseValue;
                System.arraycopy(values, 0, allFuture.results, destinationIndex, values.length);
                allFuture.countDown();
            } else {
                allFuture.completeExceptionally(new IllegalStateException("Unexpected response value " + responseValue));
            }
        });
    }

    private void handleLocallyAvailableKeys(InvocationContext ctx, ReadOnlyManyCommand command, List<Object> availableKeys, MergingCompletableFuture<Object> allFuture) {
        if (availableKeys.isEmpty()) {
            return;
        }
        ReadOnlyManyCommand localCommand = this.cf.buildReadOnlyManyCommand(availableKeys, command.getFunction());
        this.invokeNext(ctx, localCommand).compose((stage, rCtx, rCommand, rv, throwable) -> {
            if (throwable != null) {
                allFuture.completeExceptionally(throwable);
            } else {
                try {
                    Supplier<ArrayIterator> supplier = () -> new ArrayIterator(allFuture.results);
                    BiConsumer<ArrayIterator, Object> consumer = ArrayIterator::add;
                    BiConsumer<ArrayIterator, ArrayIterator> combiner = ArrayIterator::combine;
                    ((Stream)rv).collect(supplier, consumer, combiner);
                    allFuture.countDown();
                }
                catch (Throwable t) {
                    allFuture.completeExceptionally(t);
                }
            }
            return this.returnWithAsync(allFuture);
        });
    }

    private Map<Address, List<Object>> getKeysByOwner(InvocationContext ctx, Collection<?> keys, ConsistentHash ch, List<Object> availableKeys) {
        HashMap<Address, List<Object>> requestedKeys = new HashMap<Address, List<Object>>(ch.getMembers().size());
        int estimateForOneNode = 2 * keys.size() / ch.getMembers().size();
        for (Object key : keys) {
            CacheEntry entry = ctx.lookupEntry(key);
            if (entry == null) {
                List<Address> owners = ch.locateOwners(key);
                boolean foundExisting = false;
                for (Address address : owners) {
                    if (address.equals(this.rpcManager.getAddress())) {
                        throw new IllegalStateException("Entry should be always wrapped!");
                    }
                    List list = (List)requestedKeys.get(address);
                    if (list == null) continue;
                    list.add(key);
                    foundExisting = true;
                    break;
                }
                if (foundExisting) continue;
                ArrayList list = new ArrayList(estimateForOneNode);
                list.add(key);
                requestedKeys.put(owners.get(0), list);
                continue;
            }
            if (availableKeys == null) continue;
            availableKeys.add(key);
        }
        return requestedKeys;
    }

    protected Response getSingleSuccessfulResponseOrFail(Map<Address, Response> responseMap, CompletableFuture<?> future) {
        Iterator<Response> it = responseMap.values().iterator();
        if (!it.hasNext()) {
            future.completeExceptionally(new RpcException("Expected one response"));
            return null;
        }
        Response response = it.next();
        if (it.hasNext()) {
            future.completeExceptionally(new IllegalStateException("Too many responses " + responseMap));
            return null;
        }
        if (!response.isSuccessful()) {
            future.completeExceptionally(new OutdatedTopologyException("Remote node has higher topology, response " + response));
            return null;
        }
        return response;
    }

    private BasicInvocationStage visitGetCommand(InvocationContext ctx, AbstractDataCommand command) throws Throwable {
        Object key = command.getKey();
        CacheEntry entry = ctx.lookupEntry(key);
        if (entry == null) {
            if (ctx.isOriginLocal()) {
                if (this.readNeedsRemoteValue(ctx, command)) {
                    return this.invokeNextAsync(ctx, command, this.remoteGet(ctx, command, command.getKey(), false));
                }
                return this.returnWith(null);
            }
            return this.handleMissingEntryOnRead(command);
        }
        return this.invokeNext(ctx, command);
    }

    private BasicInvocationStage handleMissingEntryOnRead(TopologyAffectedCommand command) {
        CacheTopology cacheTopology = this.stateTransferManager.getCacheTopology();
        int currentTopologyId = cacheTopology.getTopologyId();
        int cmdTopology = command.getTopologyId();
        if (cmdTopology < currentTopologyId) {
            return this.returnWith(UnsuccessfulResponse.INSTANCE);
        }
        throw new OutdatedTopologyException(cmdTopology);
    }

    @Override
    public BasicInvocationStage visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return this.visitGetCommand(ctx, command);
    }

    @Override
    public BasicInvocationStage visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return this.visitGetCommand(ctx, command);
    }

    @Override
    public BasicInvocationStage visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable {
        Object key = command.getKey();
        if (ctx.lookupEntry(key) != null) {
            return this.invokeNext(ctx, command);
        }
        if (!ctx.isOriginLocal()) {
            return this.handleMissingEntryOnRead(command);
        }
        if (this.readNeedsRemoteValue(ctx, command)) {
            CacheTopology cacheTopology = this.checkTopologyId(command);
            List<Address> owners = cacheTopology.getReadConsistentHash().locateOwners(key);
            if (trace) {
                log.tracef("Doing a remote get for key %s in topology %d to %s", key, (Object)cacheTopology.getTopologyId(), (Object)owners);
            }
            command.setTopologyId(cacheTopology.getTopologyId());
            CompletableFuture<Map<Address, Response>> rpc = this.rpcManager.invokeRemotelyAsync(owners, command, this.staggeredOptions);
            return this.returnWithAsync((CompletableFuture<Object>)rpc.thenApply(responseMap -> {
                for (Response rsp : responseMap.values()) {
                    if (!rsp.isSuccessful()) continue;
                    return ((SuccessfulResponse)rsp).getResponseValue();
                }
                throw new OutdatedTopologyException("We haven't found an owner");
            }));
        }
        this.entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), false);
        return this.invokeNext(ctx, command);
    }

    protected CacheTopology checkTopologyId(TopologyAffectedCommand command) {
        int cmdTopology;
        CacheTopology cacheTopology = this.stateTransferManager.getCacheTopology();
        int currentTopologyId = cacheTopology.getTopologyId();
        if (currentTopologyId != (cmdTopology = command.getTopologyId()) && cmdTopology != -1) {
            throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + cmdTopology + ", got " + currentTopologyId);
        }
        if (trace) {
            log.tracef("Current topology %d, command topology %d", currentTopologyId, cmdTopology);
        }
        return cacheTopology;
    }

    protected boolean readNeedsRemoteValue(InvocationContext ctx, AbstractDataCommand command) {
        return ctx.isOriginLocal() && !command.hasFlag(Flag.CACHE_MODE_LOCAL) && !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP);
    }

    protected static class MergingCompletableFuture<T>
    extends CountDownCompletableFuture {
        private final Function<T[], Object> transform;
        protected final T[] results;

        public MergingCompletableFuture(InvocationContext ctx, int participants, T[] results, Function<T[], Object> transform) {
            super(ctx, participants);
            this.results = results;
            this.transform = transform;
        }

        @Override
        protected Object result() {
            return this.transform == null || this.results == null ? null : this.transform.apply(this.results);
        }
    }

    protected static class CountDownCompletableFuture
    extends CompletableFuture<Object> {
        protected final InvocationContext ctx;
        protected final AtomicInteger counter;

        public CountDownCompletableFuture(InvocationContext ctx, int participants) {
            if (trace) {
                log.tracef("Creating shortcut countdown with %d participants", participants);
            }
            this.ctx = ctx;
            this.counter = new AtomicInteger(participants);
        }

        public void countDown() {
            if (this.counter.decrementAndGet() == 0) {
                this.complete(this.result());
            }
        }

        public void increment() {
            int preValue = this.counter.getAndIncrement();
            if (preValue == 0) {
                throw new IllegalStateException();
            }
        }

        protected Object result() {
            return null;
        }
    }

    private static class CompletableFutureWithCounter
    extends CompletableFuture<Void> {
        private int counter;

        public CompletableFutureWithCounter(int counter) {
            this.counter = counter;
        }
    }

    private static class ArrayIterator {
        private final Object[] array;
        private int pos = 0;

        public ArrayIterator(Object[] array) {
            this.array = array;
        }

        public void add(Object item) {
            this.array[this.pos] = item;
            ++this.pos;
        }

        public void combine(ArrayIterator other) {
            throw new UnsupportedOperationException("The stream is not supposed to be parallel");
        }
    }
}

