/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.cluster.ClusterCacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutboundTransferTask;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.TransactionInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener
public class StateProviderImpl
implements StateProvider {
    private static final Log log = LogFactory.getLog(StateProviderImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private String cacheName;
    private Configuration configuration;
    private RpcManager rpcManager;
    private CommandsFactory commandsFactory;
    private ClusterCacheNotifier clusterCacheNotifier;
    private TransactionTable transactionTable;
    private DataContainer dataContainer;
    private PersistenceManager persistenceManager;
    private ExecutorService executorService;
    private StateTransferLock stateTransferLock;
    private InternalEntryFactory entryFactory;
    private long timeout;
    private int chunkSize;
    private StateConsumer stateConsumer;
    private final Map<Address, List<OutboundTransferTask>> transfersByDestination = new HashMap<Address, List<OutboundTransferTask>>();

    @Inject
    public void init(Cache cache, @ComponentName(value="org.infinispan.executors.transport") ExecutorService executorService, Configuration configuration, RpcManager rpcManager, CommandsFactory commandsFactory, ClusterCacheNotifier clusterCacheNotifier, PersistenceManager persistenceManager, DataContainer dataContainer, TransactionTable transactionTable, StateTransferLock stateTransferLock, StateConsumer stateConsumer, InternalEntryFactory entryFactory) {
        this.cacheName = cache.getName();
        this.executorService = executorService;
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.clusterCacheNotifier = clusterCacheNotifier;
        this.persistenceManager = persistenceManager;
        this.dataContainer = dataContainer;
        this.transactionTable = transactionTable;
        this.stateTransferLock = stateTransferLock;
        this.stateConsumer = stateConsumer;
        this.entryFactory = entryFactory;
        this.timeout = configuration.clustering().stateTransfer().timeout();
        this.chunkSize = configuration.clustering().stateTransfer().chunkSize();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isStateTransferInProgress() {
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            return !this.transfersByDestination.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        boolean stateTransferInProgress = cacheTopology.getPendingCH() != null;
        HashSet<Address> members = new HashSet<Address>(cacheTopology.getWriteConsistentHash().getMembers());
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            Iterator<Address> it = this.transfersByDestination.keySet().iterator();
            while (it.hasNext()) {
                Address destination = it.next();
                if (members.contains(destination)) continue;
                List<OutboundTransferTask> transfers = this.transfersByDestination.get(destination);
                it.remove();
                for (OutboundTransferTask outboundTransfer : transfers) {
                    outboundTransfer.cancel();
                }
            }
        }
    }

    @Override
    @Start(priority=60)
    public void start() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Stop(priority=20)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateProvider of cache %s on node %s", (Object)this.cacheName, (Object)this.rpcManager.getAddress());
        }
        try {
            Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
            synchronized (map) {
                Iterator<List<OutboundTransferTask>> it = this.transfersByDestination.values().iterator();
                while (it.hasNext()) {
                    List<OutboundTransferTask> transfers = it.next();
                    it.remove();
                    for (OutboundTransferTask outboundTransfer : transfers) {
                        outboundTransfer.cancel();
                    }
                }
            }
        }
        catch (Throwable t) {
            log.errorf(t, "Failed to stop StateProvider of cache %s on node %s", (Object)this.cacheName, (Object)this.rpcManager.getAddress());
        }
    }

    @Override
    public List<TransactionInfo> getTransactionsForSegments(Address destination, int requestTopologyId, Set<Integer> segments) throws InterruptedException {
        CacheTopology cacheTopology;
        ConsistentHash readCh;
        Set<Integer> ownedSegments;
        if (trace) {
            log.tracef("Received request for transactions from node %s for segments %s of cache %s with topology id %d", destination, segments, this.cacheName, requestTopologyId);
        }
        if (!(ownedSegments = (readCh = (cacheTopology = this.getCacheTopology(requestTopologyId, destination, true)).getReadConsistentHash()).getSegmentsForOwner(this.rpcManager.getAddress())).containsAll(segments)) {
            segments.removeAll(ownedSegments);
            throw new IllegalArgumentException("Segments " + segments + " are not owned by " + this.rpcManager.getAddress());
        }
        ArrayList<TransactionInfo> transactions = new ArrayList<TransactionInfo>();
        if (this.configuration.transaction().transactionMode().isTransactional()) {
            this.collectTransactionsToTransfer(destination, transactions, this.transactionTable.getRemoteTransactions(), segments, cacheTopology);
            this.collectTransactionsToTransfer(destination, transactions, this.transactionTable.getLocalTransactions(), segments, cacheTopology);
            if (trace) {
                log.tracef("Found %d transaction(s) to transfer", (Object)transactions.size());
            }
        }
        return transactions;
    }

    @Override
    public Collection<DistributedCallable> getClusterListenersToInstall() {
        return this.clusterCacheNotifier.retrieveClusterListenerCallablesToInstall();
    }

    private CacheTopology getCacheTopology(int requestTopologyId, Address destination, boolean isReqForTransactions) throws InterruptedException {
        int currentTopologyId;
        CacheTopology cacheTopology = this.stateConsumer.getCacheTopology();
        int n = currentTopologyId = cacheTopology != null ? cacheTopology.getTopologyId() : -1;
        if (requestTopologyId < currentTopologyId) {
            if (isReqForTransactions) {
                log.debugf("Transactions were requested by node %s with topology %d, older than the local topology (%d)", (Object)destination, (Object)requestTopologyId, (Object)currentTopologyId);
            } else {
                log.debugf("Segments were requested by node %s with topology %d, older than the local topology (%d)", (Object)destination, (Object)requestTopologyId, (Object)currentTopologyId);
            }
        } else if (requestTopologyId > currentTopologyId) {
            if (trace) {
                log.tracef("%s were requested by node %s with topology %d, greater than the local topology (%d). Waiting for topology %d to be installed locally.", isReqForTransactions ? "Transactions" : "Segments", destination, requestTopologyId, currentTopologyId, requestTopologyId);
            }
            this.stateTransferLock.waitForTopology(requestTopologyId, this.timeout, TimeUnit.MILLISECONDS);
            cacheTopology = this.stateConsumer.getCacheTopology();
        }
        return cacheTopology;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void collectTransactionsToTransfer(Address destination, List<TransactionInfo> transactionsToTransfer, Collection<? extends CacheTransaction> transactions, Set<Integer> segments, CacheTopology cacheTopology) {
        int topologyId = cacheTopology.getTopologyId();
        List<Address> members = cacheTopology.getMembers();
        ConsistentHash readCh = cacheTopology.getReadConsistentHash();
        for (CacheTransaction cacheTransaction : transactions) {
            Set<Object> lockedKeys;
            if (cacheTransaction.getTopologyId() == topologyId || !members.contains(cacheTransaction.getGlobalTransaction().getAddress())) {
                if (!trace) continue;
                log.tracef("Skipping transaction %s as it was started in the current topology or by a leaver", (Object)cacheTransaction);
                continue;
            }
            HashSet<Object> filteredLockedKeys = new HashSet<Object>();
            Set<Object> set = lockedKeys = cacheTransaction.getLockedKeys();
            synchronized (set) {
                for (Object key : lockedKeys) {
                    if (!segments.contains(readCh.getSegment(key))) continue;
                    filteredLockedKeys.add(key);
                }
            }
            Set<Object> backupLockedKeys = cacheTransaction.getBackupLockedKeys();
            Set<Object> i$ = backupLockedKeys;
            synchronized (i$) {
                for (Object key : backupLockedKeys) {
                    if (!segments.contains(readCh.getSegment(key))) continue;
                    filteredLockedKeys.add(key);
                }
            }
            if (filteredLockedKeys.isEmpty()) {
                if (!trace) continue;
                log.tracef("Skipping transaction %s because the state requestor %s doesn't own any key", (Object)cacheTransaction, (Object)destination);
                continue;
            }
            if (trace) {
                log.tracef("Sending transaction %s to new owner %s", (Object)cacheTransaction, (Object)destination);
            }
            List<WriteCommand> txModifications = cacheTransaction.getModifications();
            WriteCommand[] modifications = null;
            if (!txModifications.isEmpty()) {
                modifications = txModifications.toArray(new WriteCommand[txModifications.size()]);
            }
            if (cacheTransaction instanceof LocalTransaction) {
                LocalTransaction localTx = (LocalTransaction)cacheTransaction;
                localTx.locksAcquired(Collections.singleton(destination));
                if (trace) {
                    log.tracef("Adding affected node %s to transferred transaction %s (keys %s)", (Object)destination, (Object)cacheTransaction.getGlobalTransaction(), (Object)filteredLockedKeys);
                }
            }
            transactionsToTransfer.add(new TransactionInfo(cacheTransaction.getGlobalTransaction(), cacheTransaction.getTopologyId(), modifications, filteredLockedKeys));
        }
    }

    @Override
    public void startOutboundTransfer(Address destination, int requestTopologyId, Set<Integer> segments) throws InterruptedException {
        if (trace) {
            log.tracef("Starting outbound transfer of segments %s to node %s with topology id %d for cache %s", segments, destination, requestTopologyId, this.cacheName);
        }
        CacheTopology cacheTopology = this.getCacheTopology(requestTopologyId, destination, false);
        OutboundTransferTask outboundTransfer = new OutboundTransferTask(destination, segments, this.chunkSize, requestTopologyId, cacheTopology.getReadConsistentHash(), this, this.dataContainer, this.persistenceManager, this.rpcManager, this.commandsFactory, this.entryFactory, this.timeout, this.cacheName);
        this.addTransfer(outboundTransfer);
        outboundTransfer.execute(this.executorService);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addTransfer(OutboundTransferTask transferTask) {
        if (trace) {
            log.tracef("Adding outbound transfer of segments %s to %s", (Object)transferTask.getSegments(), (Object)transferTask.getDestination());
        }
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            List<OutboundTransferTask> transfers = this.transfersByDestination.get(transferTask.getDestination());
            if (transfers == null) {
                transfers = new ArrayList<OutboundTransferTask>();
                this.transfersByDestination.put(transferTask.getDestination(), transfers);
            }
            transfers.add(transferTask);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancelOutboundTransfer(Address destination, int topologyId, Set<Integer> segments) {
        if (trace) {
            log.tracef("Cancelling outbound transfer of segments %s to node %s with topology id %d for cache %s", segments, destination, topologyId, this.cacheName);
        }
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            List<OutboundTransferTask> transferTasks = this.transfersByDestination.get(destination);
            if (transferTasks != null) {
                OutboundTransferTask[] taskListCopy;
                for (OutboundTransferTask transferTask : taskListCopy = transferTasks.toArray(new OutboundTransferTask[transferTasks.size()])) {
                    if (transferTask.getTopologyId() != topologyId) continue;
                    transferTask.cancelSegments(segments);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeTransfer(OutboundTransferTask transferTask) {
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            List<OutboundTransferTask> transferTasks = this.transfersByDestination.get(transferTask.getDestination());
            if (transferTasks != null) {
                transferTasks.remove(transferTask);
                if (transferTasks.isEmpty()) {
                    this.transfersByDestination.remove(transferTask.getDestination());
                }
            }
        }
    }

    void onTaskCompletion(OutboundTransferTask transferTask) {
        if (trace) {
            log.tracef("Removing %s outbound transfer of segments %s to %s for cache %s", transferTask.isCancelled() ? "cancelled" : "completed", transferTask.getSegments(), transferTask.getDestination(), this.cacheName);
        }
        this.removeTransfer(transferTask);
    }
}

