/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.partition.impl;

import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.MemberInfo;
import com.hazelcast.cluster.impl.ClusterServiceImpl;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Member;
import com.hazelcast.core.MigrationEvent;
import com.hazelcast.core.MigrationListener;
import com.hazelcast.instance.GroupProperty;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.NodeState;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.InternalPartitionLostEvent;
import com.hazelcast.partition.InternalPartitionService;
import com.hazelcast.partition.MigrationEndpoint;
import com.hazelcast.partition.MigrationInfo;
import com.hazelcast.partition.NoDataMemberInClusterException;
import com.hazelcast.partition.PartitionEvent;
import com.hazelcast.partition.PartitionEventListener;
import com.hazelcast.partition.PartitionInfo;
import com.hazelcast.partition.PartitionLostEvent;
import com.hazelcast.partition.PartitionLostListener;
import com.hazelcast.partition.PartitionRuntimeState;
import com.hazelcast.partition.PartitionServiceProxy;
import com.hazelcast.partition.impl.AssignPartitions;
import com.hazelcast.partition.impl.ClearReplicaOperation;
import com.hazelcast.partition.impl.FinalizeMigrationOperation;
import com.hazelcast.partition.impl.HasOngoingMigration;
import com.hazelcast.partition.impl.InternalPartitionImpl;
import com.hazelcast.partition.impl.InternalPartitionServiceState;
import com.hazelcast.partition.impl.IsReplicaVersionSync;
import com.hazelcast.partition.impl.MigrationListenerAdapter;
import com.hazelcast.partition.impl.MigrationQueue;
import com.hazelcast.partition.impl.MigrationRequestOperation;
import com.hazelcast.partition.impl.PartitionListener;
import com.hazelcast.partition.impl.PartitionLostListenerAdapter;
import com.hazelcast.partition.impl.PartitionReplicaChangeEvent;
import com.hazelcast.partition.impl.PartitionReplicaChangeReason;
import com.hazelcast.partition.impl.PartitionReplicaVersions;
import com.hazelcast.partition.impl.PartitionStateGenerator;
import com.hazelcast.partition.impl.PartitionStateGeneratorImpl;
import com.hazelcast.partition.impl.PartitionStateOperation;
import com.hazelcast.partition.impl.PromoteFromBackupOperation;
import com.hazelcast.partition.impl.ReplicaSyncInfo;
import com.hazelcast.partition.impl.ReplicaSyncRequest;
import com.hazelcast.partition.impl.ResetReplicaVersionOperation;
import com.hazelcast.partition.impl.SyncReplicaVersion;
import com.hazelcast.partition.membergroup.MemberGroup;
import com.hazelcast.partition.membergroup.MemberGroupFactory;
import com.hazelcast.partition.membergroup.MemberGroupFactoryFactory;
import com.hazelcast.spi.EventPublishingService;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationResponseHandler;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.PartitionAwareService;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.OperationResponseHandlerFactory;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.impl.operationservice.InternalOperationService;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.FutureUtil;
import com.hazelcast.util.HashUtil;
import com.hazelcast.util.scheduler.CoalescingDelayedTrigger;
import com.hazelcast.util.scheduler.EntryTaskScheduler;
import com.hazelcast.util.scheduler.EntryTaskSchedulerFactory;
import com.hazelcast.util.scheduler.ScheduleType;
import com.hazelcast.util.scheduler.ScheduledEntry;
import com.hazelcast.util.scheduler.ScheduledEntryProcessor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;

public class InternalPartitionServiceImpl
implements InternalPartitionService,
ManagedService,
EventPublishingService<PartitionEvent, PartitionEventListener<PartitionEvent>>,
PartitionAwareService {
    private static final String EXCEPTION_MSG_PARTITION_STATE_SYNC_TIMEOUT = "Partition state sync invocation timed out";
    private static final int DEFAULT_PAUSE_MILLIS = 1000;
    private static final int PARTITION_OWNERSHIP_WAIT_MILLIS = 10;
    private static final int REPLICA_SYNC_CHECK_TIMEOUT_SECONDS = 10;
    private final Node node;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final int partitionCount;
    private final InternalPartitionImpl[] partitions;
    private final PartitionReplicaVersions[] replicaVersions;
    private final AtomicReferenceArray<ReplicaSyncInfo> replicaSyncRequests;
    private final EntryTaskScheduler<Integer, ReplicaSyncInfo> replicaSyncScheduler;
    @Probe
    private final Semaphore replicaSyncProcessLock;
    private final MigrationThread migrationThread;
    private final long partitionMigrationInterval;
    private final long partitionMigrationTimeout;
    private final long backupSyncCheckInterval;
    private final int maxParallelReplications;
    private final PartitionStateGenerator partitionStateGenerator;
    private final MemberGroupFactory memberGroupFactory;
    private final PartitionServiceProxy proxy;
    private final Lock lock = new ReentrantLock();
    private final InternalPartitionListener partitionListener;
    @Probe
    private final AtomicInteger stateVersion = new AtomicInteger();
    private final MigrationQueue migrationQueue = new MigrationQueue();
    private final AtomicBoolean migrationAllowed = new AtomicBoolean(true);
    @Probe(name="lastRepartitionTime")
    private final AtomicLong lastRepartitionTime = new AtomicLong();
    private final CoalescingDelayedTrigger delayedResumeMigrationTrigger;
    private final FutureUtil.ExceptionHandler partitionStateSyncTimeoutHandler;
    @Probe
    private volatile int memberGroupsSize;
    private volatile boolean initialized;
    @Probe(name="activeMigrationCount")
    private final ConcurrentMap<Integer, MigrationInfo> activeMigrations = new ConcurrentHashMap<Integer, MigrationInfo>(3, 0.75f, 1);
    private final LinkedList<MigrationInfo> completedMigrations = new LinkedList();
    @Probe
    private final AtomicLong completedMigrationCounter = new AtomicLong();

    public InternalPartitionServiceImpl(Node node) {
        int i;
        this.partitionCount = node.groupProperties.getInteger(GroupProperty.PARTITION_COUNT);
        this.node = node;
        this.nodeEngine = node.nodeEngine;
        this.logger = node.getLogger(InternalPartitionService.class);
        this.partitionStateSyncTimeoutHandler = FutureUtil.logAllExceptions(this.logger, EXCEPTION_MSG_PARTITION_STATE_SYNC_TIMEOUT, Level.FINEST);
        this.partitions = new InternalPartitionImpl[this.partitionCount];
        this.partitionListener = new InternalPartitionListener(this, node.getThisAddress());
        for (i = 0; i < this.partitionCount; ++i) {
            this.partitions[i] = new InternalPartitionImpl(i, this.partitionListener, node.getThisAddress());
        }
        this.replicaVersions = new PartitionReplicaVersions[this.partitionCount];
        for (i = 0; i < this.replicaVersions.length; ++i) {
            this.replicaVersions[i] = new PartitionReplicaVersions(i);
        }
        this.memberGroupFactory = MemberGroupFactoryFactory.newMemberGroupFactory(node.getConfig().getPartitionGroupConfig());
        this.partitionStateGenerator = new PartitionStateGeneratorImpl();
        long intervalMillis = node.groupProperties.getMillis(GroupProperty.PARTITION_MIGRATION_INTERVAL);
        this.partitionMigrationInterval = intervalMillis > 0L ? intervalMillis : 0L;
        this.partitionMigrationTimeout = node.groupProperties.getMillis(GroupProperty.PARTITION_MIGRATION_TIMEOUT);
        this.migrationThread = new MigrationThread(node);
        this.proxy = new PartitionServiceProxy(this);
        InternalExecutionService executionService = this.nodeEngine.getExecutionService();
        ScheduledExecutorService scheduledExecutor = executionService.getDefaultScheduledExecutor();
        this.replicaSyncScheduler = EntryTaskSchedulerFactory.newScheduler(scheduledExecutor, new ReplicaSyncEntryProcessor(this), ScheduleType.POSTPONE);
        this.replicaSyncRequests = new AtomicReferenceArray(this.partitionCount);
        long maxMigrationDelayMs = this.calculateMaxMigrationDelayOnMemberRemoved();
        long minMigrationDelayMs = this.calculateMigrationDelayOnMemberRemoved(maxMigrationDelayMs);
        this.delayedResumeMigrationTrigger = new CoalescingDelayedTrigger(executionService, minMigrationDelayMs, maxMigrationDelayMs, new Runnable(){

            @Override
            public void run() {
                InternalPartitionServiceImpl.this.resumeMigration();
            }
        });
        long definedBackupSyncCheckInterval = node.groupProperties.getSeconds(GroupProperty.PARTITION_BACKUP_SYNC_INTERVAL);
        this.backupSyncCheckInterval = definedBackupSyncCheckInterval > 0L ? definedBackupSyncCheckInterval : 1L;
        this.maxParallelReplications = node.groupProperties.getInteger(GroupProperty.PARTITION_MAX_PARALLEL_REPLICATIONS);
        this.replicaSyncProcessLock = new Semaphore(this.maxParallelReplications);
        this.nodeEngine.getMetricsRegistry().scanAndRegister(this, "partitions");
    }

    private long calculateMaxMigrationDelayOnMemberRemoved() {
        return this.node.groupProperties.getMillis(GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS) / 2L;
    }

    private long calculateMigrationDelayOnMemberRemoved(long maxDelayMs) {
        long migrationDelayMs = this.node.groupProperties.getMillis(GroupProperty.MIGRATION_MIN_DELAY_ON_MEMBER_REMOVED_SECONDS);
        long connectionErrorDetectionIntervalMs = this.node.groupProperties.getMillis(GroupProperty.CONNECTION_MONITOR_INTERVAL) * (long)this.node.groupProperties.getInteger(GroupProperty.CONNECTION_MONITOR_MAX_FAULTS) * 5L;
        migrationDelayMs = Math.max(migrationDelayMs, connectionErrorDetectionIntervalMs);
        long heartbeatIntervalMs = this.node.groupProperties.getMillis(GroupProperty.HEARTBEAT_INTERVAL_SECONDS);
        migrationDelayMs = Math.max(migrationDelayMs, heartbeatIntervalMs * 3L);
        migrationDelayMs = Math.min(migrationDelayMs, maxDelayMs);
        return migrationDelayMs;
    }

    @Probe(name="migrationActive")
    private int migrationActiveProbe() {
        return this.migrationAllowed.get() ? 1 : 0;
    }

    @Probe
    private int localPartitionCount() {
        int count = 0;
        for (InternalPartitionImpl partition : this.partitions) {
            if (!partition.isLocal()) continue;
            ++count;
        }
        return count;
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.migrationThread.start();
        int partitionTableSendInterval = this.node.groupProperties.getSeconds(GroupProperty.PARTITION_TABLE_SEND_INTERVAL);
        if (partitionTableSendInterval <= 0) {
            partitionTableSendInterval = 1;
        }
        ExecutionService executionService = nodeEngine.getExecutionService();
        executionService.scheduleAtFixedRate(new SendPartitionRuntimeStateTask(), partitionTableSendInterval, partitionTableSendInterval, TimeUnit.SECONDS);
        executionService.scheduleWithFixedDelay(new SyncReplicaVersionTask(), this.backupSyncCheckInterval, this.backupSyncCheckInterval, TimeUnit.SECONDS);
    }

    @Override
    public Address getPartitionOwner(int partitionId) {
        if (!this.initialized) {
            this.firstArrangement();
        }
        if (this.partitions[partitionId].getOwnerOrNull() == null && !this.node.isMaster() && this.node.joined() && !this.isClusterFormedByOnlyLiteMembers()) {
            this.notifyMasterToAssignPartitions();
        }
        return this.partitions[partitionId].getOwnerOrNull();
    }

    @Override
    public Address getPartitionOwnerOrWait(int partitionId) {
        Address owner;
        while ((owner = this.getPartitionOwner(partitionId)) == null) {
            if (!this.nodeEngine.isRunning()) {
                throw new HazelcastInstanceNotActiveException();
            }
            ClusterState clusterState = this.node.getClusterService().getClusterState();
            if (clusterState != ClusterState.ACTIVE) {
                throw new IllegalStateException("Partitions can't be assigned since cluster-state: " + (Object)((Object)clusterState));
            }
            if (this.isClusterFormedByOnlyLiteMembers()) {
                throw new NoDataMemberInClusterException("Partitions can't be assigned since all nodes in the cluster are lite members");
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw ExceptionUtil.rethrow(e);
            }
        }
        return owner;
    }

    private boolean isClusterFormedByOnlyLiteMembers() {
        ClusterServiceImpl clusterService = this.node.getClusterService();
        return clusterService.getMembers(MemberSelectors.DATA_MEMBER_SELECTOR).isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyMasterToAssignPartitions() {
        if (this.initialized) {
            return;
        }
        ClusterState clusterState = this.node.getClusterService().getClusterState();
        if (clusterState != ClusterState.ACTIVE) {
            this.logger.warning("Partitions can't be assigned since cluster-state= " + (Object)((Object)clusterState));
            return;
        }
        if (this.lock.tryLock()) {
            try {
                if (!this.initialized && !this.node.isMaster() && this.node.getMasterAddress() != null && this.node.joined()) {
                    InternalCompletableFuture f = this.nodeEngine.getOperationService().createInvocationBuilder("hz:core:partitionService", (Operation)new AssignPartitions(), this.node.getMasterAddress()).setTryCount(1).invoke();
                    f.get(1L, TimeUnit.SECONDS);
                }
            }
            catch (Exception e) {
                this.logger.finest(e);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void firstArrangement() {
        if (this.initialized) {
            return;
        }
        if (!this.node.isMaster()) {
            this.notifyMasterToAssignPartitions();
            return;
        }
        this.lock.lock();
        try {
            if (this.initialized) {
                return;
            }
            if (!this.initializePartitionAssignments()) {
                return;
            }
            this.publishPartitionRuntimeState();
        }
        finally {
            this.lock.unlock();
        }
    }

    private Collection<MemberGroup> createMemberGroups() {
        Collection<Member> members = this.node.getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR);
        return this.memberGroupFactory.createMemberGroups(members);
    }

    private boolean initializePartitionAssignments() {
        ClusterState clusterState = this.node.getClusterService().getClusterState();
        if (clusterState != ClusterState.ACTIVE) {
            this.logger.warning("Partitions can't be assigned since cluster-state= " + (Object)((Object)clusterState));
            return false;
        }
        PartitionStateGenerator psg = this.partitionStateGenerator;
        Collection<MemberGroup> memberGroups = this.createMemberGroups();
        if (memberGroups.isEmpty()) {
            this.logger.warning("No member group is available to assign partition ownership...");
            return false;
        }
        this.logger.info("Initializing cluster partition table arrangement...");
        Address[][] newState = psg.initialize(memberGroups, this.partitionCount);
        if (newState.length != this.partitionCount) {
            throw new HazelcastException("Invalid partition count! Expected: " + this.partitionCount + ", Actual: " + newState.length);
        }
        this.stateVersion.incrementAndGet();
        clusterState = this.node.getClusterService().getClusterState();
        if (clusterState != ClusterState.ACTIVE) {
            this.stateVersion.decrementAndGet();
            this.logger.warning("Partitions can't be assigned since cluster-state= " + (Object)((Object)clusterState));
            return false;
        }
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            InternalPartitionImpl partition = this.partitions[partitionId];
            Address[] replicas = newState[partitionId];
            partition.setReplicaAddresses(replicas);
        }
        this.initialized = true;
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setInitialState(Address[][] newState, int partitionStateVersion) {
        this.lock.lock();
        try {
            if (this.initialized) {
                throw new IllegalStateException("Partition table is already initialized!");
            }
            this.logger.info("Setting cluster partition table ...");
            boolean foundReplica = false;
            for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
                InternalPartitionImpl partition = this.partitions[partitionId];
                Address[] replicas = newState[partitionId];
                if (!foundReplica && replicas != null) {
                    for (int i = 0; i < 7; ++i) {
                        foundReplica |= replicas[i] != null;
                    }
                }
                partition.setInitialReplicaAddresses(replicas);
            }
            this.stateVersion.set(partitionStateVersion);
            this.initialized = foundReplica;
        }
        finally {
            this.lock.unlock();
        }
    }

    private void updateMemberGroupsSize() {
        Collection<MemberGroup> groups = this.createMemberGroups();
        int size = 0;
        for (MemberGroup group : groups) {
            if (group.size() <= 0) continue;
            ++size;
        }
        this.memberGroupsSize = size;
    }

    @Override
    public int getMemberGroupsSize() {
        int size = this.memberGroupsSize;
        if (size > 0) {
            return size;
        }
        return this.node.isLiteMember() ? 0 : 1;
    }

    @Override
    @Probe(name="maxBackupCount")
    public int getMaxBackupCount() {
        return Math.max(Math.min(this.getMemberGroupsSize() - 1, 6), 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void memberAdded(MemberImpl member) {
        if (!member.localMember()) {
            this.updateMemberGroupsSize();
        }
        if (this.node.isMaster()) {
            this.lock.lock();
            try {
                this.migrationQueue.clear();
                if (this.initialized) {
                    ClusterState clusterState = this.nodeEngine.getClusterService().getClusterState();
                    if (clusterState == ClusterState.ACTIVE) {
                        this.stateVersion.incrementAndGet();
                        this.migrationQueue.add(new RepartitioningTask());
                    }
                    PartitionStateOperation op = new PartitionStateOperation(this.createPartitionState());
                    this.nodeEngine.getOperationService().send(op, member.getAddress());
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void memberRemoved(MemberImpl member) {
        this.logger.info("Removing " + member);
        this.updateMemberGroupsSize();
        Address deadAddress = member.getAddress();
        Address thisAddress = this.node.getThisAddress();
        if (thisAddress.equals(deadAddress)) {
            return;
        }
        this.lock.lock();
        try {
            if (this.initialized && this.node.getClusterService().getClusterState() == ClusterState.ACTIVE) {
                this.stateVersion.incrementAndGet();
            }
            this.migrationQueue.clear();
            if (this.node.isMaster()) {
                this.rollbackActiveMigrationsFromPreviousMaster(this.node.getLocalMember().getUuid());
            }
            this.invalidateActiveMigrationsBelongingTo(deadAddress);
            this.pauseMigration();
            this.cancelReplicaSyncRequestsInternal(deadAddress);
            this.removeDeadAddress(deadAddress, thisAddress);
            if (this.node.isMaster() && this.initialized) {
                this.migrationQueue.add(new RepartitioningTask());
            }
            this.resumeMigrationEventually();
        }
        finally {
            this.lock.unlock();
        }
    }

    private void invalidateActiveMigrationsBelongingTo(Address deadAddress) {
        if (!this.activeMigrations.isEmpty()) {
            for (MigrationInfo migrationInfo : this.activeMigrations.values()) {
                if (!deadAddress.equals(migrationInfo.getSource()) && !deadAddress.equals(migrationInfo.getDestination())) continue;
                migrationInfo.invalidate();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelReplicaSyncRequestsTo(Address deadAddress) {
        this.lock.lock();
        try {
            this.cancelReplicaSyncRequestsInternal(deadAddress);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void cancelReplicaSyncRequestsInternal(Address deadAddress) {
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            ReplicaSyncInfo syncInfo = this.replicaSyncRequests.get(partitionId);
            if (syncInfo == null || !deadAddress.equals(syncInfo.target)) continue;
            this.cancelReplicaSync(partitionId);
        }
    }

    void cancelReplicaSync(int partitionId) {
        ReplicaSyncInfo syncInfo = this.replicaSyncRequests.get(partitionId);
        if (syncInfo != null && this.replicaSyncRequests.compareAndSet(partitionId, syncInfo, null)) {
            this.replicaSyncScheduler.cancel(partitionId);
            this.releaseReplicaSyncPermit();
        }
    }

    private void resumeMigrationEventually() {
        this.delayedResumeMigrationTrigger.executeWithDelay();
    }

    private void removeDeadAddress(Address deadAddress, Address thisAddress) {
        for (InternalPartitionImpl partition : this.partitions) {
            if (deadAddress.equals(partition.getOwnerOrNull()) && thisAddress.equals(partition.getReplicaAddress(1))) {
                partition.setMigrating(true);
            }
            partition.onDeadAddress(deadAddress);
            if (!partition.onDeadAddress(deadAddress)) continue;
            throw new IllegalStateException("Duplicate address found in partition replicas!");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rollbackActiveMigrationsFromPreviousMaster(String currentMasterUuid) {
        this.lock.lock();
        try {
            if (!this.activeMigrations.isEmpty()) {
                for (MigrationInfo migrationInfo : this.activeMigrations.values()) {
                    if (currentMasterUuid.equals(migrationInfo.getMasterUuid())) continue;
                    this.logger.info("Rolling-back migration initiated by the old master -> " + migrationInfo);
                    this.finalizeActiveMigration(migrationInfo);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public PartitionRuntimeState createPartitionState() {
        return this.createPartitionState(this.getCurrentMembersAndMembersRemovedWhileNotClusterNotActive());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private PartitionRuntimeState createPartitionState(Collection<MemberImpl> members) {
        if (!this.initialized) {
            return null;
        }
        this.lock.lock();
        try {
            ArrayList<MemberInfo> memberInfos = new ArrayList<MemberInfo>(members.size());
            for (MemberImpl member : members) {
                MemberInfo memberInfo = new MemberInfo(member.getAddress(), member.getUuid(), member.getAttributes());
                memberInfos.add(memberInfo);
            }
            ArrayList<MigrationInfo> migrationInfos = new ArrayList<MigrationInfo>(this.completedMigrations);
            ILogger logger = this.node.getLogger(PartitionRuntimeState.class);
            PartitionRuntimeState partitionRuntimeState = new PartitionRuntimeState(logger, memberInfos, this.partitions, migrationInfos, this.stateVersion.get());
            return partitionRuntimeState;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void publishPartitionRuntimeState() {
        if (!this.initialized) {
            return;
        }
        if (!this.node.isMaster()) {
            return;
        }
        if (!this.isReplicaSyncAllowed()) {
            return;
        }
        this.lock.lock();
        try {
            List<MemberImpl> members = this.getCurrentMembersAndMembersRemovedWhileNotClusterNotActive();
            PartitionRuntimeState partitionState = this.createPartitionState(members);
            PartitionStateOperation op = new PartitionStateOperation(partitionState);
            InternalOperationService operationService = this.nodeEngine.getOperationService();
            ClusterServiceImpl clusterService = this.node.clusterService;
            for (MemberImpl member : members) {
                if (member.localMember() || clusterService.isMemberRemovedWhileClusterIsNotActive(member.getAddress())) continue;
                try {
                    operationService.send(op, member.getAddress());
                }
                catch (Exception e) {
                    this.logger.finest(e);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void syncPartitionRuntimeState() {
        this.syncPartitionRuntimeState(this.node.clusterService.getMemberImpls());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncPartitionRuntimeState(Collection<MemberImpl> members) {
        if (!this.initialized) {
            return;
        }
        if (!this.node.isMaster()) {
            return;
        }
        this.lock.lock();
        try {
            PartitionRuntimeState partitionState = this.createPartitionState(members);
            InternalOperationService operationService = this.nodeEngine.getOperationService();
            List<Future> calls = this.firePartitionStateOperation(members, partitionState, operationService);
            FutureUtil.waitWithDeadline(calls, 3L, TimeUnit.SECONDS, this.partitionStateSyncTimeoutHandler);
        }
        finally {
            this.lock.unlock();
        }
    }

    private List<Future> firePartitionStateOperation(Collection<MemberImpl> members, PartitionRuntimeState partitionState, OperationService operationService) {
        ClusterServiceImpl clusterService = this.node.clusterService;
        ArrayList<Future> calls = new ArrayList<Future>(members.size());
        for (MemberImpl member : members) {
            if (member.localMember() || clusterService.isMemberRemovedWhileClusterIsNotActive(member.getAddress())) continue;
            try {
                Address address = member.getAddress();
                PartitionStateOperation operation = new PartitionStateOperation(partitionState, true);
                InternalCompletableFuture f = operationService.invokeOnTarget("hz:core:partitionService", operation, address);
                calls.add(f);
            }
            catch (Exception e) {
                this.logger.finest(e);
            }
        }
        return calls;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processPartitionRuntimeState(PartitionRuntimeState partitionState) {
        this.lock.lock();
        try {
            Address sender = partitionState.getEndpoint();
            if (!this.node.getNodeExtension().isStartCompleted()) {
                this.logger.warning("Ignoring received partition table, startup is not completed yet. Sender: " + sender);
                return;
            }
            Address master = this.node.getMasterAddress();
            if (this.node.isMaster()) {
                this.logger.warning("This is the master node and received a PartitionRuntimeState from " + sender + ". Ignoring incoming state! ");
                return;
            }
            if (sender == null || !sender.equals(master)) {
                if (this.node.clusterService.getMember(sender) == null) {
                    this.logger.severe("Received a ClusterRuntimeState from an unknown member! => Sender: " + sender + ", Master: " + master + "! ");
                    return;
                }
                this.logger.warning("Received a ClusterRuntimeState, but its sender doesn't seem to be master! => Sender: " + sender + ", Master: " + master + "! " + "(Ignore if master node has changed recently.)");
                return;
            }
            this.stateVersion.set(partitionState.getVersion());
            this.initialized = true;
            PartitionInfo[] state = partitionState.getPartitions();
            this.filterAndLogUnknownAddressesInPartitionTable(sender, state);
            this.finalizeOrRollbackMigration(partitionState, state);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void finalizeOrRollbackMigration(PartitionRuntimeState partitionState, PartitionInfo[] state) {
        Collection<MigrationInfo> completedMigrations = partitionState.getCompletedMigrations();
        for (MigrationInfo completedMigration : completedMigrations) {
            this.addCompletedMigration(completedMigration);
            int partitionId = completedMigration.getPartitionId();
            PartitionInfo partitionInfo = state[partitionId];
            this.updatePartition(partitionInfo);
            this.finalizeActiveMigration(completedMigration);
        }
        if (!this.activeMigrations.isEmpty()) {
            MemberImpl masterMember = this.getMasterMember();
            this.rollbackActiveMigrationsFromPreviousMaster(masterMember.getUuid());
        }
        this.updateAllPartitions(state);
    }

    private void updatePartition(PartitionInfo partitionInfo) {
        InternalPartitionImpl partition = this.partitions[partitionInfo.getPartitionId()];
        Address[] replicas = partitionInfo.getReplicaAddresses();
        partition.setReplicaAddresses(replicas);
    }

    private void updateAllPartitions(PartitionInfo[] state) {
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            this.updatePartition(state[partitionId]);
        }
    }

    private void filterAndLogUnknownAddressesInPartitionTable(Address sender, PartitionInfo[] state) {
        HashSet<Address> unknownAddresses = new HashSet<Address>();
        for (int partitionId = 0; partitionId < state.length; ++partitionId) {
            PartitionInfo partitionInfo = state[partitionId];
            this.searchUnknownAddressesInPartitionTable(sender, unknownAddresses, partitionId, partitionInfo);
        }
        this.logUnknownAddressesInPartitionTable(sender, unknownAddresses);
    }

    private void logUnknownAddressesInPartitionTable(Address sender, Set<Address> unknownAddresses) {
        if (!unknownAddresses.isEmpty() && this.logger.isLoggable(Level.WARNING)) {
            StringBuilder s = new StringBuilder("Following unknown addresses are found in partition table").append(" sent from master[").append(sender).append("].").append(" (Probably they have recently joined or left the cluster.)").append(" {");
            for (Address address : unknownAddresses) {
                s.append("\n\t").append(address);
            }
            s.append("\n}");
            this.logger.warning(s.toString());
        }
    }

    private void searchUnknownAddressesInPartitionTable(Address sender, Set<Address> unknownAddresses, int partitionId, PartitionInfo partitionInfo) {
        ClusterServiceImpl clusterService = this.node.clusterService;
        ClusterState clusterState = clusterService.getClusterState();
        for (int index = 0; index < 7; ++index) {
            Address address = partitionInfo.getReplicaAddress(index);
            if (address == null || this.getMember(address) != null || clusterState != ClusterState.ACTIVE && clusterService.isMemberRemovedWhileClusterIsNotActive(address)) continue;
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Unknown " + address + " found in partition table sent from master " + sender + ". It has probably already left the cluster. partitionId=" + partitionId);
            }
            unknownAddresses.add(address);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finalizeActiveMigration(final MigrationInfo migrationInfo) {
        if (this.activeMigrations.containsKey(migrationInfo.getPartitionId())) {
            this.lock.lock();
            try {
                if (this.activeMigrations.containsValue(migrationInfo)) {
                    if (migrationInfo.startProcessing()) {
                        this.processMigrationInfo(migrationInfo);
                    } else {
                        this.logger.info("Scheduling finalization of " + migrationInfo + ", because migration process is currently running.");
                        this.nodeEngine.getExecutionService().schedule(new Runnable(){

                            @Override
                            public void run() {
                                InternalPartitionServiceImpl.this.finalizeActiveMigration(migrationInfo);
                            }
                        }, 3L, TimeUnit.SECONDS);
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processMigrationInfo(MigrationInfo migrationInfo) {
        try {
            Address thisAddress = this.node.getThisAddress();
            boolean source = thisAddress.equals(migrationInfo.getSource());
            boolean destination = thisAddress.equals(migrationInfo.getDestination());
            if (source || destination) {
                int partitionId = migrationInfo.getPartitionId();
                InternalPartitionImpl migratingPartition = this.getPartitionImpl(partitionId);
                Address ownerAddress = migratingPartition.getOwnerOrNull();
                boolean success = migrationInfo.getDestination().equals(ownerAddress);
                MigrationEndpoint endpoint = source ? MigrationEndpoint.SOURCE : MigrationEndpoint.DESTINATION;
                FinalizeMigrationOperation op = new FinalizeMigrationOperation(endpoint, success);
                op.setPartitionId(partitionId).setNodeEngine(this.nodeEngine).setValidateTarget(false).setService(this);
                this.nodeEngine.getOperationService().executeOperation(op);
            }
        }
        catch (Exception e) {
            this.logger.warning(e);
        }
        finally {
            migrationInfo.doneProcessing();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addActiveMigration(MigrationInfo migrationInfo) {
        this.lock.lock();
        try {
            int partitionId = migrationInfo.getPartitionId();
            this.partitions[partitionId].setMigrating(true);
            MigrationInfo currentMigrationInfo = this.activeMigrations.putIfAbsent(partitionId, migrationInfo);
            if (currentMigrationInfo != null) {
                MigrationInfo newMigration;
                MigrationInfo oldMigration;
                boolean oldMaster = false;
                MemberImpl masterMember = this.getMasterMember();
                String master = masterMember.getUuid();
                if (!master.equals(currentMigrationInfo.getMasterUuid())) {
                    oldMigration = currentMigrationInfo;
                    newMigration = migrationInfo;
                    oldMaster = true;
                } else if (!master.equals(migrationInfo.getMasterUuid())) {
                    oldMigration = migrationInfo;
                    newMigration = currentMigrationInfo;
                    oldMaster = true;
                } else if (!currentMigrationInfo.isProcessing() && migrationInfo.isProcessing()) {
                    oldMigration = currentMigrationInfo;
                    newMigration = migrationInfo;
                } else {
                    String message = "Something is seriously wrong! There are two migration requests for the same partition! First -> " + currentMigrationInfo + ", Second -> " + migrationInfo;
                    IllegalStateException error = new IllegalStateException(message);
                    this.logger.severe(message, error);
                    throw error;
                }
                if (oldMaster) {
                    this.logger.info("Finalizing migration instantiated by the old master -> " + oldMigration);
                } else if (this.logger.isFinestEnabled()) {
                    this.logger.finest("Finalizing previous migration -> " + oldMigration);
                }
                this.finalizeActiveMigration(oldMigration);
                this.activeMigrations.put(partitionId, newMigration);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private MemberImpl getMasterMember() {
        return this.node.clusterService.getMember(this.node.getMasterAddress());
    }

    MigrationInfo getActiveMigration(int partitionId) {
        return (MigrationInfo)this.activeMigrations.get(partitionId);
    }

    MigrationInfo removeActiveMigration(int partitionId) {
        this.partitions[partitionId].setMigrating(false);
        return (MigrationInfo)this.activeMigrations.remove(partitionId);
    }

    @Override
    public Collection<MigrationInfo> getActiveMigrations() {
        return Collections.unmodifiableCollection(this.activeMigrations.values());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addCompletedMigration(MigrationInfo migrationInfo) {
        this.completedMigrationCounter.incrementAndGet();
        this.lock.lock();
        try {
            if (this.completedMigrations.size() > 25) {
                this.completedMigrations.removeFirst();
            }
            this.completedMigrations.add(migrationInfo);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void evictCompletedMigrations() {
        this.lock.lock();
        try {
            if (!this.completedMigrations.isEmpty()) {
                this.completedMigrations.removeFirst();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    void triggerPartitionReplicaSync(int partitionId, int replicaIndex, long delayMillis) {
        if (replicaIndex < 0 || replicaIndex > 7) {
            throw new IllegalArgumentException("Invalid replica index! replicaIndex=" + replicaIndex + " for partitionId=" + partitionId);
        }
        if (!this.checkSyncPartitionTarget(partitionId, replicaIndex)) {
            return;
        }
        InternalPartitionImpl partition = this.getPartitionImpl(partitionId);
        Address target = partition.getOwnerOrNull();
        ReplicaSyncInfo syncInfo = new ReplicaSyncInfo(partitionId, replicaIndex, target);
        if (delayMillis > 0L) {
            this.schedulePartitionReplicaSync(syncInfo, target, delayMillis, "EXPLICIT DELAY");
            return;
        }
        if (!this.isReplicaSyncAllowed() || partition.isMigrating()) {
            this.schedulePartitionReplicaSync(syncInfo, target, 500L, "MIGRATION IS DISABLED OR PARTITION IS MIGRATING");
            return;
        }
        if (this.replicaSyncRequests.compareAndSet(partitionId, null, syncInfo)) {
            if (this.fireSyncReplicaRequest(syncInfo, target)) {
                return;
            }
            this.replicaSyncRequests.compareAndSet(partitionId, syncInfo, null);
            this.schedulePartitionReplicaSync(syncInfo, target, 500L, "NO PERMIT AVAILABLE");
            return;
        }
        long scheduleDelay = this.getReplicaSyncScheduleDelay(partitionId);
        this.schedulePartitionReplicaSync(syncInfo, target, scheduleDelay, "ANOTHER SYNC IN PROGRESS");
    }

    private long getReplicaSyncScheduleDelay(int partitionId) {
        long scheduleDelay = 5000L;
        Address thisAddress = this.node.getThisAddress();
        InternalPartitionImpl partition = this.getPartitionImpl(partitionId);
        ReplicaSyncInfo currentSyncInfo = this.replicaSyncRequests.get(partitionId);
        if (currentSyncInfo != null && !thisAddress.equals(partition.getReplicaAddress(currentSyncInfo.replicaIndex))) {
            this.clearReplicaSyncRequest(partitionId, currentSyncInfo.replicaIndex);
            scheduleDelay = 500L;
        }
        return scheduleDelay;
    }

    private boolean fireSyncReplicaRequest(ReplicaSyncInfo syncInfo, Address target) {
        if (this.node.clusterService.isMemberRemovedWhileClusterIsNotActive(target)) {
            return false;
        }
        if (this.tryToAcquireReplicaSyncPermit()) {
            int partitionId = syncInfo.partitionId;
            int replicaIndex = syncInfo.replicaIndex;
            this.replicaSyncScheduler.cancel(partitionId);
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Sending sync replica request to -> " + target + "; for partitionId=" + partitionId + ", replicaIndex=" + replicaIndex);
            }
            this.replicaSyncScheduler.schedule(this.partitionMigrationTimeout, partitionId, syncInfo);
            ReplicaSyncRequest syncRequest = new ReplicaSyncRequest(partitionId, replicaIndex);
            this.nodeEngine.getOperationService().send(syncRequest, target);
            return true;
        }
        return false;
    }

    private void schedulePartitionReplicaSync(ReplicaSyncInfo syncInfo, Address target, long delayMillis, String reason) {
        int partitionId = syncInfo.partitionId;
        int replicaIndex = syncInfo.replicaIndex;
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Scheduling [" + delayMillis + "ms] sync replica request to -> " + target + "; for partitionId=" + partitionId + ", replicaIndex=" + replicaIndex + ". Reason: [" + reason + "]");
        }
        this.replicaSyncScheduler.schedule(delayMillis, partitionId, syncInfo);
    }

    private boolean checkSyncPartitionTarget(int partitionId, int replicaIndex) {
        InternalPartitionImpl partition = this.getPartitionImpl(partitionId);
        Address target = partition.getOwnerOrNull();
        if (target == null) {
            this.logger.info("Sync replica target is null, no need to sync -> partitionId=" + partitionId + ", replicaIndex=" + replicaIndex);
            return false;
        }
        Address thisAddress = this.nodeEngine.getThisAddress();
        if (target.equals(thisAddress)) {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("This node is now owner of partition, cannot sync replica -> partitionId=" + partitionId + ", replicaIndex=" + replicaIndex + ", partition-info=" + this.getPartitionImpl(partitionId));
            }
            return false;
        }
        if (!partition.isOwnerOrBackup(thisAddress)) {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("This node is not backup replica of partitionId=" + partitionId + ", replicaIndex=" + replicaIndex + " anymore.");
            }
            return false;
        }
        return true;
    }

    @Override
    public InternalPartition[] getPartitions() {
        InternalPartition[] result = new InternalPartition[this.partitions.length];
        System.arraycopy(this.partitions, 0, result, 0, this.partitions.length);
        return result;
    }

    @Override
    public MemberImpl getMember(Address address) {
        return this.node.clusterService.getMember(address);
    }

    InternalPartitionImpl getPartitionImpl(int partitionId) {
        return this.partitions[partitionId];
    }

    @Override
    public InternalPartitionImpl getPartition(int partitionId) {
        return this.getPartition(partitionId, true);
    }

    @Override
    public InternalPartitionImpl getPartition(int partitionId, boolean triggerOwnerAssignment) {
        InternalPartitionImpl p = this.getPartitionImpl(partitionId);
        if (triggerOwnerAssignment && p.getOwnerOrNull() == null) {
            this.getPartitionOwner(partitionId);
        }
        return p;
    }

    @Override
    public boolean prepareToSafeShutdown(long timeout, TimeUnit unit) {
        long timeoutInMillis = unit.toMillis(timeout);
        long sleep = 1000L;
        while (timeoutInMillis > 0L) {
            while (timeoutInMillis > 0L && this.shouldWaitMigrationOrBackups(Level.INFO)) {
                timeoutInMillis = this.sleepWithBusyWait(timeoutInMillis, sleep);
            }
            if (timeoutInMillis <= 0L) break;
            if (this.node.isMaster()) {
                List<MemberImpl> members = this.getCurrentMembersAndMembersRemovedWhileNotClusterNotActive();
                this.syncPartitionRuntimeState(members);
            } else if ((timeoutInMillis = this.waitForOngoingMigrations(timeoutInMillis, sleep)) <= 0L) break;
            long start = Clock.currentTimeMillis();
            boolean ok = this.checkReplicaSyncState();
            timeoutInMillis -= Clock.currentTimeMillis() - start;
            if (ok) {
                this.logger.finest("Replica sync state before shutdown is OK");
                return true;
            }
            if (timeoutInMillis <= 0L) break;
            this.logger.info("Some backup replicas are inconsistent with primary, waiting for synchronization. Timeout: " + timeoutInMillis + "ms");
            timeoutInMillis = this.sleepWithBusyWait(timeoutInMillis, sleep);
        }
        return false;
    }

    private List<MemberImpl> getCurrentMembersAndMembersRemovedWhileNotClusterNotActive() {
        ArrayList<MemberImpl> members = new ArrayList<MemberImpl>();
        members.addAll(this.node.clusterService.getMemberImpls());
        members.addAll(this.node.clusterService.getMembersRemovedWhileClusterIsNotActive());
        return members;
    }

    private long waitForOngoingMigrations(long timeoutInMillis, long sleep) {
        long timeout = timeoutInMillis;
        while (timeout > 0L && this.hasOnGoingMigrationMaster(Level.WARNING)) {
            this.logger.info("Waiting for the master node to complete remaining migrations!");
            timeout = this.sleepWithBusyWait(timeout, sleep);
        }
        return timeout;
    }

    private long sleepWithBusyWait(long timeoutInMillis, long sleep) {
        try {
            Thread.sleep(sleep);
        }
        catch (InterruptedException ie) {
            this.logger.finest("Busy wait interrupted", ie);
        }
        return timeoutInMillis - sleep;
    }

    @Override
    public boolean isMemberStateSafe() {
        return this.getMemberState() == InternalPartitionServiceState.SAFE;
    }

    public InternalPartitionServiceState getMemberState() {
        if (this.hasOnGoingMigrationLocal()) {
            return InternalPartitionServiceState.MIGRATION_LOCAL;
        }
        if (!this.node.isMaster() && this.hasOnGoingMigrationMaster(Level.OFF)) {
            return InternalPartitionServiceState.MIGRATION_ON_MASTER;
        }
        return this.isReplicaInSyncState() ? InternalPartitionServiceState.SAFE : InternalPartitionServiceState.REPLICA_NOT_SYNC;
    }

    @Override
    public boolean hasOnGoingMigration() {
        return this.hasOnGoingMigrationLocal() || !this.node.isMaster() && this.hasOnGoingMigrationMaster(Level.FINEST);
    }

    private boolean hasOnGoingMigrationMaster(Level level) {
        Address masterAddress = this.node.getMasterAddress();
        if (masterAddress == null) {
            return this.node.joined();
        }
        HasOngoingMigration operation = new HasOngoingMigration();
        InternalOperationService operationService = this.nodeEngine.getOperationService();
        InvocationBuilder invocationBuilder = operationService.createInvocationBuilder("hz:core:partitionService", (Operation)operation, masterAddress);
        InternalCompletableFuture future = invocationBuilder.setTryCount(100).setTryPauseMillis(100L).invoke();
        try {
            return (Boolean)future.get(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException ie) {
            Logger.getLogger(InternalPartitionServiceImpl.class).finest("Future wait interrupted", ie);
        }
        catch (Exception e) {
            this.logger.log(level, "Could not get a response from master about migrations! -> " + e.toString());
        }
        return false;
    }

    @Override
    public boolean hasOnGoingMigrationLocal() {
        return !this.activeMigrations.isEmpty() || this.migrationQueue.isNonEmpty() || this.migrationQueue.hasMigrationTasks();
    }

    private boolean isReplicaInSyncState() {
        if (!this.initialized || !this.hasMultipleMemberGroups()) {
            return true;
        }
        boolean replicaIndex = true;
        ArrayList<Future> futures = new ArrayList<Future>();
        Address thisAddress = this.node.getThisAddress();
        for (InternalPartitionImpl partition : this.partitions) {
            Address owner = partition.getOwnerOrNull();
            if (!thisAddress.equals(owner) || partition.getReplicaAddress(1) == null) continue;
            int partitionId = partition.getPartitionId();
            long replicaVersion = this.getCurrentReplicaVersion(1, partitionId);
            Operation operation = this.createReplicaSyncStateOperation(replicaVersion, partitionId);
            Future future = this.invoke(operation, 1, partitionId);
            futures.add(future);
        }
        if (futures.isEmpty()) {
            return true;
        }
        for (Future future : futures) {
            boolean isSync = this.getFutureResult(future, 10L, TimeUnit.SECONDS);
            if (isSync) continue;
            return false;
        }
        return true;
    }

    private long getCurrentReplicaVersion(int replicaIndex, int partitionId) {
        long[] versions = this.getPartitionReplicaVersions(partitionId);
        return versions[replicaIndex - 1];
    }

    private boolean getFutureResult(Future future, long seconds, TimeUnit unit) {
        boolean sync;
        try {
            sync = (Boolean)future.get(seconds, unit);
        }
        catch (Throwable t) {
            sync = false;
            this.logger.warning("Exception while getting future", t);
        }
        return sync;
    }

    private Future invoke(Operation operation, int replicaIndex, int partitionId) {
        InternalOperationService operationService = this.nodeEngine.getOperationService();
        return operationService.createInvocationBuilder("hz:core:partitionService", operation, partitionId).setTryCount(3).setTryPauseMillis(250L).setReplicaIndex(replicaIndex).invoke();
    }

    private Operation createReplicaSyncStateOperation(long replicaVersion, int partitionId) {
        IsReplicaVersionSync op = new IsReplicaVersionSync(replicaVersion);
        op.setService(this);
        op.setNodeEngine(this.nodeEngine);
        op.setOperationResponseHandler(OperationResponseHandlerFactory.createErrorLoggingResponseHandler(this.node.getLogger(IsReplicaVersionSync.class)));
        op.setPartitionId(partitionId);
        return op;
    }

    private boolean checkReplicaSyncState() {
        if (!this.initialized) {
            return true;
        }
        if (!this.hasMultipleMemberGroups()) {
            return true;
        }
        Address thisAddress = this.node.getThisAddress();
        final Semaphore s = new Semaphore(0);
        final AtomicBoolean ok = new AtomicBoolean(true);
        ExecutionCallback<Object> callback = new ExecutionCallback<Object>(){

            @Override
            public void onResponse(Object response) {
                if (Boolean.FALSE.equals(response)) {
                    ok.compareAndSet(true, false);
                }
                s.release();
            }

            @Override
            public void onFailure(Throwable t) {
                ok.compareAndSet(true, false);
            }
        };
        int ownedCount = this.submitSyncReplicaOperations(thisAddress, s, ok, callback);
        try {
            if (ok.get()) {
                int permits = ownedCount * this.getMaxBackupCount();
                return s.tryAcquire(permits, 10L, TimeUnit.SECONDS) && ok.get();
            }
            return false;
        }
        catch (InterruptedException ignored) {
            return false;
        }
    }

    private int submitSyncReplicaOperations(Address thisAddress, Semaphore s, AtomicBoolean ok, ExecutionCallback callback) {
        int ownedCount = 0;
        ILogger responseLogger = this.node.getLogger(SyncReplicaVersion.class);
        OperationResponseHandler responseHandler = OperationResponseHandlerFactory.createErrorLoggingResponseHandler(responseLogger);
        int maxBackupCount = this.getMaxBackupCount();
        for (InternalPartitionImpl partition : this.partitions) {
            Address owner = partition.getOwnerOrNull();
            if (thisAddress.equals(owner)) {
                for (int i = 1; i <= maxBackupCount; ++i) {
                    Address replicaAddress = partition.getReplicaAddress(i);
                    if (replicaAddress != null) {
                        if (this.checkClusterStateForReplicaSync(replicaAddress)) {
                            SyncReplicaVersion op = new SyncReplicaVersion(i, callback);
                            op.setService(this);
                            op.setNodeEngine(this.nodeEngine);
                            op.setOperationResponseHandler(responseHandler);
                            op.setPartitionId(partition.getPartitionId());
                            this.nodeEngine.getOperationService().executeOperation(op);
                            continue;
                        }
                        s.release();
                        continue;
                    }
                    ok.set(false);
                    s.release();
                }
                ++ownedCount;
                continue;
            }
            if (owner != null) continue;
            ok.set(false);
        }
        return ownedCount;
    }

    private boolean checkClusterStateForReplicaSync(Address address) {
        ClusterServiceImpl clusterService = this.node.clusterService;
        ClusterState clusterState = clusterService.getClusterState();
        if (clusterState == ClusterState.ACTIVE || clusterState == ClusterState.IN_TRANSITION) {
            return true;
        }
        return !clusterService.isMemberRemovedWhileClusterIsNotActive(address);
    }

    private boolean shouldWaitMigrationOrBackups(Level level) {
        if (!this.preCheckShouldWaitMigrationOrBackups()) {
            return false;
        }
        if (this.checkForActiveMigrations(level)) {
            return true;
        }
        for (InternalPartitionImpl partition : this.partitions) {
            boolean canTakeBackup;
            if (partition.getReplicaAddress(1) != null) continue;
            boolean bl = canTakeBackup = !this.isClusterFormedByOnlyLiteMembers();
            if (canTakeBackup && this.logger.isLoggable(level)) {
                this.logger.log(level, "Should take backup of partitionId=" + partition.getPartitionId());
            }
            return canTakeBackup;
        }
        int replicaSyncProcesses = this.maxParallelReplications - this.replicaSyncProcessLock.availablePermits();
        if (replicaSyncProcesses > 0) {
            if (this.logger.isLoggable(level)) {
                this.logger.log(level, "Processing replica sync requests: " + replicaSyncProcesses);
            }
            return true;
        }
        return false;
    }

    private boolean preCheckShouldWaitMigrationOrBackups() {
        if (!this.initialized) {
            return false;
        }
        return this.hasMultipleMemberGroups();
    }

    private boolean hasMultipleMemberGroups() {
        return this.getMemberGroupsSize() >= 2;
    }

    private boolean checkForActiveMigrations(Level level) {
        int activeSize = this.activeMigrations.size();
        if (activeSize != 0) {
            if (this.logger.isLoggable(level)) {
                this.logger.log(level, "Waiting for active migration tasks: " + activeSize);
            }
            return true;
        }
        int queueSize = this.migrationQueue.size();
        if (queueSize != 0) {
            if (this.logger.isLoggable(level)) {
                this.logger.log(level, "Waiting for cluster migration tasks: " + queueSize);
            }
            return true;
        }
        return false;
    }

    @Override
    public final int getPartitionId(Data key) {
        return HashUtil.hashToIndex(key.getPartitionHash(), this.partitionCount);
    }

    @Override
    public final int getPartitionId(Object key) {
        return this.getPartitionId(this.nodeEngine.toData(key));
    }

    @Override
    public final int getPartitionCount() {
        return this.partitionCount;
    }

    public long getPartitionMigrationTimeout() {
        return this.partitionMigrationTimeout;
    }

    @Override
    public long[] incrementPartitionReplicaVersions(int partitionId, int backupCount) {
        PartitionReplicaVersions replicaVersion = this.replicaVersions[partitionId];
        return replicaVersion.incrementAndGet(backupCount);
    }

    @Override
    public void updatePartitionReplicaVersions(int partitionId, long[] versions, int replicaIndex) {
        PartitionReplicaVersions partitionVersion = this.replicaVersions[partitionId];
        if (!partitionVersion.update(versions, replicaIndex)) {
            this.triggerPartitionReplicaSync(partitionId, replicaIndex, 0L);
        }
    }

    @Override
    public long[] getPartitionReplicaVersions(int partitionId) {
        return this.replicaVersions[partitionId].get();
    }

    @Override
    public void setPartitionReplicaVersions(int partitionId, long[] versions, int replicaOffset) {
        this.replicaVersions[partitionId].set(versions, replicaOffset);
    }

    @Override
    public void clearPartitionReplicaVersions(int partitionId) {
        this.replicaVersions[partitionId].clear();
    }

    void finalizeReplicaSync(int partitionId, int replicaIndex, long[] versions) {
        PartitionReplicaVersions replicaVersion = this.replicaVersions[partitionId];
        replicaVersion.clear();
        replicaVersion.set(versions, replicaIndex);
        this.clearReplicaSyncRequest(partitionId, replicaIndex);
    }

    void clearReplicaSyncRequest(int partitionId, int replicaIndex) {
        ReplicaSyncInfo syncInfo = new ReplicaSyncInfo(partitionId, replicaIndex, null);
        ReplicaSyncInfo currentSyncInfo = this.replicaSyncRequests.get(partitionId);
        this.replicaSyncScheduler.cancelIfExists(partitionId, syncInfo);
        if (syncInfo.equals(currentSyncInfo) && this.replicaSyncRequests.compareAndSet(partitionId, currentSyncInfo, null)) {
            this.releaseReplicaSyncPermit();
        } else if (currentSyncInfo != null && this.logger.isFinestEnabled()) {
            this.logger.finest("Not able to cancel sync! " + syncInfo + " VS Current " + currentSyncInfo);
        }
    }

    boolean tryToAcquireReplicaSyncPermit() {
        return this.replicaSyncProcessLock.tryAcquire();
    }

    void releaseReplicaSyncPermit() {
        this.replicaSyncProcessLock.release();
    }

    @Override
    public Map<Address, List<Integer>> getMemberPartitionsMap() {
        Collection<Member> dataMembers = this.node.getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR);
        int dataMembersSize = dataMembers.size();
        HashMap<Address, List<Integer>> memberPartitions = new HashMap<Address, List<Integer>>(dataMembersSize);
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            Address owner = this.getPartitionOwnerOrWait(partitionId);
            ArrayList<Integer> ownedPartitions = (ArrayList<Integer>)memberPartitions.get(owner);
            if (ownedPartitions == null) {
                ownedPartitions = new ArrayList<Integer>();
                memberPartitions.put(owner, ownedPartitions);
            }
            ownedPartitions.add(partitionId);
        }
        return memberPartitions;
    }

    @Override
    public List<Integer> getMemberPartitions(Address target) {
        LinkedList<Integer> ownedPartitions = new LinkedList<Integer>();
        for (int i = 0; i < this.partitionCount; ++i) {
            Address owner = this.getPartitionOwner(i);
            if (!target.equals(owner)) continue;
            ownedPartitions.add(i);
        }
        return ownedPartitions;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reset() {
        this.migrationQueue.clear();
        for (int k = 0; k < this.replicaSyncRequests.length(); ++k) {
            this.replicaSyncRequests.set(k, null);
        }
        this.replicaSyncScheduler.cancelAll();
        this.replicaSyncProcessLock.drainPermits();
        this.replicaSyncProcessLock.release(this.maxParallelReplications);
        this.lock.lock();
        try {
            this.initialized = false;
            for (InternalPartitionImpl partition : this.partitions) {
                partition.reset();
            }
            this.activeMigrations.clear();
            this.completedMigrations.clear();
            this.stateVersion.set(0);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void pauseMigration() {
        this.migrationAllowed.set(false);
    }

    @Override
    public void resumeMigration() {
        this.migrationAllowed.set(true);
    }

    public boolean isReplicaSyncAllowed() {
        return this.migrationAllowed.get();
    }

    public boolean isMigrationAllowed() {
        if (this.migrationAllowed.get()) {
            ClusterState clusterState = this.node.getClusterService().getClusterState();
            return clusterState == ClusterState.ACTIVE;
        }
        return false;
    }

    @Override
    public void shutdown(boolean terminate) {
        this.logger.finest("Shutting down the partition service");
        this.migrationThread.stopNow();
        this.reset();
    }

    @Override
    @Probe(name="migrationQueueSize")
    public long getMigrationQueueSize() {
        return this.migrationQueue.size();
    }

    @Override
    public PartitionServiceProxy getPartitionServiceProxy() {
        return this.proxy;
    }

    private void sendMigrationEvent(MigrationInfo migrationInfo, MigrationEvent.MigrationStatus status) {
        MemberImpl current = this.getMember(migrationInfo.getSource());
        MemberImpl newOwner = this.getMember(migrationInfo.getDestination());
        MigrationEvent event = new MigrationEvent(migrationInfo.getPartitionId(), current, newOwner, status);
        InternalEventService eventService = this.nodeEngine.getEventService();
        Collection<EventRegistration> registrations = eventService.getRegistrations("hz:core:partitionService", ".migration");
        eventService.publishEvent("hz:core:partitionService", registrations, (Object)event, event.getPartitionId());
    }

    @Override
    public String addMigrationListener(MigrationListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener can't be null");
        }
        MigrationListenerAdapter adapter = new MigrationListenerAdapter(listener);
        InternalEventService eventService = this.nodeEngine.getEventService();
        EventRegistration registration = eventService.registerListener("hz:core:partitionService", ".migration", adapter);
        return registration.getId();
    }

    @Override
    public boolean removeMigrationListener(String registrationId) {
        if (registrationId == null) {
            throw new NullPointerException("registrationId can't be null");
        }
        InternalEventService eventService = this.nodeEngine.getEventService();
        return eventService.deregisterListener("hz:core:partitionService", ".migration", registrationId);
    }

    @Override
    public String addPartitionLostListener(PartitionLostListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener can't be null");
        }
        PartitionLostListenerAdapter adapter = new PartitionLostListenerAdapter(listener);
        InternalEventService eventService = this.nodeEngine.getEventService();
        EventRegistration registration = eventService.registerListener("hz:core:partitionService", ".partitionLost", adapter);
        return registration.getId();
    }

    @Override
    public String addLocalPartitionLostListener(PartitionLostListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener can't be null");
        }
        PartitionLostListenerAdapter adapter = new PartitionLostListenerAdapter(listener);
        InternalEventService eventService = this.nodeEngine.getEventService();
        EventRegistration registration = eventService.registerLocalListener("hz:core:partitionService", ".partitionLost", adapter);
        return registration.getId();
    }

    @Override
    public boolean removePartitionLostListener(String registrationId) {
        if (registrationId == null) {
            throw new NullPointerException("registrationId can't be null");
        }
        InternalEventService eventService = this.nodeEngine.getEventService();
        return eventService.deregisterListener("hz:core:partitionService", ".partitionLost", registrationId);
    }

    @Override
    public void dispatchEvent(PartitionEvent partitionEvent, PartitionEventListener partitionEventListener) {
        partitionEventListener.onEvent(partitionEvent);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addPartitionListener(PartitionListener listener) {
        this.lock.lock();
        try {
            PartitionListenerNode head = this.partitionListener.listenerHead;
            this.partitionListener.listenerHead = new PartitionListenerNode(listener, head);
        }
        finally {
            this.lock.unlock();
        }
    }

    public String toString() {
        return "PartitionManager[" + this.stateVersion + "] {\n\n" + "migrationQ: " + this.migrationQueue.size() + "\n}";
    }

    public Node getNode() {
        return this.node;
    }

    @Override
    public boolean isPartitionOwner(int partitionId) {
        InternalPartitionImpl partition = this.getPartition(partitionId);
        return this.node.getThisAddress().equals(partition.getOwnerOrNull());
    }

    @Override
    public int getPartitionStateVersion() {
        return this.stateVersion.get();
    }

    @Override
    public void onPartitionLost(InternalPartitionLostEvent event) {
        PartitionLostEvent partitionLostEvent = new PartitionLostEvent(event.getPartitionId(), event.getLostReplicaIndex(), event.getEventSource());
        InternalEventService eventService = this.nodeEngine.getEventService();
        Collection<EventRegistration> registrations = eventService.getRegistrations("hz:core:partitionService", ".partitionLost");
        eventService.publishEvent("hz:core:partitionService", registrations, (Object)partitionLostEvent, event.getPartitionId());
    }

    public List<ReplicaSyncInfo> getOngoingReplicaSyncRequests() {
        int length = this.replicaSyncRequests.length();
        ArrayList<ReplicaSyncInfo> replicaSyncRequestsList = new ArrayList<ReplicaSyncInfo>(length);
        for (int i = 0; i < length; ++i) {
            ReplicaSyncInfo replicaSyncInfo = this.replicaSyncRequests.get(i);
            if (replicaSyncInfo == null) continue;
            replicaSyncRequestsList.add(replicaSyncInfo);
        }
        return replicaSyncRequestsList;
    }

    public List<ScheduledEntry<Integer, ReplicaSyncInfo>> getScheduledReplicaSyncRequests() {
        ArrayList<ScheduledEntry<Integer, ReplicaSyncInfo>> entries = new ArrayList<ScheduledEntry<Integer, ReplicaSyncInfo>>();
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            ScheduledEntry<Integer, ReplicaSyncInfo> entry = this.replicaSyncScheduler.get(partitionId);
            if (entry == null) continue;
            entries.add(entry);
        }
        return entries;
    }

    private static final class PartitionListenerNode {
        final PartitionListener listener;
        final PartitionListenerNode next;

        PartitionListenerNode(PartitionListener listener, PartitionListenerNode next) {
            this.listener = listener;
            this.next = next;
        }
    }

    private static class ReplicaSyncEntryProcessor
    implements ScheduledEntryProcessor<Integer, ReplicaSyncInfo> {
        final InternalPartitionServiceImpl partitionService;

        ReplicaSyncEntryProcessor(InternalPartitionServiceImpl partitionService) {
            this.partitionService = partitionService;
        }

        @Override
        public void process(EntryTaskScheduler<Integer, ReplicaSyncInfo> scheduler, Collection<ScheduledEntry<Integer, ReplicaSyncInfo>> entries) {
            for (ScheduledEntry<Integer, ReplicaSyncInfo> entry : entries) {
                InternalPartitionImpl partition;
                int currentReplicaIndex;
                ReplicaSyncInfo syncInfo = entry.getValue();
                int partitionId = syncInfo.partitionId;
                if (this.partitionService.replicaSyncRequests.compareAndSet(partitionId, syncInfo, null)) {
                    this.partitionService.releaseReplicaSyncPermit();
                }
                if ((currentReplicaIndex = (partition = this.partitionService.getPartitionImpl(partitionId)).getReplicaIndex(this.partitionService.node.getThisAddress())) <= 0) continue;
                this.partitionService.triggerPartitionReplicaSync(partitionId, currentReplicaIndex, 0L);
            }
        }
    }

    private static final class InternalPartitionListener
    implements PartitionListener {
        final Address thisAddress;
        final InternalPartitionServiceImpl partitionService;
        volatile PartitionListenerNode listenerHead;

        private InternalPartitionListener(InternalPartitionServiceImpl partitionService, Address thisAddress) {
            this.thisAddress = thisAddress;
            this.partitionService = partitionService;
        }

        @Override
        public void replicaChanged(PartitionReplicaChangeEvent event) {
            boolean initialAssignment;
            int partitionId = event.getPartitionId();
            int replicaIndex = event.getReplicaIndex();
            Address newAddress = event.getNewAddress();
            Address oldAddress = event.getOldAddress();
            PartitionReplicaChangeReason reason = event.getReason();
            boolean bl = initialAssignment = event.getOldAddress() == null;
            if (replicaIndex > 0) {
                if (this.thisAddress.equals(oldAddress)) {
                    this.clearPartition(partitionId, replicaIndex);
                } else if (this.thisAddress.equals(newAddress)) {
                    this.synchronizePartition(partitionId, replicaIndex, reason, initialAssignment);
                }
            } else {
                if (!initialAssignment && this.thisAddress.equals(newAddress)) {
                    this.promoteFromBackups(partitionId, reason, oldAddress);
                }
                this.partitionService.cancelReplicaSync(partitionId);
            }
            Node node = this.partitionService.node;
            if (replicaIndex == 0 && newAddress == null && node.isRunning() && node.joined()) {
                this.logOwnerOfPartitionIsRemoved(event);
            }
            if (node.isMaster()) {
                this.partitionService.stateVersion.incrementAndGet();
            }
            this.callListeners(event);
        }

        private void callListeners(PartitionReplicaChangeEvent event) {
            PartitionListenerNode listenerNode = this.listenerHead;
            while (listenerNode != null) {
                try {
                    listenerNode.listener.replicaChanged(event);
                }
                catch (Throwable e) {
                    this.partitionService.logger.warning("While calling PartitionListener: " + listenerNode.listener, e);
                }
                listenerNode = listenerNode.next;
            }
        }

        private void clearPartition(int partitionId, int oldReplicaIndex) {
            NodeEngineImpl nodeEngine = this.partitionService.nodeEngine;
            ClearReplicaOperation op = new ClearReplicaOperation(oldReplicaIndex);
            op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setService(this.partitionService);
            nodeEngine.getOperationService().executeOperation(op);
        }

        private void synchronizePartition(int partitionId, int replicaIndex, PartitionReplicaChangeReason reason, boolean initialAssignment) {
            if (this.partitionService.initialized) {
                long delayMillis = 0L;
                if (replicaIndex > 1) {
                    delayMillis = (long)(500.0 + Math.random() * 5000.0);
                }
                this.resetReplicaVersion(partitionId, replicaIndex, reason, initialAssignment);
                this.partitionService.triggerPartitionReplicaSync(partitionId, replicaIndex, delayMillis);
            }
        }

        private void resetReplicaVersion(int partitionId, int replicaIndex, PartitionReplicaChangeReason reason, boolean initialAssignment) {
            NodeEngineImpl nodeEngine = this.partitionService.nodeEngine;
            ResetReplicaVersionOperation op = new ResetReplicaVersionOperation(reason, initialAssignment);
            op.setPartitionId(partitionId).setReplicaIndex(replicaIndex).setNodeEngine(nodeEngine).setService(this.partitionService);
            nodeEngine.getOperationService().executeOperation(op);
        }

        private void promoteFromBackups(int partitionId, PartitionReplicaChangeReason reason, Address oldAddress) {
            NodeEngineImpl nodeEngine = this.partitionService.nodeEngine;
            PromoteFromBackupOperation op = new PromoteFromBackupOperation(reason, oldAddress);
            op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setService(this.partitionService);
            nodeEngine.getOperationService().executeOperation(op);
        }

        private void logOwnerOfPartitionIsRemoved(PartitionReplicaChangeEvent event) {
            String warning = "Owner of partition is being removed! Possible data loss for partitionId=" + event.getPartitionId() + " , " + event;
            this.partitionService.logger.warning(warning);
        }
    }

    private class MigrationThread
    extends Thread
    implements Runnable {
        private final long sleepTime;

        MigrationThread(Node node) {
            super(node.getHazelcastThreadGroup().getInternalThreadGroup(), node.getHazelcastThreadGroup().getThreadNamePrefix("migration"));
            this.sleepTime = Math.max(250L, InternalPartitionServiceImpl.this.partitionMigrationInterval);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (!this.isInterrupted()) {
                    this.doRun();
                }
            }
            catch (InterruptedException e) {
                if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                    InternalPartitionServiceImpl.this.logger.finest("MigrationThread is interrupted: " + e.getMessage());
                }
            }
            catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory(e);
            }
            finally {
                InternalPartitionServiceImpl.this.migrationQueue.clear();
            }
        }

        private void doRun() throws InterruptedException {
            boolean hasNoTasks;
            Runnable r;
            boolean migrating = false;
            while (InternalPartitionServiceImpl.this.isMigrationAllowed() && (r = InternalPartitionServiceImpl.this.migrationQueue.poll(1, TimeUnit.SECONDS)) != null) {
                migrating |= r instanceof MigrateTask;
                this.processTask(r);
                if (InternalPartitionServiceImpl.this.partitionMigrationInterval <= 0L) continue;
                Thread.sleep(InternalPartitionServiceImpl.this.partitionMigrationInterval);
            }
            boolean bl = hasNoTasks = !InternalPartitionServiceImpl.this.migrationQueue.hasMigrationTasks();
            if (hasNoTasks) {
                if (migrating) {
                    InternalPartitionServiceImpl.this.logger.info("All migration tasks have been completed, queues are empty.");
                }
                InternalPartitionServiceImpl.this.evictCompletedMigrations();
                Thread.sleep(this.sleepTime);
            } else if (!InternalPartitionServiceImpl.this.isMigrationAllowed()) {
                Thread.sleep(this.sleepTime);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        boolean processTask(Runnable r) {
            try {
                if (r == null || this.isInterrupted()) {
                    boolean bl = false;
                    return bl;
                }
                r.run();
            }
            catch (Throwable t) {
                InternalPartitionServiceImpl.this.logger.warning(t);
            }
            finally {
                InternalPartitionServiceImpl.this.migrationQueue.afterTaskCompletion(r);
            }
            return true;
        }

        void stopNow() {
            InternalPartitionServiceImpl.this.migrationQueue.clear();
            this.interrupt();
        }
    }

    class MigrateTask
    implements Runnable {
        final MigrationInfo migrationInfo;
        final Address[] addresses;

        public MigrateTask(MigrationInfo migrationInfo, Address[] addresses) {
            this.migrationInfo = migrationInfo;
            this.addresses = addresses;
            MemberImpl masterMember = InternalPartitionServiceImpl.this.getMasterMember();
            if (masterMember != null) {
                migrationInfo.setMasterUuid(masterMember.getUuid());
                migrationInfo.setMaster(masterMember.getAddress());
            }
        }

        @Override
        public void run() {
            if (!InternalPartitionServiceImpl.this.node.isMaster()) {
                return;
            }
            MigrationRequestOperation migrationRequestOp = new MigrationRequestOperation(this.migrationInfo);
            try {
                Boolean result;
                MigrationInfo info = this.migrationInfo;
                InternalPartitionImpl partition = InternalPartitionServiceImpl.this.partitions[info.getPartitionId()];
                Address owner = partition.getOwnerOrNull();
                if (owner == null) {
                    InternalPartitionServiceImpl.this.logger.severe("ERROR: partition owner is not set! -> partitionId=" + info.getPartitionId() + " , " + partition + " -VS- " + info);
                    return;
                }
                if (!owner.equals(info.getSource())) {
                    InternalPartitionServiceImpl.this.logger.severe("ERROR: partition owner is not the source of migration! -> partitionId=" + info.getPartitionId() + " , " + partition + " -VS- " + info + " found owner=" + owner);
                    return;
                }
                InternalPartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.STARTED);
                MemberImpl fromMember = InternalPartitionServiceImpl.this.getMember(this.migrationInfo.getSource());
                if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                    InternalPartitionServiceImpl.this.logger.finest("Starting Migration: " + this.migrationInfo);
                }
                if (fromMember == null) {
                    InternalPartitionServiceImpl.this.logger.warning("Partition is lost! Assign new owner and exit... partitionId=" + info.getPartitionId());
                    result = Boolean.TRUE;
                } else {
                    result = this.executeMigrateOperation(migrationRequestOp, fromMember);
                }
                this.processMigrationResult(result);
            }
            catch (Throwable t) {
                Level level = this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                InternalPartitionServiceImpl.this.logger.log(level, "Error [" + t.getClass() + ": " + t.getMessage() + "] while executing " + migrationRequestOp);
                InternalPartitionServiceImpl.this.logger.finest(t);
                this.migrationOperationFailed();
            }
        }

        private void processMigrationResult(Boolean result) {
            if (Boolean.TRUE.equals(result)) {
                if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                    InternalPartitionServiceImpl.this.logger.finest("Finished Migration: " + this.migrationInfo);
                }
                this.migrationOperationSucceeded();
            } else {
                Level level = this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                InternalPartitionServiceImpl.this.logger.log(level, "Migration failed: " + this.migrationInfo);
                this.migrationOperationFailed();
            }
        }

        private Boolean executeMigrateOperation(MigrationRequestOperation migrationRequestOp, MemberImpl fromMember) {
            InternalCompletableFuture future = InternalPartitionServiceImpl.this.nodeEngine.getOperationService().createInvocationBuilder("hz:core:partitionService", (Operation)migrationRequestOp, this.migrationInfo.getSource()).setCallTimeout(InternalPartitionServiceImpl.this.partitionMigrationTimeout).setTryPauseMillis(1000L).invoke();
            try {
                Object response = future.get();
                return (Boolean)InternalPartitionServiceImpl.this.nodeEngine.toObject(response);
            }
            catch (Throwable e) {
                Level level = InternalPartitionServiceImpl.this.nodeEngine.isRunning() && this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                InternalPartitionServiceImpl.this.logger.log(level, "Failed migration from " + fromMember + " for " + migrationRequestOp.getMigrationInfo(), e);
                return Boolean.FALSE;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void migrationOperationFailed() {
            InternalPartitionServiceImpl.this.lock.lock();
            try {
                InternalPartitionServiceImpl.this.addCompletedMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.finalizeActiveMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.publishPartitionRuntimeState();
            }
            finally {
                InternalPartitionServiceImpl.this.lock.unlock();
            }
            InternalPartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.FAILED);
            InternalPartitionServiceImpl.this.pauseMigration();
            InternalPartitionServiceImpl.this.migrationQueue.add(new RepartitioningTask());
            InternalPartitionServiceImpl.this.resumeMigrationEventually();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void migrationOperationSucceeded() {
            InternalPartitionServiceImpl.this.lock.lock();
            try {
                int partitionId = this.migrationInfo.getPartitionId();
                InternalPartitionImpl partition = InternalPartitionServiceImpl.this.partitions[partitionId];
                partition.setReplicaAddresses(this.addresses);
                InternalPartitionServiceImpl.this.addCompletedMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.finalizeActiveMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.syncPartitionRuntimeState();
            }
            finally {
                InternalPartitionServiceImpl.this.lock.unlock();
            }
            InternalPartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.COMPLETED);
        }

        public String toString() {
            return this.getClass().getSimpleName() + "{" + "migrationInfo=" + this.migrationInfo + '}';
        }
    }

    private class RepartitioningTask
    implements Runnable {
        private RepartitioningTask() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (!InternalPartitionServiceImpl.this.node.isMaster()) {
                return;
            }
            InternalPartitionServiceImpl.this.lock.lock();
            try {
                if (!InternalPartitionServiceImpl.this.initialized) {
                    return;
                }
                if (!this.isMigrationAllowed()) {
                    return;
                }
                InternalPartitionServiceImpl.this.migrationQueue.clear();
                PartitionStateGenerator psg = InternalPartitionServiceImpl.this.partitionStateGenerator;
                Collection memberGroups = InternalPartitionServiceImpl.this.createMemberGroups();
                Address[][] newState = psg.reArrange(memberGroups, InternalPartitionServiceImpl.this.partitions);
                if (newState == null) {
                    if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                        InternalPartitionServiceImpl.this.logger.finest("partition rearrangement couldn't be done. size of member groups: " + memberGroups.size());
                    }
                    return;
                }
                if (!this.isMigrationAllowed()) {
                    return;
                }
                this.processNewPartitionState(newState);
                InternalPartitionServiceImpl.this.syncPartitionRuntimeState();
            }
            finally {
                InternalPartitionServiceImpl.this.lock.unlock();
            }
        }

        private void processNewPartitionState(Address[][] newState) {
            int migrationCount = 0;
            int lostCount = 0;
            InternalPartitionServiceImpl.this.lastRepartitionTime.set(Clock.currentTimeMillis());
            for (int partitionId = 0; partitionId < InternalPartitionServiceImpl.this.partitionCount; ++partitionId) {
                Object[] replicas = newState[partitionId];
                InternalPartitionImpl currentPartition = InternalPartitionServiceImpl.this.partitions[partitionId];
                Address currentOwner = currentPartition.getOwnerOrNull();
                Address newOwner = replicas[0];
                if (currentOwner == null) {
                    ++lostCount;
                    this.assignNewPartitionOwner(partitionId, (Address[])replicas, currentPartition, newOwner);
                    continue;
                }
                if (newOwner != null && !currentOwner.equals(newOwner)) {
                    if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                        InternalPartitionServiceImpl.this.logger.finest("PartitionToMigrate partitionId=" + partitionId + " replicas=" + Arrays.toString(replicas) + " currentOwner=" + currentOwner + " newOwner=" + newOwner);
                    }
                    ++migrationCount;
                    this.migratePartitionToNewOwner(partitionId, (Address[])replicas, currentOwner, newOwner);
                    continue;
                }
                currentPartition.setReplicaAddresses((Address[])replicas);
            }
            this.logMigrationStatistics(migrationCount, lostCount);
        }

        private void logMigrationStatistics(int migrationCount, int lostCount) {
            if (lostCount > 0) {
                InternalPartitionServiceImpl.this.logger.warning("Assigning new owners for " + lostCount + " LOST partitions!");
            }
            if (migrationCount > 0) {
                InternalPartitionServiceImpl.this.logger.info("Re-partitioning cluster data... Migration queue size: " + migrationCount);
            } else {
                InternalPartitionServiceImpl.this.logger.info("Partition balance is ok, no need to re-partition cluster data... ");
            }
        }

        private void migratePartitionToNewOwner(int partitionId, Address[] replicas, Address currentOwner, Address newOwner) {
            MigrationInfo info = new MigrationInfo(partitionId, currentOwner, newOwner);
            MigrateTask migrateTask = new MigrateTask(info, replicas);
            InternalPartitionServiceImpl.this.migrationQueue.add(migrateTask);
        }

        private void assignNewPartitionOwner(int partitionId, Address[] replicas, InternalPartitionImpl currentPartition, Address newOwner) {
            currentPartition.setReplicaAddresses(replicas);
            MigrationInfo migrationInfo = new MigrationInfo(partitionId, null, newOwner);
            InternalPartitionServiceImpl.this.sendMigrationEvent(migrationInfo, MigrationEvent.MigrationStatus.STARTED);
            InternalPartitionServiceImpl.this.sendMigrationEvent(migrationInfo, MigrationEvent.MigrationStatus.COMPLETED);
        }

        private boolean isMigrationAllowed() {
            if (InternalPartitionServiceImpl.this.isMigrationAllowed()) {
                return true;
            }
            InternalPartitionServiceImpl.this.migrationQueue.add(this);
            return false;
        }
    }

    private class SyncReplicaVersionTask
    implements Runnable {
        private SyncReplicaVersionTask() {
        }

        @Override
        public void run() {
            if (((InternalPartitionServiceImpl)InternalPartitionServiceImpl.this).node.nodeEngine.isRunning() && InternalPartitionServiceImpl.this.isReplicaSyncAllowed()) {
                for (InternalPartitionImpl partition : InternalPartitionServiceImpl.this.partitions) {
                    if (!partition.isLocal()) continue;
                    for (int index = 1; index < 7; ++index) {
                        if (partition.getReplicaAddress(index) == null) continue;
                        SyncReplicaVersion op = new SyncReplicaVersion(index, null);
                        op.setService(InternalPartitionServiceImpl.this);
                        op.setNodeEngine(InternalPartitionServiceImpl.this.nodeEngine);
                        op.setOperationResponseHandler(OperationResponseHandlerFactory.createErrorLoggingResponseHandler(InternalPartitionServiceImpl.this.node.getLogger(SyncReplicaVersion.class)));
                        op.setPartitionId(partition.getPartitionId());
                        InternalPartitionServiceImpl.this.nodeEngine.getOperationService().executeOperation(op);
                    }
                }
            }
        }
    }

    private class SendPartitionRuntimeStateTask
    implements Runnable {
        private SendPartitionRuntimeStateTask() {
        }

        @Override
        public void run() {
            if (InternalPartitionServiceImpl.this.node.isMaster() && InternalPartitionServiceImpl.this.node.getState() == NodeState.ACTIVE) {
                if (InternalPartitionServiceImpl.this.migrationQueue.isNonEmpty() && InternalPartitionServiceImpl.this.isMigrationAllowed()) {
                    InternalPartitionServiceImpl.this.logger.info("Remaining migration tasks in queue => " + InternalPartitionServiceImpl.this.migrationQueue.size());
                }
                InternalPartitionServiceImpl.this.publishPartitionRuntimeState();
            }
        }
    }
}

