/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.executors.SemaphoreCompletionService;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.filter.KeyFilter;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.statetransfer.TransactionInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.impl.RemoteTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.totalorder.TotalOrderLatch;
import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class StateConsumerImpl
implements StateConsumer {
    private static final Log log = LogFactory.getLog(StateConsumerImpl.class);
    /** Trace-enabled flag, sampled once when the class is initialised. */
    private static final boolean trace = log.isTraceEnabled();
    /** Sentinel stored in {@link #stateTransferTopologyId} while no rebalance is in progress. */
    private static final int NO_REBALANCE_IN_PROGRESS = -1;
    /** Flag bit set passed to the put commands built by {@code doApplyState} for received entries. */
    private static final long STATE_TRANSFER_FLAGS = EnumUtil.bitSetOf(Flag.PUT_FOR_STATE_TRANSFER, Flag.CACHE_MODE_LOCAL, Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_OWNERSHIP_CHECK, Flag.SKIP_XSITE_BACKUP);

    // Collaborators below are assigned in init(); they are not set anywhere else in the visible code.
    private Cache cache;
    private StateTransferManager stateTransferManager;
    /** Name of {@link #cache}, captured in {@code init} and used in log messages. */
    private String cacheName;
    private Configuration configuration;
    private RpcManager rpcManager;
    /** May be {@code null}; {@code doApplyState} treats a {@code null} manager as "non-transactional". */
    private TransactionManager transactionManager;
    private CommandsFactory commandsFactory;
    /** May be {@code null}; {@code onTopologyUpdate} null-checks it before cleaning up leaver transactions. */
    private TransactionTable transactionTable;
    private DataContainer<Object, Object> dataContainer;
    private PersistenceManager persistenceManager;
    private InterceptorChain interceptorChain;
    private InvocationContextFactory icf;
    private StateTransferLock stateTransferLock;
    private CacheNotifier cacheNotifier;
    private TotalOrderManager totalOrderManager;
    private BlockingTaskAwareExecutorService remoteCommandsExecutor;
    /** State transfer timeout from the configuration; used with {@link TimeUnit#MILLISECONDS}. */
    private long timeout;
    /** Computed in {@code start()} from the cache mode and the fetch-state configuration. */
    private boolean isFetchEnabled;
    private boolean isTransactional;
    private boolean isInvalidationMode;
    private boolean isTotalOrder;
    /** Set through {@code setKeyInvalidationListener}; not read in the visible part of this class. */
    private volatile KeyInvalidationListener keyInvalidationListener;
    private CommitManager commitManager;
    /** Executor that runs the per-chunk apply tasks submitted by {@code applyState}. */
    private ExecutorService stateTransferExecutor;
    /** Most recently installed topology; {@code null} until the first {@code onTopologyUpdate}. */
    private volatile CacheTopology cacheTopology;
    /** Topology id of the rebalance in progress, or {@link #NO_REBALANCE_IN_PROGRESS}. */
    private final AtomicInteger stateTransferTopologyId = new AtomicInteger(-1);
    /** Set at the end of {@code onTopologyUpdate} while a rebalance is in progress and this node is a member. */
    private final AtomicBoolean waitingForState = new AtomicBoolean(false);
    /** Monitor guarding {@link #transfersBySource} and {@link #transfersBySegment}. */
    private final Object transferMapsLock = new Object();
    @GuardedBy(value="transferMapsLock")
    private final Map<Address, List<InboundTransferTask>> transfersBySource = new HashMap<Address, List<InboundTransferTask>>();
    @GuardedBy(value="transferMapsLock")
    private final Map<Integer, InboundTransferTask> transfersBySegment = new HashMap<Integer, InboundTransferTask>();
    /** Created in {@code init} on top of {@link #stateTransferExecutor}; NOTE(review): the {@code 1} argument is presumably the permit count - confirm. */
    private SemaphoreCompletionService<Void> stateRequestCompletionService;
    /** Tracks whether the local address was a member of the last topology seen by {@code onTopologyUpdate}. */
    private volatile boolean ownsData = false;
    /** Synchronous RPC options with {@link #timeout}, built in {@code start()}. */
    private RpcOptions rpcOptions;

    /**
     * Tells the commit manager to stop tracking keys written with
     * {@link Flag#PUT_FOR_STATE_TRANSFER}.
     */
    @Override
    public void stopApplyingState() {
        if (trace) {
            log.tracef("Stop keeping track of changed keys for state transfer");
        }
        commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
    }

    /**
     * Dependency-injection entry point: stores every collaborator and caches the
     * configuration values this component consults on each topology update.
     */
    @Inject
    public void init(Cache cache,
                     @ComponentName(value="org.infinispan.executors.stateTransferExecutor") ExecutorService stateTransferExecutor,
                     StateTransferManager stateTransferManager,
                     InterceptorChain interceptorChain,
                     InvocationContextFactory icf,
                     Configuration configuration,
                     RpcManager rpcManager,
                     TransactionManager transactionManager,
                     CommandsFactory commandsFactory,
                     PersistenceManager persistenceManager,
                     DataContainer<Object, Object> dataContainer,
                     TransactionTable transactionTable,
                     StateTransferLock stateTransferLock,
                     CacheNotifier cacheNotifier,
                     TotalOrderManager totalOrderManager,
                     @ComponentName(value="org.infinispan.executors.remote") BlockingTaskAwareExecutorService remoteCommandsExecutor,
                     CommitManager commitManager) {
        // Cache identity.
        this.cache = cache;
        this.cacheName = cache.getName();

        // Executors.
        this.stateTransferExecutor = stateTransferExecutor;
        this.remoteCommandsExecutor = remoteCommandsExecutor;

        // State transfer collaborators.
        this.stateTransferManager = stateTransferManager;
        this.stateTransferLock = stateTransferLock;
        this.commitManager = commitManager;
        this.cacheNotifier = cacheNotifier;

        // Command invocation.
        this.interceptorChain = interceptorChain;
        this.icf = icf;
        this.commandsFactory = commandsFactory;
        this.rpcManager = rpcManager;

        // Transactions.
        this.transactionManager = transactionManager;
        this.transactionTable = transactionTable;
        this.totalOrderManager = totalOrderManager;

        // Storage.
        this.dataContainer = dataContainer;
        this.persistenceManager = persistenceManager;

        // Configuration and the values derived from it.
        this.configuration = configuration;
        this.isInvalidationMode = configuration.clustering().cacheMode().isInvalidation();
        this.isTransactional = configuration.transaction().transactionMode().isTransactional();
        this.isTotalOrder = configuration.transaction().transactionProtocol().isTotalOrder();
        this.timeout = configuration.clustering().stateTransfer().timeout();

        this.stateRequestCompletionService = new SemaphoreCompletionService<Void>(stateTransferExecutor, 1);
    }

    /**
     * Reports whether any inbound transfer is still registered.
     *
     * @return {@code true} if {@code transfersBySource} is non-empty
     */
    public boolean hasActiveTransfers() {
        synchronized (transferMapsLock) {
            boolean noTransfers = transfersBySource.isEmpty();
            return !noTransfers;
        }
    }

    /**
     * @return {@code true} while a rebalance topology id is recorded, i.e. the stored id
     *         differs from {@code NO_REBALANCE_IN_PROGRESS}
     */
    @Override
    public boolean isStateTransferInProgress() {
        int rebalanceTopologyId = stateTransferTopologyId.get();
        return rebalanceTopologyId != NO_REBALANCE_IN_PROGRESS;
    }

    /**
     * Tells whether {@code key} is about to become local to this node but is not local yet.
     *
     * @param key the key to check
     * @return {@code false} in invalidation mode, when no topology is installed or when the
     *         installed topology has no pending consistent hash; otherwise {@code true} only
     *         if the pending hash maps the key to this node and the current hash does not
     */
    @Override
    public boolean isStateTransferInProgressForKey(Object key) {
        if (isInvalidationMode) {
            return false;
        }
        // Read the volatile field once so both hashes come from the same topology.
        CacheTopology topology = cacheTopology;
        ConsistentHash pendingCh = topology == null ? null : topology.getPendingCH();
        if (pendingCh == null) {
            return false;
        }
        Address self = rpcManager.getAddress();
        boolean ownedAfterRebalance = pendingCh.isKeyLocalToNode(self, key);
        boolean ownedNow = topology.getCurrentCH().isKeyLocalToNode(self, key);
        return ownedAfterRebalance && !ownedNow;
    }

    /**
     * @return the membership flag last recorded by {@code onTopologyUpdate}
     */
    @Override
    public boolean ownsData() {
        return ownsData;
    }

    /**
     * Installs {@code cacheTopology} as the current topology and adjusts the inbound
     * transfers to match it.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>update {@code ownsData} from the membership of the local address;</li>
     *   <li>decide whether this update starts a rebalance (explicitly, or because it carries
     *       a pending CH while the installed topology has none or no topology is installed);</li>
     *   <li>install the topology under the exclusive topology lock and restart key tracking;</li>
     *   <li>cancel, restart and add transfers according to the owned-segment difference;</li>
     *   <li>always (even on failure) notify waiters, clean up leaver transactions and remove
     *       data for segments this node does not own.</li>
     * </ol>
     *
     * @param cacheTopology the topology to install
     * @param isRebalance   whether the caller flagged this update as the start of a rebalance
     * @throws CacheException if the thread is interrupted while waiting for total order
     *                        transactions or while removing stale data
     */
    @Override
    public void onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        boolean isMember = cacheTopology.getMembers().contains(this.rpcManager.getAddress());
        if (trace) {
            log.tracef("Received new topology for cache %s, isRebalance = %b, isMember = %b, topology = %s", this.cacheName, isRebalance, isMember, cacheTopology);
        }
        if (!this.ownsData && isMember) {
            this.ownsData = true;
        } else if (this.ownsData && !isMember) {
            this.ownsData = false;
        }

        // The installed topology is null on the very first update; treat that the same as
        // "no pending CH" instead of dereferencing it.
        CacheTopology installedTopology = this.cacheTopology;
        boolean startRebalance = isRebalance;
        if (!isRebalance && cacheTopology.getPendingCH() != null
                && (installedTopology == null || installedTopology.getPendingCH() == null)) {
            if (trace) {
                log.tracef("Forcing startRebalance = true", new Object[0]);
            }
            startRebalance = true;
        }
        if (startRebalance) {
            this.stateTransferTopologyId.compareAndSet(NO_REBALANCE_IN_PROGRESS, cacheTopology.getTopologyId());
            this.cacheNotifier.notifyDataRehashed(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), cacheTopology.getUnionCH(), cacheTopology.getTopologyId(), true);
        }
        this.awaitTotalOrderTransactions(cacheTopology, startRebalance);
        this.waitingForState.set(false);

        ConsistentHash newWriteCh = cacheTopology.getWriteConsistentHash();
        CacheTopology previousTopology = this.cacheTopology;
        ConsistentHash previousReadCh = previousTopology != null ? previousTopology.getReadConsistentHash() : null;
        ConsistentHash previousWriteCh = previousTopology != null ? previousTopology.getWriteConsistentHash() : null;

        this.stateTransferLock.acquireExclusiveTopologyLock();
        try {
            this.cacheTopology = cacheTopology;
            if (startRebalance) {
                if (trace) {
                    log.tracef("Start keeping track of keys for rebalance", new Object[0]);
                }
                this.commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
                this.commitManager.startTrack(Flag.PUT_FOR_STATE_TRANSFER);
            }
        } finally {
            // Release even if the commit manager throws, so the topology lock is never leaked.
            this.stateTransferLock.releaseExclusiveTopologyLock();
        }
        this.stateTransferLock.notifyTopologyInstalled(cacheTopology.getTopologyId());
        this.remoteCommandsExecutor.checkForReadyTasks();

        try {
            if (this.isTransactional || this.isFetchEnabled) {
                Set<Integer> addedSegments;
                if (previousWriteCh == null) {
                    // First topology: everything we own has to be fetched.
                    addedSegments = this.getOwnedSegments(newWriteCh);
                    if (this.configuration.clustering().cacheMode().isDistributed()) {
                        Collection<DistributedCallable> callables = this.getClusterListeners(cacheTopology);
                        for (DistributedCallable callable : callables) {
                            callable.setEnvironment(this.cache, null);
                            try {
                                callable.call();
                            } catch (Exception e) {
                                log.clusterListenerInstallationFailure(e);
                            }
                        }
                    }
                    if (trace) {
                        log.tracef("On cache %s we have: added segments: %s", (Object)this.cacheName, (Object)addedSegments);
                    }
                } else {
                    Set<Integer> previousSegments = this.getOwnedSegments(previousWriteCh);
                    Set<Integer> newSegments = this.getOwnedSegments(newWriteCh);
                    Set<Integer> removedSegments;
                    if (newSegments.size() == newWriteCh.getNumSegments()) {
                        // We own every segment, so nothing can have been removed.
                        removedSegments = Collections.emptySet();
                    } else {
                        removedSegments = new HashSet<Integer>(previousSegments);
                        removedSegments.removeAll(newSegments);
                    }
                    addedSegments = new HashSet<Integer>(newSegments);
                    addedSegments.removeAll(previousSegments);
                    if (trace) {
                        log.tracef("On cache %s we have: new segments: %s; old segments: %s", (Object)this.cacheName, (Object)newSegments, (Object)previousSegments);
                        log.tracef("On cache %s we have: added segments: %s; removed segments: %s", (Object)this.cacheName, (Object)addedSegments, (Object)removedSegments);
                    }
                    this.cancelTransfers(removedSegments);
                    if (!startRebalance && !addedSegments.isEmpty()) {
                        log.debugf("Not requesting segments %s because the last owner left the cluster", (Object)addedSegments);
                        addedSegments.clear();
                    }
                    this.restartBrokenTransfers(cacheTopology, addedSegments);
                }
                if (!addedSegments.isEmpty()) {
                    this.addTransfers(addedSegments);
                }
            }

            int rebalanceTopologyId = this.stateTransferTopologyId.get();
            if (trace) {
                log.tracef("Topology update processed, stateTransferTopologyId = %d, startRebalance = %s, pending CH = %s", (Object)rebalanceTopologyId, (Object)startRebalance, (Object)cacheTopology.getPendingCH());
            }
            // A topology without pending CH that does not start a rebalance ends the one in progress.
            if (rebalanceTopologyId != NO_REBALANCE_IN_PROGRESS && !startRebalance && cacheTopology.getPendingCH() == null
                    && this.stateTransferTopologyId.compareAndSet(rebalanceTopologyId, NO_REBALANCE_IN_PROGRESS)) {
                this.stopApplyingState();
                this.cacheNotifier.notifyDataRehashed(previousReadCh, cacheTopology.getCurrentCH(), previousWriteCh, cacheTopology.getTopologyId(), false);
                if (trace) {
                    log.tracef("Unlock State Transfer in Progress for topology ID %s", cacheTopology.getTopologyId());
                }
                if (this.isTotalOrder) {
                    this.totalOrderManager.notifyStateTransferEnd();
                }
            }
        } finally {
            this.stateTransferLock.notifyTransactionDataReceived(cacheTopology.getTopologyId());
            this.remoteCommandsExecutor.checkForReadyTasks();

            // Only raise the flag here, after the transfers above have been registered.
            if (this.stateTransferTopologyId.get() != NO_REBALANCE_IN_PROGRESS && isMember) {
                this.waitingForState.set(true);
            }
            this.notifyEndOfRebalanceIfNeeded(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId());

            if (this.transactionTable != null) {
                this.transactionTable.cleanupLeaverTransactions(this.rpcManager.getTransport().getMembers());
            }

            // Drop data for every segment we do not own in the new write CH, not only the
            // segments we owned before.
            boolean wasMember = previousWriteCh != null && previousWriteCh.getMembers().contains(this.rpcManager.getAddress());
            if (isMember || wasMember) {
                Set<Integer> unownedSegments = new HashSet<Integer>(newWriteCh.getNumSegments());
                for (int i = 0; i < newWriteCh.getNumSegments(); ++i) {
                    unownedSegments.add(i);
                }
                unownedSegments.removeAll(this.getOwnedSegments(newWriteCh));
                try {
                    this.removeStaleData(unownedSegments);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CacheException(e);
                }
            }
        }
    }

    /**
     * In total order caches, notifies the total order manager that state transfer is
     * starting and blocks until every latch it returns is unblocked. Does nothing for
     * other transaction protocols.
     *
     * @param cacheTopology the topology being installed; only its id is used
     * @param isRebalance   forwarded to the total order manager
     * @throws CacheException if the thread is interrupted while waiting (the interrupt
     *                        status is restored first)
     */
    private void awaitTotalOrderTransactions(CacheTopology cacheTopology, boolean isRebalance) {
        if (!isTotalOrder) {
            return;
        }
        if (trace) {
            log.trace("State Transfer in Total Order cache. Waiting for remote transactions to finish");
        }
        try {
            int topologyId = cacheTopology.getTopologyId();
            for (TotalOrderLatch latch : totalOrderManager.notifyStateTransferStart(topologyId, isRebalance)) {
                latch.awaitUntilUnBlock();
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new CacheException(interrupted);
        }
        if (trace) {
            log.trace("State Transfer in Total Order cache. All remote transactions are finished. Moving on...");
        }
    }

    /**
     * Signals the end of the rebalance once this node was waiting for state and no
     * inbound transfer is left. The compare-and-set on {@code waitingForState} ensures
     * only one caller performs the notification.
     *
     * @param topologyId  topology id reported to the state transfer manager
     * @param rebalanceId rebalance id reported to the state transfer manager
     */
    private void notifyEndOfRebalanceIfNeeded(int topologyId, int rebalanceId) {
        if (!waitingForState.get() || hasActiveTransfers() || !waitingForState.compareAndSet(true, false)) {
            return;
        }
        log.debugf("Finished receiving of segments for cache %s for topology %d.", (Object)this.cacheName, (Object)topologyId);
        stopApplyingState();
        stateTransferManager.notifyEndOfRebalance(topologyId, rebalanceId);

        // Surface failures of the state requests that already completed.
        boolean wasInterrupted = false;
        for (Future<Void> completed : stateRequestCompletionService.drainCompletionQueue()) {
            try {
                completed.get();
            } catch (InterruptedException ie) {
                // Remember the interrupt, keep draining, restore the flag afterwards.
                wasInterrupted = true;
            } catch (ExecutionException ee) {
                log.topologyUpdateError(topologyId, ee.getCause());
            }
        }
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the segments the local address owns in {@code consistentHash}.
     *
     * @param consistentHash the hash to inspect
     * @return the owned segments, or an empty set when the local address is not a member
     */
    private Set<Integer> getOwnedSegments(ConsistentHash consistentHash) {
        Address self = rpcManager.getAddress();
        if (!consistentHash.getMembers().contains(self)) {
            return Collections.emptySet();
        }
        return consistentHash.getSegmentsForOwner(self);
    }

    /**
     * Applies state chunks received from {@code sender}.
     *
     * <p>The chunks are ignored when this node is not a member of the current write
     * consistent hash, when no state transfer is in progress, or when {@code topologyId}
     * is older than the topology id of the rebalance in progress. Otherwise each chunk is
     * applied on the state transfer executor and this method waits up to {@code timeout}
     * milliseconds for all of them.
     *
     * @param sender      node that sent the chunks
     * @param topologyId  topology id attached to the state response
     * @param stateChunks chunks to apply
     * @throws TimeoutException if the chunks are not all applied within the timeout
     * @throws CacheException   if the waiting thread is interrupted
     */
    @Override
    public void applyState(final Address sender, int topologyId, Collection<StateChunk> stateChunks) {
        ConsistentHash wCh = this.cacheTopology.getWriteConsistentHash();
        Address localAddress = this.rpcManager.getAddress();
        if (!wCh.getMembers().contains(localAddress)) {
            if (trace) {
                log.tracef("Ignoring received state because we are no longer a member of cache %s", (Object)this.cacheName);
            }
            return;
        }
        int rebalanceTopologyId = this.stateTransferTopologyId.get();
        if (rebalanceTopologyId == NO_REBALANCE_IN_PROGRESS) {
            log.debugf("Discarding state response with topology id %d for cache %s, we don't have a state transfer in progress", topologyId, (Object)this.cacheName);
            return;
        }
        if (topologyId < rebalanceTopologyId) {
            // Report the rebalance topology id; the previous message formatted an
            // AtomicBoolean with %b, which always rendered as "true".
            log.debugf("Discarding state response with old topology id %d for cache %s, state transfer request topology was %d", topologyId, (Object)this.cacheName, (Object)rebalanceTopologyId);
            return;
        }
        if (trace) {
            log.tracef("Before applying the received state the data container of cache %s has %d keys", (Object)this.cacheName, (Object)this.dataContainer.size());
        }

        final Set<Integer> mySegments = wCh.getSegmentsForOwner(localAddress);
        final CountDownLatch countDownLatch = new CountDownLatch(stateChunks.size());
        for (final StateChunk stateChunk : stateChunks) {
            // NOTE(review): if applyChunk throws, the latch is not counted down and the
            // caller only learns about it through the timeout below.
            this.stateTransferExecutor.submit(() -> {
                this.applyChunk(sender, mySegments, stateChunk);
                countDownLatch.countDown();
            });
        }
        try {
            boolean allApplied = countDownLatch.await(this.timeout, TimeUnit.MILLISECONDS);
            if (!allApplied) {
                throw new TimeoutException("Timed out applying state");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CacheException(e);
        }

        if (trace) {
            log.tracef("After applying the received state the data container of cache %s has %d keys", (Object)this.cacheName, (Object)this.dataContainer.size());
            synchronized (this.transferMapsLock) {
                log.tracef("Segments not received yet for cache %s: %s", (Object)this.cacheName, (Object)this.transfersBySource);
            }
        }
    }

    /**
     * Applies one received chunk.
     *
     * <p>Chunks for segments outside {@code mySegments} are discarded with a warning.
     * Chunks without a registered inbound transfer are reported as unsolicited, but only
     * while the cache still allows invocations.
     *
     * @param sender     node that sent the chunk
     * @param mySegments segments owned by this node
     * @param stateChunk the chunk to apply
     */
    private void applyChunk(Address sender, Set<Integer> mySegments, StateChunk stateChunk) {
        int segmentId = stateChunk.getSegmentId();
        if (!mySegments.contains(segmentId)) {
            log.warnf("Discarding received cache entries for segment %d of cache %s because they do not belong to this node.", (Object)segmentId, (Object)this.cacheName);
            return;
        }

        InboundTransferTask transferForSegment;
        synchronized (transferMapsLock) {
            transferForSegment = transfersBySegment.get(segmentId);
        }

        if (transferForSegment == null) {
            if (cache.getStatus().allowInvocations()) {
                log.ignoringUnsolicitedState(sender, segmentId, cacheName);
            }
            return;
        }

        Collection<InternalCacheEntry> entries = stateChunk.getCacheEntries();
        if (entries != null) {
            doApplyState(sender, segmentId, entries);
        }
        transferForSegment.onStateReceived(segmentId, stateChunk.isLastChunk());
    }

    /**
     * Writes the received entries of one segment into the cache, one put per entry.
     *
     * <p>When a transaction manager is present each entry is written in its own
     * transaction; a transaction still active after the put (i.e. the commit did not
     * happen) is rolled back. A failure on one entry is logged and the next entry is
     * processed, unless the cache no longer allows invocations, in which case the loop
     * stops.
     *
     * @param sender       node that sent the entries (used for logging only)
     * @param segmentId    segment the entries belong to (used for logging only)
     * @param cacheEntries entries to apply
     */
    private void doApplyState(Address sender, int segmentId, Collection<InternalCacheEntry> cacheEntries) {
        if (trace) {
            log.tracef("Applying new state chunk for segment %d of cache %s from node %s: received %d cache entries", segmentId, this.cacheName, sender, cacheEntries.size());
        }
        boolean transactional = this.transactionManager != null;
        for (InternalCacheEntry e : cacheEntries) {
            try {
                InvocationContext ctx;
                if (transactional) {
                    this.transactionManager.begin();
                    ctx = this.icf.createInvocationContext(this.transactionManager.getTransaction(), true);
                    ((AbstractCacheTransaction)((TxInvocationContext)ctx).getCacheTransaction()).setStateTransferFlag(Flag.PUT_FOR_STATE_TRANSFER);
                } else {
                    ctx = this.icf.createSingleKeyNonTxInvocationContext();
                }
                PutKeyValueCommand put = this.commandsFactory.buildPutKeyValueCommand(e.getKey(), e.getValue(), e.getMetadata(), STATE_TRANSFER_FLAGS);
                ctx.setLockOwner(put.getKeyLockOwner());
                this.interceptorChain.invoke(ctx, put);
                if (transactional) {
                    this.transactionManager.commit();
                }
            } catch (Exception ex) {
                if (!this.cache.getStatus().allowInvocations()) {
                    log.debugf("Cache %s is shutting down, stopping state transfer", (Object)this.cacheName);
                    break;
                }
                log.problemApplyingStateForKey(ex.getMessage(), e.getKey(), ex);
            } finally {
                // No jump statement in here: a 'continue' inside finally would complete the
                // finally block abruptly and cancel the 'break' above.
                try {
                    if (transactional && this.transactionManager.getTransaction() != null) {
                        this.transactionManager.rollback();
                    }
                } catch (SystemException rollbackFailure) {
                    // Best effort: the entry has already been reported or applied.
                    log.debugf(rollbackFailure, "Failed to roll back state transfer transaction for key %s of cache %s", e.getKey(), this.cacheName);
                }
            }
        }
        if (trace) {
            log.tracef("Finished applying chunk of segment %d of cache %s", segmentId, (Object)this.cacheName);
        }
    }

    /**
     * Registers backup locks for the transactions transferred from {@code sender}.
     *
     * <p>Transactions originated by the local node are skipped. For every other
     * transaction the local or remote transaction is looked up in the transaction table;
     * if neither exists a remote transaction is created with its looked-up-entries
     * topology set to {@code topologyId - 1}. Nothing but the debug log happens in
     * non-transactional caches.
     *
     * @param sender       node the transactions came from (used for logging only)
     * @param transactions transferred transaction descriptors
     * @param topologyId   topology id the transactions were requested for
     */
    private void applyTransactions(Address sender, Collection<TransactionInfo> transactions, int topologyId) {
        log.debugf("Applying %d transactions for cache %s transferred from node %s", transactions.size(), (Object)this.cacheName, (Object)sender);
        if (!isTransactional) {
            return;
        }
        for (TransactionInfo info : transactions) {
            GlobalTransaction globalTx = info.getGlobalTransaction();
            boolean originatedLocally = rpcManager.getAddress().equals(globalTx.getAddress());
            if (!originatedLocally) {
                globalTx.setRemote(true);
                AbstractCacheTransaction cacheTx = transactionTable.getLocalTransaction(globalTx);
                if (cacheTx == null) {
                    cacheTx = transactionTable.getRemoteTransaction(globalTx);
                }
                if (cacheTx == null) {
                    cacheTx = transactionTable.getOrCreateRemoteTransaction(globalTx, info.getModifications());
                    ((RemoteTransaction)cacheTx).setLookedUpEntriesTopology(topologyId - 1);
                }
                info.getLockedKeys().forEach(cacheTx::addBackupLockForKey);
            }
        }
    }

    /**
     * Lifecycle start hook: decides whether state fetching is enabled and builds the
     * synchronous RPC options used with the configured timeout.
     *
     * <p>Fetching is enabled only for distributed or replicated caches that fetch
     * in-memory state or persistent state.
     */
    @Start(priority=20)
    public void start() {
        CacheMode mode = configuration.clustering().cacheMode();
        boolean fetchEnabled = false;
        if (mode.isDistributed() || mode.isReplicated()) {
            fetchEnabled = configuration.clustering().stateTransfer().fetchInMemoryState()
                    || configuration.persistence().fetchPersistentState();
        }
        isFetchEnabled = fetchEnabled;

        rpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS)
                .timeout(timeout, TimeUnit.MILLISECONDS)
                .build();
    }

    /**
     * Lifecycle stop hook: cancels queued state requests, cancels every registered
     * inbound transfer and empties both transfer maps. Any failure is logged and not
     * propagated.
     */
    @Override
    @Stop(priority=0)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateConsumer of cache %s on node %s", (Object)this.cacheName, (Object)this.rpcManager.getAddress());
        }
        try {
            synchronized (transferMapsLock) {
                stateRequestCompletionService.cancelQueuedTasks();
                stateRequestCompletionService.drainCompletionQueue();

                // Unregister each source before cancelling its tasks.
                for (Iterator<List<InboundTransferTask>> sourceIt = transfersBySource.values().iterator(); sourceIt.hasNext(); ) {
                    List<InboundTransferTask> tasksOfSource = sourceIt.next();
                    sourceIt.remove();
                    for (InboundTransferTask task : tasksOfSource) {
                        task.cancel();
                    }
                }
                transfersBySource.clear();
                transfersBySegment.clear();
            }
        } catch (Throwable failure) {
            log.errorf(failure, "Failed to stop StateConsumer of cache %s on node %s", (Object)this.cacheName, (Object)this.rpcManager.getAddress());
        }
    }

    /**
     * @return the topology most recently installed by {@code onTopologyUpdate}, or
     *         {@code null} if none has been installed yet
     */
    @Override
    public CacheTopology getCacheTopology() {
        return cacheTopology;
    }

    /**
     * Replaces the key invalidation listener stored in the volatile
     * {@code keyInvalidationListener} field.
     *
     * @param keyInvalidationListener the listener to store; NOTE(review): {@code null}
     *                                presumably means "no listener" - confirm against the
     *                                code that reads the field
     */
    public void setKeyInvalidationListener(KeyInvalidationListener keyInvalidationListener) {
        this.keyInvalidationListener = keyInvalidationListener;
    }

    /**
     * Starts inbound transfers for {@code segments}: first the transaction data (for
     * transactional caches that are not total order), then the segment data (when
     * fetching is enabled). Both steps share the same source and excluded-source
     * collections.
     *
     * @param segments segments this node has to receive
     */
    private void addTransfers(Set<Integer> segments) {
        log.debugf("Adding inbound state transfer for segments %s of cache %s", (Object)segments, (Object)this.cacheName);

        Set<Address> excludedSources = new HashSet<Address>();
        Map<Address, Set<Integer>> sources = new HashMap<Address, Set<Integer>>();

        boolean needsTransactionData = isTransactional && !isTotalOrder;
        if (needsTransactionData) {
            requestTransactions(segments, sources, excludedSources);
        }
        if (isFetchEnabled) {
            requestSegments(segments, sources, excludedSources);
        }

        if (trace) {
            log.tracef("Finished adding inbound state transfer for segments %s of cache %s", (Object)segments, (Object)this.cacheName);
        }
    }

    /**
     * Groups {@code segments} by the node each one should be requested from and adds
     * the result to {@code sources}. Segments for which no source is found are left out.
     *
     * @param segments        segments to locate a source for
     * @param sources         output map from source node to the segments requested from it
     * @param excludedSources nodes that must not be chosen as a source
     */
    private void findSources(Set<Integer> segments, Map<Address, Set<Integer>> sources, Set<Address> excludedSources) {
        for (Integer segmentId : segments) {
            Address source = findSource(segmentId, excludedSources);
            if (source != null) {
                sources.computeIfAbsent(source, unused -> new HashSet<Integer>()).add(segmentId);
            }
        }
    }

    /**
     * Picks the node to request {@code segmentId} from: the first owner in the read
     * consistent hash that is neither the local node nor excluded.
     *
     * @param segmentId       segment to locate
     * @param excludedSources nodes that must not be chosen
     * @return the chosen owner, or {@code null} when the local node already owns the
     *         segment or when no eligible owner exists (the latter is logged)
     */
    private Address findSource(int segmentId, Set<Address> excludedSources) {
        List<Address> owners = cacheTopology.getReadConsistentHash().locateOwnersForSegment(segmentId);
        if (owners.contains(rpcManager.getAddress())) {
            return null;
        }
        for (Address candidate : owners) {
            boolean eligible = !candidate.equals(rpcManager.getAddress()) && !excludedSources.contains(candidate);
            if (eligible) {
                return candidate;
            }
        }
        log.noLiveOwnersFoundForSegment(segmentId, cacheName, owners, excludedSources);
        return null;
    }

    /**
     * Requests the transactions affecting the given segments from their source nodes and applies them.
     * <p>
     * Every round asks each source in {@code sources} for its transactions. Segments whose request
     * failed are collected and new sources are looked up for them; the loop ends once a round
     * finishes without failures. A source is added to {@code excludedSources} only when it answered
     * with a {@link CacheNotFoundResponse} or a {@link SuspectException} was thrown; other failures
     * are retried without excluding the source.
     *
     * @param segments        the segments whose transactions are needed
     * @param sources         source map, filled here; emptied before returning if any round failed
     * @param excludedSources nodes that must not be used as a source; extended as nodes drop out
     */
    private void requestTransactions(Set<Integer> segments, Map<Address, Set<Integer>> sources, Set<Address> excludedSources) {
        this.findSources(segments, sources, excludedSources);
        boolean retried = false;
        Set<Integer> pending;
        do {
            pending = new HashSet<>();
            int topologyId = this.cacheTopology.getTopologyId();
            for (Map.Entry<Address, Set<Integer>> entry : sources.entrySet()) {
                Address source = entry.getKey();
                Set<Integer> requested = entry.getValue();
                boolean retryNeeded = false;
                boolean sourceGone = false;
                try {
                    Response response = this.getTransactions(source, requested, topologyId);
                    if (response instanceof SuccessfulResponse) {
                        List transactions = (List)((SuccessfulResponse)response).getResponseValue();
                        this.applyTransactions(source, transactions, topologyId);
                    } else if (response instanceof CacheNotFoundResponse) {
                        log.debugf("Cache %s was stopped on node %s before sending transaction information", this.cacheName, source);
                        retryNeeded = true;
                        sourceGone = true;
                    } else {
                        log.unsuccessfulResponseRetrievingTransactionsForSegments(source, response);
                        retryNeeded = true;
                    }
                }
                catch (SuspectException e) {
                    log.debugf("Node %s left the cluster before sending transaction information", source);
                    retryNeeded = true;
                    sourceGone = true;
                }
                catch (Exception e) {
                    // Stay quiet while the cache is terminated, but still mark the request as failed.
                    if (!this.cache.getStatus().isTerminated()) {
                        log.failedToRetrieveTransactionsForSegments(segments, this.cacheName, source, e);
                    }
                    retryNeeded = true;
                }
                if (retryNeeded) {
                    pending.addAll(requested);
                }
                if (sourceGone) {
                    excludedSources.add(source);
                }
            }
            if (!pending.isEmpty()) {
                // Look for other sources for everything that failed in this round.
                retried = true;
                sources.clear();
                this.findSources(pending, sources, excludedSources);
            }
        } while (!pending.isEmpty());
        if (retried) {
            // After a retry the map only describes the last round, so hand back an empty one.
            sources.clear();
        }
    }

    /**
     * Asks the other members of the topology, one at a time, for the cluster listeners of this cache.
     *
     * @param topology the topology whose members are queried and whose id is sent with the request
     * @return the response value of the first member that answers successfully, or an empty set
     *         when no member does
     */
    private Collection<DistributedCallable> getClusterListeners(CacheTopology topology) {
        for (Address member : topology.getMembers()) {
            if (member.equals(this.rpcManager.getAddress())) {
                continue;
            }
            if (trace) {
                log.tracef("Requesting cluster listeners of cache %s from node %s", this.cacheName, member);
            }
            try {
                StateRequestCommand request = this.commandsFactory.buildStateRequestCommand(StateRequestCommand.Type.GET_CACHE_LISTENERS, this.rpcManager.getAddress(), topology.getTopologyId(), null);
                Response reply = this.rpcManager.invokeRemotely(Collections.singleton(member), request, this.rpcOptions).get(member);
                if (!(reply instanceof SuccessfulResponse)) {
                    log.unsuccessfulResponseForClusterListeners(member, reply);
                    continue;
                }
                return (Collection)((SuccessfulResponse)reply).getResponseValue();
            }
            catch (CacheException e) {
                // A failing member is not fatal; move on to the next one.
                log.exceptionDuringClusterListenerRetrieval(member, e);
            }
        }
        if (trace) {
            log.trace("Unable to acquire cluster listeners from other members, assuming none are present");
        }
        return Collections.emptySet();
    }

    /**
     * Sends a {@code GET_TRANSACTIONS} state request to a single node.
     *
     * @param source     the node to ask
     * @param segments   the segments whose transactions are requested
     * @param topologyId the topology id sent with the request
     * @return the response recorded for {@code source} in the invocation result (whatever the
     *         response map holds under that key)
     */
    private Response getTransactions(Address source, Set<Integer> segments, int topologyId) {
        if (trace) {
            log.tracef("Requesting transactions for segments %s of cache %s from node %s", segments, this.cacheName, source);
        }
        StateRequestCommand request = this.commandsFactory.buildStateRequestCommand(StateRequestCommand.Type.GET_TRANSACTIONS, this.rpcManager.getAddress(), topologyId, segments);
        return this.rpcManager.invokeRemotely(Collections.singleton(source), request, this.rpcOptions).get(source);
    }

    /**
     * Registers one inbound transfer per source for the given segments.
     * <p>
     * A non-empty {@code sources} map is used as is; an empty one is filled by looking up sources
     * for {@code segments} first.
     *
     * @param segments        the segments to fetch
     * @param sources         source address to segments; populated here when empty
     * @param excludedSources nodes that must not be used as a source
     */
    private void requestSegments(Set<Integer> segments, Map<Address, Set<Integer>> sources, Set<Address> excludedSources) {
        if (sources.isEmpty()) {
            this.findSources(segments, sources, excludedSources);
        }
        sources.forEach((source, segmentsFromSource) -> this.addTransfer(source, segmentsFromSource));
    }

    /**
     * Cancels a failed inbound transfer and re-requests its segments from other nodes.
     * <p>
     * The segments are retried only if the task was still registered when it is removed here. In that
     * case the task's source is excluded from the new lookup, and only segments that are owned
     * according to the current write consistent hash are requested again.
     *
     * @param task the transfer task to retry
     */
    private void retryTransferTask(InboundTransferTask task) {
        if (trace) {
            log.tracef("Retrying failed task: %s", task);
        }
        task.cancel();
        synchronized (this.transferMapsLock) {
            Set<Integer> segmentsToRetry = new HashSet<>();
            Set<Address> sourcesToAvoid = new HashSet<>();
            if (this.removeTransfer(task)) {
                sourcesToAvoid.add(task.getSource());
                segmentsToRetry.addAll(task.getSegments());
            }
            // Drop segments that are not owned under the current write consistent hash.
            segmentsToRetry.retainAll(this.getOwnedSegments(this.cacheTopology.getWriteConsistentHash()));

            Map<Address, Set<Integer>> replacementSources = new HashMap<>();
            this.findSources(segmentsToRetry, replacementSources, sourcesToAvoid);
            for (Map.Entry<Address, Set<Integer>> replacement : replacementSources.entrySet()) {
                this.addTransfer(replacement.getKey(), replacement.getValue());
            }
        }
    }

    /**
     * Cancels the inbound transfers of the given segments.
     * <p>
     * For each segment that still has a registered transfer, all removed segments carried by that
     * same transfer are cancelled together and unregistered from the per-segment map. A transfer
     * that reports itself cancelled afterwards is removed entirely.
     *
     * @param removedSegments the segments whose transfers must be cancelled
     */
    private void cancelTransfers(Set<Integer> removedSegments) {
        synchronized (this.transferMapsLock) {
            List<Integer> remaining = new ArrayList<>(removedSegments);
            while (!remaining.isEmpty()) {
                int segmentId = remaining.remove(0);
                InboundTransferTask task = this.transfersBySegment.get(segmentId);
                if (task != null) {
                    // Handle every removed segment of this task in one go.
                    Set<Integer> toCancel = new HashSet<>(removedSegments);
                    toCancel.retainAll(task.getSegments());
                    remaining.removeAll(toCancel);
                    this.transfersBySegment.keySet().removeAll(toCancel);
                    task.cancelSegments(toCancel);
                    if (task.isCancelled()) {
                        this.removeTransfer(task);
                    }
                }
            }
        }
    }

    /**
     * Invalidates the entries that belong to segments this node should no longer hold.
     * <p>
     * The key invalidation listener, if one is set, is notified first, even when
     * {@code removedSegments} is empty. Keys are then collected from the data container and from the
     * stores (only store keys that are not in the data container), and invalidated locally with a
     * single invalidate command. Failures while reading the stores or while invalidating are logged
     * and not rethrown.
     *
     * @param removedSegments the segments whose entries must be removed
     * @throws InterruptedException declared by the original signature; presumably raised by the
     *                              container or store iteration — confirm against those APIs
     */
    private void removeStaleData(final Set<Integer> removedSegments) throws InterruptedException {
        log.debugf("Removing no longer owned entries for cache %s", (Object)this.cacheName);
        if (this.keyInvalidationListener != null) {
            this.keyInvalidationListener.beforeInvalidation(removedSegments, Collections.emptySet());
        }
        if (removedSegments.isEmpty()) {
            return;
        }

        // Filled from the container task and the store task below.
        final ConcurrentHashSet<Object> keysToRemove = new ConcurrentHashSet<>();
        this.dataContainer.executeTask(KeyFilter.ACCEPT_ALL_FILTER, (o, ice) -> {
            Object key = ice.getKey();
            int keySegment = this.getSegment(key);
            if (removedSegments.contains(keySegment)) {
                keysToRemove.add(key);
            }
        });

        // removedSegments is known to be non-empty here, so the stores are always scanned.
        try {
            KeyFilter filter = new KeyFilter(){

                @Override
                public boolean accept(Object key) {
                    // Keys present in the data container were already handled above.
                    if (StateConsumerImpl.this.dataContainer.containsKey(key)) {
                        return false;
                    }
                    int keySegment = StateConsumerImpl.this.getSegment(key);
                    return removedSegments.contains(keySegment);
                }
            };
            this.persistenceManager.processOnAllStores(filter, new AdvancedCacheLoader.CacheLoaderTask(){

                @Override
                public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
                    keysToRemove.add(marshalledEntry.getKey());
                }
            }, false, false, PersistenceManager.AccessMode.PRIVATE);
        }
        catch (CacheException e) {
            log.failedLoadingKeysFromCacheStore(e);
        }

        if (!keysToRemove.isEmpty()) {
            try {
                InvalidateCommand invalidateCmd = this.commandsFactory.buildInvalidateCommand(EnumUtil.bitSetOf(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING), keysToRemove.toArray());
                NonTxInvocationContext ctx = this.icf.createNonTxInvocationContext();
                ctx.setLockOwner(invalidateCmd.getKeyLockOwner());
                this.interceptorChain.invoke(ctx, invalidateCmd);
                if (trace) {
                    log.tracef("Removed %d keys, data container now has %d keys", keysToRemove.size(), this.dataContainer.size());
                }
            }
            catch (CacheException e) {
                log.failedToInvalidateKeys(e);
            }
        }
    }

    /**
     * Cancels the inbound transfers whose source is no longer a member of the read consistent hash
     * and schedules their unfinished segments to be requested again.
     *
     * @param cacheTopology the topology whose read consistent hash defines the current members
     * @param addedSegments in/out set: the unfinished segments of the cancelled transfers are added,
     *                      then every segment that still has a registered transfer is removed
     */
    private void restartBrokenTransfers(CacheTopology cacheTopology, Set<Integer> addedSegments) {
        Set<Address> liveMembers = new HashSet<>(cacheTopology.getReadConsistentHash().getMembers());
        synchronized (this.transferMapsLock) {
            Iterator<Map.Entry<Address, List<InboundTransferTask>>> entries = this.transfersBySource.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Address, List<InboundTransferTask>> entry = entries.next();
                Address source = entry.getKey();
                if (!liveMembers.contains(source)) {
                    if (trace) {
                        log.tracef("Removing inbound transfers from source %s for cache %s", source, this.cacheName);
                    }
                    List<InboundTransferTask> orphaned = entry.getValue();
                    entries.remove();
                    for (InboundTransferTask task : orphaned) {
                        if (trace) {
                            log.tracef("Removing inbound transfers for segments %s from source %s for cache %s", task.getSegments(), source, this.cacheName);
                        }
                        task.cancel();
                        this.transfersBySegment.keySet().removeAll(task.getSegments());
                        addedSegments.addAll(task.getUnfinishedSegments());
                    }
                }
            }
            // Segments that still have a registered transfer do not need a new request.
            addedSegments.removeAll(this.transfersBySegment.keySet());
        }
    }

    /**
     * Returns the segment of the given key according to the current read consistent hash.
     *
     * @param key the key to map
     * @return the id of the segment the key belongs to
     */
    private int getSegment(Object key) {
        ConsistentHash readHash = this.cacheTopology.getReadConsistentHash();
        return readHash.getSegment(key);
    }

    /**
     * Registers an inbound transfer from the given source and submits the request for its segments.
     * <p>
     * Segments that already have a registered transfer are removed from {@code segmentsFromSource}
     * (the set is modified in place) before the task is created. The task is recorded in both
     * transfer maps under the transfer-maps lock; the request itself is submitted outside the lock.
     *
     * @param source             the node to fetch from
     * @param segmentsFromSource the segments to fetch; reduced to those not already in progress
     * @return the new transfer task, or {@code null} if every segment was already in progress
     */
    private InboundTransferTask addTransfer(Address source, Set<Integer> segmentsFromSource) {
        final InboundTransferTask task;
        synchronized (this.transferMapsLock) {
            if (trace) {
                log.tracef("Adding transfer from %s for segments %s", source, segmentsFromSource);
            }
            segmentsFromSource.removeAll(this.transfersBySegment.keySet());
            if (segmentsFromSource.isEmpty()) {
                if (trace) {
                    log.tracef("All segments are already in progress, skipping");
                }
                return null;
            }
            task = new InboundTransferTask(segmentsFromSource, source, this.cacheTopology.getTopologyId(), this, this.rpcManager, this.commandsFactory, this.timeout, this.cacheName);
            for (int segmentId : segmentsFromSource) {
                this.transfersBySegment.put(segmentId, task);
            }
            this.transfersBySource.computeIfAbsent(task.getSource(), unused -> new ArrayList<>()).add(task);
        }
        this.stateRequestCompletionService.submit(() -> {
            boolean transferStarted = task.requestSegments();
            if (!transferStarted) {
                return null;
            }
            if (trace) {
                log.tracef("Waiting for inbound transfer to finish: %s", task);
            }
            // Only reached when the request was sent successfully.
            this.stateRequestCompletionService.continueTaskInBackground();
            return null;
        });
        return task;
    }

    /**
     * Unregisters an inbound transfer from the per-source and per-segment maps.
     *
     * @param inboundTransfer the transfer to remove
     * @return {@code true} if the transfer was registered under its source and has been removed,
     *         {@code false} if it was not registered (nothing is changed in that case)
     */
    private boolean removeTransfer(InboundTransferTask inboundTransfer) {
        synchronized (this.transferMapsLock) {
            if (trace) {
                log.tracef("Removing inbound transfers for segments %s from source %s for cache %s", inboundTransfer.getSegments(), inboundTransfer.getSource(), this.cacheName);
            }
            Address source = inboundTransfer.getSource();
            List<InboundTransferTask> fromSource = this.transfersBySource.get(source);
            if (fromSource == null || !fromSource.remove(inboundTransfer)) {
                return false;
            }
            if (fromSource.isEmpty()) {
                // Do not keep empty lists around for sources without transfers.
                this.transfersBySource.remove(source);
            }
            this.transfersBySegment.keySet().removeAll(inboundTransfer.getSegments());
            return true;
        }
    }

    /**
     * Called when an inbound transfer task has finished.
     * <p>
     * A task that did not start successfully is handled directly on the calling thread and nothing
     * else is done for it. For such a task the request callable submitted by {@code addTransfer}
     * returns before calling {@code continueTaskInBackground()}, so there is no background task to
     * finish. Previously execution fell through, which handled the task a second time and signalled
     * a background completion that had never been started.
     * <p>
     * NOTE(review): this assumes {@code backgroundTaskFinished} must be paired with an earlier
     * {@code continueTaskInBackground()} — confirm against {@code SemaphoreCompletionService}.
     *
     * @param inboundTransfer the task that finished
     */
    void onTaskCompletion(final InboundTransferTask inboundTransfer) {
        if (!inboundTransfer.isStartedSuccessfully()) {
            // Never went to the background: retry or notify right here, and stop.
            this.retryOrNotifyCompletion(inboundTransfer);
            return;
        }
        // Hand the follow-up work to the completion service as the background task's cleanup.
        this.stateRequestCompletionService.backgroundTaskFinished(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                StateConsumerImpl.this.retryOrNotifyCompletion(inboundTransfer);
                return null;
            }
        });
    }

    /**
     * Retries a transfer that neither completed successfully nor was cancelled; otherwise
     * unregisters it and checks whether the end of the rebalance can be announced.
     * <p>
     * The transfer must still be registered when {@code retryTransferTask} runs. That method only
     * collects the task's segments and excludes its source if its own {@code removeTransfer} call
     * returns {@code true}. Removing the transfer here first would make the retry find nothing to
     * re-request, so the removal is done only on the completion branch.
     *
     * @param inboundTransfer the task that finished
     */
    private void retryOrNotifyCompletion(InboundTransferTask inboundTransfer) {
        if (!inboundTransfer.isCompletedSuccessfully() && !inboundTransfer.isCancelled()) {
            this.retryTransferTask(inboundTransfer);
        } else {
            if (trace) {
                log.tracef("Inbound transfer finished: %s", (Object)inboundTransfer);
            }
            this.removeTransfer(inboundTransfer);
            this.notifyEndOfRebalanceIfNeeded(this.cacheTopology.getTopologyId(), this.cacheTopology.getRebalanceId());
        }
    }

    /**
     * Callback notified before stale keys are invalidated.
     */
    public interface KeyInvalidationListener {
        /**
         * Invoked before the entries of the given segments are invalidated.
         *
         * @param removedSegments the segments about to be cleaned up
         * @param otherSegments   a second set of segments; the caller in this class always passes an
         *                        empty set, so its meaning is not evident here — TODO confirm
         */
        void beforeInvalidation(Set<Integer> removedSegments, Set<Integer> otherSegments);
    }
}

