/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.partition;

import com.hazelcast.config.PartitionGroupConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.MigrationEvent;
import com.hazelcast.core.MigrationListener;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.SystemLogService;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.ConfigMemberGroupFactory;
import com.hazelcast.partition.FinalizeMigrationOperation;
import com.hazelcast.partition.HasOngoingMigration;
import com.hazelcast.partition.HostAwareMemberGroupFactory;
import com.hazelcast.partition.MemberGroup;
import com.hazelcast.partition.MemberGroupFactory;
import com.hazelcast.partition.MigrationEndpoint;
import com.hazelcast.partition.MigrationInfo;
import com.hazelcast.partition.MigrationRequestOperation;
import com.hazelcast.partition.PartitionImpl;
import com.hazelcast.partition.PartitionListener;
import com.hazelcast.partition.PartitionReplicaChangeEvent;
import com.hazelcast.partition.PartitionRuntimeState;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.partition.PartitionServiceProxy;
import com.hazelcast.partition.PartitionStateGenerator;
import com.hazelcast.partition.PartitionStateGeneratorImpl;
import com.hazelcast.partition.PartitionStateOperation;
import com.hazelcast.partition.PartitionView;
import com.hazelcast.partition.Partitions;
import com.hazelcast.partition.ReplicaSyncRequest;
import com.hazelcast.partition.SingleMemberGroupFactory;
import com.hazelcast.partition.SyncReplicaVersion;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.Callback;
import com.hazelcast.spi.EventPublishingService;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.Invocation;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.MigrationAwareService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.annotation.PrivateApi;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.ResponseHandlerFactory;
import com.hazelcast.util.Clock;
import com.hazelcast.util.scheduler.EntryTaskScheduler;
import com.hazelcast.util.scheduler.EntryTaskSchedulerFactory;
import com.hazelcast.util.scheduler.ScheduledEntry;
import com.hazelcast.util.scheduler.ScheduledEntryProcessor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;

public class PartitionServiceImpl
implements PartitionService,
ManagedService,
EventPublishingService<MigrationEvent, MigrationListener> {
    public static final String SERVICE_NAME = "hz:core:partitionService";
    private static final long REPARTITIONING_CHECK_INTERVAL = TimeUnit.SECONDS.toMillis(600L);
    private final Node node;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final int partitionCount;
    private final PartitionImpl[] partitions;
    private final PartitionReplicaVersions[] replicaVersions;
    private final ConcurrentMap<Integer, ReplicaSyncInfo> replicaSyncRequests;
    private final EntryTaskScheduler<Integer, ReplicaSyncInfo> replicaSyncScheduler;
    private final AtomicInteger replicaSyncProcessCount = new AtomicInteger();
    private final MigrationThread migrationThread;
    private final long partitionMigrationInterval;
    private final long partitionMigrationTimeout;
    private final PartitionStateGenerator partitionStateGenerator;
    private final MemberGroupFactory memberGroupFactory;
    private final PartitionServiceProxy proxy;
    private final Lock lock = new ReentrantLock();
    private final AtomicInteger stateVersion = new AtomicInteger();
    private final BlockingQueue<Runnable> migrationQueue = new LinkedBlockingQueue<Runnable>();
    private final AtomicBoolean migrationActive = new AtomicBoolean(true);
    private final AtomicLong lastRepartitionTime = new AtomicLong();
    private final SystemLogService systemLogService;
    private volatile int memberGroupsSize;
    private volatile boolean initialized = false;
    private final ConcurrentMap<Integer, MigrationInfo> activeMigrations = new ConcurrentHashMap<Integer, MigrationInfo>(3, 0.75f, 1);
    private final LinkedList<MigrationInfo> completedMigrations = new LinkedList();

    public PartitionServiceImpl(Node node) {
        int i;
        this.partitionCount = node.groupProperties.PARTITION_COUNT.getInteger();
        this.node = node;
        this.nodeEngine = node.nodeEngine;
        this.logger = node.getLogger(PartitionService.class);
        this.systemLogService = node.getSystemLogService();
        this.partitions = new PartitionImpl[this.partitionCount];
        LocalPartitionListener partitionListener = new LocalPartitionListener(node.getThisAddress());
        for (i = 0; i < this.partitionCount; ++i) {
            this.partitions[i] = new PartitionImpl(i, partitionListener);
        }
        this.replicaVersions = new PartitionReplicaVersions[this.partitionCount];
        for (i = 0; i < this.replicaVersions.length; ++i) {
            this.replicaVersions[i] = new PartitionReplicaVersions(i);
        }
        this.memberGroupFactory = PartitionServiceImpl.newMemberGroupFactory(node.getConfig().getPartitionGroupConfig());
        this.partitionStateGenerator = new PartitionStateGeneratorImpl();
        this.partitionMigrationInterval = node.groupProperties.PARTITION_MIGRATION_INTERVAL.getLong() * 1000L;
        this.partitionMigrationTimeout = (long)((float)node.groupProperties.PARTITION_MIGRATION_TIMEOUT.getLong() * 1.5f);
        this.migrationThread = new MigrationThread(node);
        this.proxy = new PartitionServiceProxy(this);
        this.replicaSyncRequests = new ConcurrentHashMap<Integer, ReplicaSyncInfo>(this.partitionCount);
        this.replicaSyncScheduler = EntryTaskSchedulerFactory.newScheduler(this.nodeEngine.getExecutionService().getScheduledExecutor(), new ReplicaSyncEntryProcessor(), false);
        this.nodeEngine.getExecutionService().scheduleWithFixedDelay(new SyncReplicaVersionTask(), 30L, 30L, TimeUnit.SECONDS);
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.migrationThread.start();
        int partitionTableSendInterval = this.node.groupProperties.PARTITION_TABLE_SEND_INTERVAL.getInteger();
        if (partitionTableSendInterval <= 0) {
            partitionTableSendInterval = 1;
        }
        nodeEngine.getExecutionService().scheduleAtFixedRate(new SendClusterStateTask(), partitionTableSendInterval, partitionTableSendInterval, TimeUnit.SECONDS);
    }

    @Override
    public Address getPartitionOwner(int partitionId) {
        if (!this.initialized) {
            this.firstArrangement();
        }
        if (this.partitions[partitionId].getOwner() == null && !this.node.isMaster() && this.node.joined()) {
            this.notifyMasterToAssignPartitions();
        }
        return this.partitions[partitionId].getOwner();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyMasterToAssignPartitions() {
        if (this.lock.tryLock()) {
            try {
                if (!this.initialized && !this.node.isMaster() && this.node.getMasterAddress() != null && this.node.joined()) {
                    Future f = this.nodeEngine.getOperationService().createInvocationBuilder(SERVICE_NAME, (Operation)new AssignPartitions(), this.node.getMasterAddress()).setTryCount(1).build().invoke();
                    f.get(1L, TimeUnit.SECONDS);
                }
            }
            catch (Exception e) {
                this.logger.finest(e);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void firstArrangement() {
        if (!this.node.isMaster() || !this.node.isActive()) {
            return;
        }
        if (!this.initialized) {
            this.lock.lock();
            try {
                if (this.initialized) {
                    return;
                }
                PartitionStateGenerator psg = this.partitionStateGenerator;
                this.logger.info("Initializing cluster partition table first arrangement...");
                Collection members = this.node.getClusterService().getMembers();
                PartitionImpl[] newState = psg.initialize(this.memberGroupFactory.createMemberGroups(members), this.partitionCount);
                if (newState != null) {
                    for (PartitionImpl partition : newState) {
                        this.partitions[partition.getPartitionId()].setPartitionInfo(partition);
                    }
                    this.initialized = true;
                    this.sendPartitionRuntimeState();
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    private void updateMemberGroupsSize() {
        Collection<MemberGroup> groups = this.memberGroupFactory.createMemberGroups(this.node.getClusterService().getMembers());
        this.memberGroupsSize = groups.size();
    }

    @Override
    public int getMemberGroupsSize() {
        int size = this.memberGroupsSize;
        return size > 0 ? size : 1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void memberAdded(MemberImpl member) {
        if (!member.localMember()) {
            this.updateMemberGroupsSize();
        }
        if (this.node.isMaster() && this.node.isActive()) {
            this.lock.lock();
            try {
                this.clearMigrationQueue();
                this.migrationQueue.offer(new RepartitioningTask());
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void memberRemoved(MemberImpl member) {
        this.updateMemberGroupsSize();
        Address deadAddress = member.getAddress();
        Address thisAddress = this.node.getThisAddress();
        if (deadAddress == null || deadAddress.equals(thisAddress)) {
            return;
        }
        this.lock.lock();
        try {
            this.clearMigrationQueue();
            if (!this.activeMigrations.isEmpty()) {
                if (this.node.isMaster()) {
                    this.rollbackActiveMigrationsFromPreviousMaster(this.node.getLocalMember().getUuid());
                }
                for (MigrationInfo migrationInfo : this.activeMigrations.values()) {
                    if (!deadAddress.equals(migrationInfo.getSource()) && !deadAddress.equals(migrationInfo.getDestination())) continue;
                    migrationInfo.invalidate();
                }
            }
            this.migrationActive.set(false);
            for (PartitionImpl partition : this.partitions) {
                while (partition.onDeadAddress(deadAddress)) {
                }
            }
            if (this.node.isMaster()) {
                this.migrationQueue.offer(new RepartitioningTask());
            }
            long waitBeforeMigrationActivate = this.node.groupProperties.CONNECTION_MONITOR_INTERVAL.getLong() * (long)this.node.groupProperties.CONNECTION_MONITOR_MAX_FAULTS.getInteger() * 10L;
            this.nodeEngine.getExecutionService().schedule(new Runnable(){

                @Override
                public void run() {
                    PartitionServiceImpl.this.migrationActive.set(true);
                }
            }, waitBeforeMigrationActivate, TimeUnit.MILLISECONDS);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rollbackActiveMigrationsFromPreviousMaster(String currentMasterUuid) {
        this.lock.lock();
        try {
            if (!this.activeMigrations.isEmpty()) {
                for (MigrationInfo migrationInfo : this.activeMigrations.values()) {
                    if (currentMasterUuid.equals(migrationInfo.getMasterUuid())) continue;
                    this.logger.info("Rolling-back migration instantiated by the old master -> " + migrationInfo);
                    this.finalizeActiveMigration(migrationInfo);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendPartitionRuntimeState() {
        if (!this.initialized) {
            return;
        }
        if (!(this.node.isMaster() && this.node.isActive() && this.node.joined())) {
            return;
        }
        if (!this.migrationActive.get()) {
            return;
        }
        Collection<MemberImpl> members = this.node.clusterService.getMemberList();
        this.lock.lock();
        try {
            long clusterTime = this.node.getClusterService().getClusterTime();
            PartitionStateOperation op = new PartitionStateOperation(members, this.getPartitions(), new ArrayList<MigrationInfo>(this.completedMigrations), clusterTime, this.stateVersion.get());
            for (MemberImpl member : members) {
                if (member.localMember()) continue;
                try {
                    this.nodeEngine.getOperationService().send((Operation)op, member.getAddress());
                }
                catch (Exception e) {
                    this.logger.finest(e);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void processPartitionRuntimeState(PartitionRuntimeState partitionState) {
        this.lock.lock();
        try {
            PartitionView[] newPartitions;
            if (!this.node.isActive() || !this.node.joined()) {
                return;
            }
            Address sender = partitionState.getEndpoint();
            Address master = this.node.getMasterAddress();
            if (this.node.isMaster()) {
                this.logger.warning("This is the master node and received a PartitionRuntimeState from " + sender + ". Ignoring incoming state! ");
                return;
            }
            if (sender == null || !sender.equals(master)) {
                if (this.node.clusterService.getMember(sender) == null) {
                    this.logger.severe("Received a ClusterRuntimeState from an unknown member! => Sender: " + sender + ", Master: " + master + "! ");
                    return;
                }
                this.logger.warning("Received a ClusterRuntimeState, but its sender doesn't seem master! => Sender: " + sender + ", Master: " + master + "! " + "(Ignore if master node has changed recently.)");
            }
            HashSet<Address> unknownAddresses = new HashSet<Address>();
            for (PartitionView newPartition : newPartitions = partitionState.getPartitions()) {
                PartitionImpl currentPartition = this.partitions[newPartition.getPartitionId()];
                for (int index = 0; index < 7; ++index) {
                    Address address = newPartition.getReplicaAddress(index);
                    if (address == null || this.getMember(address) != null) continue;
                    if (this.logger.isFinestEnabled()) {
                        this.logger.finest("Unknown " + address + " is found in partition table sent from master " + sender + ". Probably it's already left the cluster. Partition: " + newPartition);
                    }
                    unknownAddresses.add(address);
                }
                currentPartition.setOwner(newPartition.getOwner());
            }
            if (!unknownAddresses.isEmpty() && this.logger.isLoggable(Level.WARNING)) {
                StringBuilder s = new StringBuilder("Following unknown addresses are found in partition table").append(" sent from master[").append(sender).append("].").append(" (Probably they have recently joined to or left the cluster.)").append(" {");
                for (Address address : unknownAddresses) {
                    s.append("\n\t").append(address);
                }
                s.append("\n}");
                this.logger.warning(s.toString());
            }
            Collection<MigrationInfo> completedMigrations = partitionState.getCompletedMigrations();
            for (MigrationInfo completedMigration : completedMigrations) {
                this.addCompletedMigration(completedMigration);
                this.finalizeActiveMigration(completedMigration);
            }
            if (!this.activeMigrations.isEmpty()) {
                MemberImpl masterMember = this.getMasterMember();
                this.rollbackActiveMigrationsFromPreviousMaster(masterMember.getUuid());
            }
            for (PartitionView newPartition : newPartitions) {
                PartitionImpl currentPartition = this.partitions[newPartition.getPartitionId()];
                currentPartition.setPartitionInfo(newPartition);
            }
            this.stateVersion.set(partitionState.getVersion());
            this.initialized = true;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finalizeActiveMigration(final MigrationInfo migrationInfo) {
        block11: {
            if (this.activeMigrations.containsKey(migrationInfo.getPartitionId())) {
                this.lock.lock();
                try {
                    if (!this.activeMigrations.containsValue(migrationInfo)) break block11;
                    if (migrationInfo.startProcessing()) {
                        try {
                            Address thisAddress = this.node.getThisAddress();
                            boolean source = thisAddress.equals(migrationInfo.getSource());
                            boolean destination = thisAddress.equals(migrationInfo.getDestination());
                            if (source || destination) {
                                int partitionId = migrationInfo.getPartitionId();
                                PartitionImpl migratingPartition = this.getPartitionImpl(partitionId);
                                Address ownerAddress = migratingPartition.getOwner();
                                boolean success = migrationInfo.getDestination().equals(ownerAddress);
                                MigrationEndpoint endpoint = source ? MigrationEndpoint.SOURCE : MigrationEndpoint.DESTINATION;
                                FinalizeMigrationOperation op = new FinalizeMigrationOperation(endpoint, success);
                                op.setPartitionId(partitionId).setNodeEngine(this.nodeEngine).setValidateTarget(false).setService(this);
                                this.nodeEngine.getOperationService().executeOperation(op);
                            }
                            break block11;
                        }
                        catch (Exception e) {
                            this.logger.warning(e);
                            break block11;
                        }
                        finally {
                            migrationInfo.doneProcessing();
                        }
                    }
                    this.logger.info("Scheduling finalization of " + migrationInfo + ", because migration process is currently running.");
                    this.nodeEngine.getExecutionService().schedule(new Runnable(){

                        @Override
                        public void run() {
                            PartitionServiceImpl.this.finalizeActiveMigration(migrationInfo);
                        }
                    }, 3L, TimeUnit.SECONDS);
                }
                finally {
                    this.lock.unlock();
                }
            }
        }
    }

    @Override
    public boolean isPartitionMigrating(int partitionId) {
        return this.activeMigrations.containsKey(partitionId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addActiveMigration(MigrationInfo migrationInfo) {
        this.lock.lock();
        try {
            int partitionId = migrationInfo.getPartitionId();
            MigrationInfo currentMigrationInfo = this.activeMigrations.putIfAbsent(partitionId, migrationInfo);
            if (currentMigrationInfo != null) {
                MigrationInfo newMigration;
                MigrationInfo oldMigration;
                boolean oldMaster = false;
                MemberImpl masterMember = this.getMasterMember();
                String master = masterMember.getUuid();
                if (!master.equals(currentMigrationInfo.getMasterUuid())) {
                    oldMigration = currentMigrationInfo;
                    newMigration = migrationInfo;
                    oldMaster = true;
                } else if (!master.equals(migrationInfo.getMasterUuid())) {
                    oldMigration = migrationInfo;
                    newMigration = currentMigrationInfo;
                    oldMaster = true;
                } else if (!currentMigrationInfo.isProcessing() && migrationInfo.isProcessing()) {
                    oldMigration = currentMigrationInfo;
                    newMigration = migrationInfo;
                } else {
                    String message = "Something is seriously wrong! There are two migration requests for the same partition! First-> " + currentMigrationInfo + ", Second -> " + migrationInfo;
                    IllegalStateException error = new IllegalStateException(message);
                    this.logger.severe(message, error);
                    throw error;
                }
                if (oldMaster) {
                    this.logger.info("Finalizing migration instantiated by the old master -> " + oldMigration);
                } else {
                    this.logger.finest("Finalizing previous migration -> " + oldMigration);
                }
                this.finalizeActiveMigration(oldMigration);
                this.activeMigrations.put(partitionId, newMigration);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private MemberImpl getMasterMember() {
        return this.node.clusterService.getMember(this.node.getMasterAddress());
    }

    MigrationInfo getActiveMigration(int partitionId) {
        return (MigrationInfo)this.activeMigrations.get(partitionId);
    }

    MigrationInfo removeActiveMigration(int partitionId) {
        return (MigrationInfo)this.activeMigrations.remove(partitionId);
    }

    public Collection<MigrationInfo> getActiveMigrations() {
        return Collections.unmodifiableCollection(this.activeMigrations.values());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addCompletedMigration(MigrationInfo migrationInfo) {
        this.lock.lock();
        try {
            if (this.completedMigrations.size() > 25) {
                this.completedMigrations.removeFirst();
            }
            this.completedMigrations.add(migrationInfo);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void evictCompletedMigrations() {
        this.lock.lock();
        try {
            if (!this.completedMigrations.isEmpty()) {
                this.completedMigrations.removeFirst();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void clearPartitionReplica(final int partitionId, int replicaIndex) {
        this.nodeEngine.getExecutionService().execute("hz:system", new Runnable(){

            @Override
            public void run() {
                Collection<MigrationAwareService> services = PartitionServiceImpl.this.nodeEngine.getServices(MigrationAwareService.class);
                for (MigrationAwareService service : services) {
                    service.clearPartitionReplica(partitionId);
                }
            }
        });
    }

    @PrivateApi
    void syncPartitionReplica(int partitionId, int replicaIndex, boolean force) {
        if (replicaIndex < 0 || replicaIndex > 7) {
            throw new IllegalArgumentException("Invalid replica index: " + replicaIndex);
        }
        PartitionImpl partitionImpl = this.getPartition(partitionId);
        Address target = partitionImpl.getOwner();
        if (target != null) {
            ReplicaSyncRequest syncRequest = new ReplicaSyncRequest();
            syncRequest.setPartitionId(partitionId).setReplicaIndex(replicaIndex);
            ReplicaSyncInfo currentSyncInfo = (ReplicaSyncInfo)this.replicaSyncRequests.get(partitionId);
            ReplicaSyncInfo syncInfo = new ReplicaSyncInfo(partitionId, replicaIndex, target);
            boolean sendRequest = false;
            if (currentSyncInfo == null) {
                sendRequest = this.replicaSyncRequests.putIfAbsent(partitionId, syncInfo) == null;
            } else if (currentSyncInfo.requestTime < Clock.currentTimeMillis() - 10000L || this.nodeEngine.getClusterService().getMember(currentSyncInfo.target) == null) {
                sendRequest = this.replicaSyncRequests.replace(partitionId, currentSyncInfo, syncInfo);
            } else if (force) {
                this.replicaSyncRequests.put(partitionId, syncInfo);
                sendRequest = true;
            }
            if (target.equals(this.nodeEngine.getThisAddress())) {
                throw new IllegalStateException("Replica target cannot be this node -> partitionId: " + partitionId + ", replicaIndex: " + replicaIndex + ", partition-info: " + partitionImpl);
            }
            if (sendRequest) {
                Level level = Level.FINEST;
                if (this.logger.isLoggable(level)) {
                    this.logger.log(level, "Sending sync replica request to -> " + target + "; for partition: " + partitionId + ", replica: " + replicaIndex);
                }
                this.replicaSyncScheduler.schedule(15000L, partitionId, syncInfo);
                this.nodeEngine.getOperationService().send((Operation)syncRequest, target);
            }
        } else {
            this.logger.warning("Sync replica target is null, no need to sync -> partition: " + partitionId + ", replica: " + replicaIndex);
        }
    }

    @PrivateApi
    public Partitions getPartitions() {
        return new Partitions(this.partitions);
    }

    MemberImpl getMember(Address address) {
        return this.node.clusterService.getMember(address);
    }

    @Override
    public int getStateVersion() {
        return this.stateVersion.get();
    }

    private PartitionImpl getPartitionImpl(int partitionId) {
        return this.partitions[partitionId];
    }

    @Override
    public PartitionImpl getPartition(int partitionId) {
        PartitionImpl p = this.getPartitionImpl(partitionId);
        if (p.getOwner() == null) {
            this.getPartitionOwner(partitionId);
        }
        return p;
    }

    @PrivateApi
    public boolean prepareToSafeShutdown(long timeout, TimeUnit unit) {
        int sleep = 500;
        for (long timeoutInMillis = unit.toMillis(timeout); timeoutInMillis > 0L; timeoutInMillis -= (long)sleep) {
            while (timeoutInMillis > 0L && this.shouldWaitMigrationOrBackups(Level.INFO)) {
                try {
                    Thread.sleep(sleep);
                }
                catch (InterruptedException ignored) {
                    // empty catch block
                }
                timeoutInMillis -= (long)sleep;
            }
            if (timeoutInMillis < 0L) {
                return false;
            }
            if (!this.node.isMaster()) {
                while (timeoutInMillis > 0L && this.hasOnGoingMigrationMaster()) {
                    this.logger.info("Waiting for the master node to complete remaining migrations!");
                    try {
                        Thread.sleep(sleep);
                    }
                    catch (InterruptedException ignored) {
                        // empty catch block
                    }
                    timeoutInMillis -= (long)sleep;
                }
                if (timeoutInMillis < 0L) {
                    return false;
                }
            }
            long start = Clock.currentTimeMillis();
            boolean ok = this.checkReplicaSyncState();
            timeoutInMillis -= Clock.currentTimeMillis() - start;
            if (ok) {
                this.logger.finest("Replica sync state before shutdown is OK");
                return true;
            }
            if (timeoutInMillis < 0L) {
                return false;
            }
            this.logger.info("Backup replica versions inconsistent, waiting for synchronization..");
            try {
                Thread.sleep(sleep);
                continue;
            }
            catch (InterruptedException ignored) {
                // empty catch block
            }
        }
        return false;
    }

    @Override
    public boolean hasOnGoingMigration() {
        return this.hasOnGoingMigrationLocal() || !this.node.isMaster() && this.hasOnGoingMigrationMaster();
    }

    private boolean hasOnGoingMigrationMaster() {
        HasOngoingMigration op = new HasOngoingMigration();
        Invocation inv = this.nodeEngine.getOperationService().createInvocationBuilder(SERVICE_NAME, (Operation)op, this.node.getMasterAddress()).setTryCount(100).setTryPauseMillis(100L).build();
        Future f = inv.invoke();
        try {
            return (Boolean)f.get(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException ignored) {
        }
        catch (Exception e) {
            this.logger.warning("Could not get a response from master about migrations! -> " + e.toString());
        }
        return false;
    }

    boolean hasOnGoingMigrationLocal() {
        return !this.activeMigrations.isEmpty() || !this.migrationQueue.isEmpty() || this.shouldWaitMigrationOrBackups(Level.OFF);
    }

    private boolean checkReplicaSyncState() {
        Address thisAddress = this.node.getThisAddress();
        final Semaphore s = new Semaphore(0);
        final AtomicBoolean ok = new AtomicBoolean(true);
        int empty = 0;
        for (PartitionImpl partition : this.partitions) {
            if (thisAddress.equals(partition.getOwner()) && partition.getReplicaAddress(1) != null) {
                Callback<Object> callback = new Callback<Object>(){

                    @Override
                    public void notify(Object object) {
                        if (Boolean.FALSE.equals(object)) {
                            ok.compareAndSet(true, false);
                        }
                        s.release();
                    }
                };
                SyncReplicaVersion op = new SyncReplicaVersion(1, callback);
                op.setService(this);
                op.setNodeEngine(this.nodeEngine);
                op.setResponseHandler(ResponseHandlerFactory.createErrorLoggingResponseHandler(this.node.getLogger(SyncReplicaVersion.class)));
                op.setPartitionId(partition.getPartitionId());
                this.nodeEngine.getOperationService().executeOperation(op);
                continue;
            }
            ++empty;
        }
        s.release(empty);
        try {
            return s.tryAcquire(this.partitionCount, 10L, TimeUnit.SECONDS) && ok.get();
        }
        catch (InterruptedException ignored) {
            return false;
        }
    }

    private boolean shouldWaitMigrationOrBackups(Level level) {
        if (this.initialized) {
            MemberGroupFactory mgf = this.memberGroupFactory;
            Collection<MemberGroup> memberGroups = mgf.createMemberGroups(this.node.getClusterService().getMembers());
            if (memberGroups.size() < 2) {
                return false;
            }
            int groups = 0;
            for (MemberGroup memberGroup : memberGroups) {
                if (memberGroup.size() <= 0) continue;
                ++groups;
            }
            if (groups < 2) {
                return false;
            }
            int activeSize = this.activeMigrations.size();
            if (activeSize != 0) {
                if (this.logger.isLoggable(level)) {
                    this.logger.log(level, "Waiting for active migration tasks: " + activeSize);
                }
                return true;
            }
            int queueSize = this.migrationQueue.size();
            if (queueSize == 0) {
                for (PartitionImpl partition : this.partitions) {
                    if (partition.getReplicaAddress(1) == null) {
                        if (this.logger.isLoggable(level)) {
                            this.logger.log(level, "Should take backup of partition: " + partition.getPartitionId());
                        }
                        return true;
                    }
                    int replicaSyncProcesses = this.replicaSyncProcessCount.get();
                    if (replicaSyncProcesses <= 0) continue;
                    if (this.logger.isLoggable(level)) {
                        this.logger.log(level, "Processing replica sync requests: " + replicaSyncProcesses);
                    }
                    return true;
                }
            } else {
                if (this.logger.isLoggable(level)) {
                    this.logger.log(level, "Waiting for cluster migration tasks: " + queueSize);
                }
                return true;
            }
        }
        return false;
    }

    @Override
    public final int getPartitionId(Data key) {
        int hash = key.getPartitionHash();
        return hash != Integer.MIN_VALUE ? Math.abs(hash) % this.partitionCount : 0;
    }

    @Override
    public final int getPartitionId(Object key) {
        return this.getPartitionId(this.nodeEngine.toData(key));
    }

    @Override
    public final int getPartitionCount() {
        return this.partitionCount;
    }

    @PrivateApi
    public long[] incrementPartitionReplicaVersions(int partitionId, int backupCount) {
        return this.replicaVersions[partitionId].incrementAndGet(backupCount);
    }

    @PrivateApi
    public void updatePartitionReplicaVersions(int partitionId, long[] versions, int replicaIndex) {
        PartitionReplicaVersions partitionVersion = this.replicaVersions[partitionId];
        if (!partitionVersion.update(versions, replicaIndex)) {
            this.syncPartitionReplica(partitionId, replicaIndex, false);
        }
    }

    long[] getPartitionReplicaVersions(int partitionId) {
        return this.replicaVersions[partitionId].get();
    }

    void setPartitionReplicaVersions(int partitionId, long[] versions) {
        this.replicaVersions[partitionId].reset(versions);
    }

    void finalizeReplicaSync(int partitionId, long[] versions) {
        this.setPartitionReplicaVersions(partitionId, versions);
        this.replicaSyncRequests.remove(partitionId);
        this.replicaSyncScheduler.cancel(partitionId);
    }

    void incrementReplicaSyncProcessCount() {
        this.replicaSyncProcessCount.incrementAndGet();
    }

    void decrementReplicaSyncProcessCount() {
        this.replicaSyncProcessCount.decrementAndGet();
    }

    @Override
    public Map<Address, List<Integer>> getMemberPartitionsMap() {
        int members = this.node.getClusterService().getSize();
        HashMap<Address, List<Integer>> memberPartitions = new HashMap<Address, List<Integer>>(members);
        for (int i = 0; i < this.partitionCount; ++i) {
            Address owner;
            while ((owner = this.getPartitionOwner(i)) == null) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    throw new HazelcastException(e);
                }
            }
            ArrayList<Integer> ownedPartitions = (ArrayList<Integer>)memberPartitions.get(owner);
            if (ownedPartitions == null) {
                ownedPartitions = new ArrayList<Integer>();
                memberPartitions.put(owner, ownedPartitions);
            }
            ownedPartitions.add(i);
        }
        return memberPartitions;
    }

    @Override
    public List<Integer> getMemberPartitions(Address target) {
        LinkedList<Integer> ownedPartitions = new LinkedList<Integer>();
        for (int i = 0; i < this.partitionCount; ++i) {
            Address owner = this.getPartitionOwner(i);
            if (!target.equals(owner)) continue;
            ownedPartitions.add(i);
        }
        return ownedPartitions;
    }

    private boolean shouldCheckRepartitioning() {
        return this.migrationQueue.isEmpty() && this.lastRepartitionTime.get() < Clock.currentTimeMillis() - REPARTITIONING_CHECK_INTERVAL;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reset() {
        this.clearMigrationQueue();
        this.replicaSyncRequests.clear();
        this.replicaSyncScheduler.cancelAll();
        this.lock.lock();
        try {
            this.initialized = false;
            for (PartitionImpl partition : this.partitions) {
                for (int i = 0; i < 7; ++i) {
                    partition.setReplicaAddress(i, null);
                }
            }
            this.activeMigrations.clear();
            this.completedMigrations.clear();
            this.stateVersion.set(0);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void shutdown() {
        this.logger.finest("Shutting down the partition service");
        this.migrationThread.stopNow();
        this.reset();
    }

    private void clearMigrationQueue() {
        this.migrationQueue.clear();
    }

    public long getMigrationQueueSize() {
        return this.migrationQueue.size();
    }

    public PartitionServiceProxy getPartitionServiceProxy() {
        return this.proxy;
    }

    private void sendMigrationEvent(MigrationInfo migrationInfo, MigrationEvent.MigrationStatus status) {
        MemberImpl current = this.getMember(migrationInfo.getSource());
        MemberImpl newOwner = this.getMember(migrationInfo.getDestination());
        MigrationEvent event = new MigrationEvent(migrationInfo.getPartitionId(), current, newOwner, status);
        EventService eventService = this.nodeEngine.getEventService();
        Collection<EventRegistration> registrations = eventService.getRegistrations(SERVICE_NAME, SERVICE_NAME);
        eventService.publishEvent(SERVICE_NAME, registrations, (Object)event, event.getPartitionId());
    }

    @Override
    public String addMigrationListener(MigrationListener migrationListener) {
        EventRegistration registration = this.nodeEngine.getEventService().registerListener(SERVICE_NAME, SERVICE_NAME, migrationListener);
        return registration.getId();
    }

    @Override
    public boolean removeMigrationListener(String registrationId) {
        return this.nodeEngine.getEventService().deregisterListener(SERVICE_NAME, SERVICE_NAME, registrationId);
    }

    @Override
    public void dispatchEvent(MigrationEvent migrationEvent, MigrationListener migrationListener) {
        switch (migrationEvent.getStatus()) {
            case STARTED: {
                migrationListener.migrationStarted(migrationEvent);
                break;
            }
            case COMPLETED: {
                migrationListener.migrationCompleted(migrationEvent);
                break;
            }
            case FAILED: {
                migrationListener.migrationFailed(migrationEvent);
            }
        }
    }

    private static MemberGroupFactory newMemberGroupFactory(PartitionGroupConfig partitionGroupConfig) {
        if (partitionGroupConfig == null || !partitionGroupConfig.isEnabled()) {
            return new SingleMemberGroupFactory();
        }
        switch (partitionGroupConfig.getGroupType()) {
            case HOST_AWARE: {
                return new HostAwareMemberGroupFactory();
            }
            case CUSTOM: {
                return new ConfigMemberGroupFactory(partitionGroupConfig.getMemberGroupConfigs());
            }
        }
        return new SingleMemberGroupFactory();
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("PartitionManager[" + this.stateVersion + "] {\n");
        sb.append("\n");
        sb.append("migrationQ: ").append(this.migrationQueue.size());
        sb.append("\n}");
        return sb.toString();
    }

    private class ReplicaSyncInfo {
        final int partitionId;
        final int replicaIndex;
        final long requestTime = Clock.currentTimeMillis();
        final Address target;

        private ReplicaSyncInfo(int partitionId, int replicaIndex, Address target) {
            this.partitionId = partitionId;
            this.replicaIndex = replicaIndex;
            this.target = target;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ReplicaSyncInfo that = (ReplicaSyncInfo)o;
            if (this.partitionId != that.partitionId) {
                return false;
            }
            return this.replicaIndex == that.replicaIndex;
        }

        public int hashCode() {
            int result = this.partitionId;
            result = 31 * result + this.replicaIndex;
            return result;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("ReplicaSyncInfo{");
            sb.append("partitionId=").append(this.partitionId);
            sb.append(", replicaIndex=").append(this.replicaIndex);
            sb.append(", requestTime=").append(this.requestTime);
            sb.append(", target=").append(this.target);
            sb.append('}');
            return sb.toString();
        }
    }

    private class MigrationThread
    implements Runnable {
        private final Thread thread;
        private final long sleepTime;
        private boolean migrating;

        MigrationThread(Node node) {
            this.sleepTime = Math.max(250L, PartitionServiceImpl.this.partitionMigrationInterval);
            this.migrating = false;
            this.thread = new Thread(node.threadGroup, this, node.getThreadNamePrefix("migration"));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (!this.thread.isInterrupted()) {
                    Runnable r;
                    while (PartitionServiceImpl.this.migrationActive.get() && (r = (Runnable)PartitionServiceImpl.this.migrationQueue.poll(1L, TimeUnit.SECONDS)) != null) {
                        this.safeRun(r);
                        if (PartitionServiceImpl.this.partitionMigrationInterval <= 0L) continue;
                        Thread.sleep(PartitionServiceImpl.this.partitionMigrationInterval);
                    }
                    boolean hasNoTasks = PartitionServiceImpl.this.migrationQueue.isEmpty();
                    if (hasNoTasks && this.migrating) {
                        this.migrating = false;
                        PartitionServiceImpl.this.logger.info("All migration tasks has been completed, queues are empty.");
                    }
                    if (PartitionServiceImpl.this.migrationActive.get() && !hasNoTasks) continue;
                    PartitionServiceImpl.this.evictCompletedMigrations();
                    Thread.sleep(this.sleepTime);
                }
            }
            catch (InterruptedException e) {
                PartitionServiceImpl.this.logger.finest("MigrationThread is interrupted: " + e.getMessage());
            }
            finally {
                PartitionServiceImpl.this.clearMigrationQueue();
            }
        }

        boolean safeRun(Runnable r) {
            if (r == null || this.thread.isInterrupted()) {
                return false;
            }
            try {
                this.migrating = r instanceof Migrator;
                r.run();
            }
            catch (Throwable t) {
                PartitionServiceImpl.this.logger.warning(t);
            }
            return true;
        }

        void start() {
            this.thread.start();
        }

        void stopNow() {
            PartitionServiceImpl.this.clearMigrationQueue();
            this.thread.interrupt();
        }
    }

    private class Migrator
    implements Runnable {
        final MigrationInfo migrationInfo;
        final BackupMigrationTask backupTask;

        Migrator(MigrationInfo migrationInfo, BackupMigrationTask backupTask) {
            this.migrationInfo = migrationInfo;
            this.backupTask = backupTask;
            MemberImpl masterMember = PartitionServiceImpl.this.getMasterMember();
            if (masterMember != null) {
                migrationInfo.setMasterUuid(masterMember.getUuid());
                migrationInfo.setMaster(masterMember.getAddress());
            }
        }

        @Override
        public void run() {
            if (!PartitionServiceImpl.this.node.isActive() || !PartitionServiceImpl.this.node.isMaster()) {
                return;
            }
            MigrationRequestOperation migrationRequestOp = new MigrationRequestOperation(this.migrationInfo);
            try {
                MigrationInfo info = this.migrationInfo;
                PartitionImpl partition = PartitionServiceImpl.this.partitions[info.getPartitionId()];
                if (!partition.getOwner().equals(info.getSource())) {
                    PartitionServiceImpl.this.logger.severe("ERROR: partition owner is not the source of migration! -> " + partition + " -VS- " + info);
                }
                PartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.STARTED);
                Boolean result = Boolean.FALSE;
                MemberImpl fromMember = PartitionServiceImpl.this.getMember(this.migrationInfo.getSource());
                PartitionServiceImpl.this.logger.finest("Started Migration : " + this.migrationInfo);
                PartitionServiceImpl.this.systemLogService.logPartition("Started Migration : " + this.migrationInfo);
                if (fromMember != null) {
                    Invocation inv = PartitionServiceImpl.this.nodeEngine.getOperationService().createInvocationBuilder(PartitionServiceImpl.SERVICE_NAME, (Operation)migrationRequestOp, this.migrationInfo.getSource()).setTryPauseMillis(1000L).build();
                    Future future = inv.invoke();
                    try {
                        result = (Boolean)PartitionServiceImpl.this.nodeEngine.toObject(future.get(PartitionServiceImpl.this.partitionMigrationTimeout, TimeUnit.SECONDS));
                    }
                    catch (Throwable e) {
                        Level level = PartitionServiceImpl.this.node.isActive() && this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                        PartitionServiceImpl.this.logger.log(level, "Failed migrating from " + fromMember, e);
                    }
                } else {
                    PartitionServiceImpl.this.logger.warning("Partition is lost! Assign new owner and exit...");
                    result = Boolean.TRUE;
                }
                if (Boolean.TRUE.equals(result)) {
                    PartitionServiceImpl.this.logger.finest("Finished Migration: " + this.migrationInfo);
                    PartitionServiceImpl.this.systemLogService.logPartition("Finished Migration: " + this.migrationInfo);
                    this.processMigrationResult();
                } else {
                    Level level = this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                    PartitionServiceImpl.this.logger.log(level, "Migration failed: " + this.migrationInfo);
                    this.migrationTaskFailed();
                }
            }
            catch (Throwable t) {
                Level level = this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                PartitionServiceImpl.this.logger.log(level, "Error [" + t.getClass() + ": " + t.getMessage() + "] while executing " + migrationRequestOp);
                PartitionServiceImpl.this.logger.finest(t);
                this.migrationTaskFailed();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void migrationTaskFailed() {
            PartitionServiceImpl.this.systemLogService.logPartition("Migration failed: " + this.migrationInfo);
            PartitionServiceImpl.this.lock.lock();
            try {
                PartitionServiceImpl.this.addCompletedMigration(this.migrationInfo);
                PartitionServiceImpl.this.finalizeActiveMigration(this.migrationInfo);
                PartitionServiceImpl.this.sendPartitionRuntimeState();
            }
            finally {
                PartitionServiceImpl.this.lock.unlock();
            }
            PartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.FAILED);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processMigrationResult() {
            PartitionServiceImpl.this.lock.lock();
            try {
                int partitionId = this.migrationInfo.getPartitionId();
                Address newOwner = this.migrationInfo.getDestination();
                PartitionImpl partition = PartitionServiceImpl.this.partitions[partitionId];
                partition.setOwner(newOwner);
                PartitionServiceImpl.this.addCompletedMigration(this.migrationInfo);
                PartitionServiceImpl.this.finalizeActiveMigration(this.migrationInfo);
                if (this.backupTask != null) {
                    this.backupTask.run();
                }
                PartitionServiceImpl.this.sendPartitionRuntimeState();
            }
            finally {
                PartitionServiceImpl.this.lock.unlock();
            }
            PartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.COMPLETED);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("Migrator{");
            sb.append("migrationInfo=").append(this.migrationInfo);
            sb.append('}');
            return sb.toString();
        }
    }

    private class BackupMigrationTask
    implements Runnable {
        final PartitionImpl newPartition;

        BackupMigrationTask(PartitionImpl newPartition) {
            this.newPartition = newPartition;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            PartitionServiceImpl.this.lock.lock();
            try {
                PartitionImpl currentPartition = PartitionServiceImpl.this.partitions[this.newPartition.getPartitionId()];
                for (int index = 1; index < 7; ++index) {
                    currentPartition.setReplicaAddress(index, this.newPartition.getReplicaAddress(index));
                }
            }
            finally {
                PartitionServiceImpl.this.lock.unlock();
            }
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("BackupMigrationTask{");
            sb.append("newPartition=").append(this.newPartition);
            sb.append('}');
            return sb.toString();
        }
    }

    private class RepartitioningTask
    implements Runnable {
        private RepartitioningTask() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (PartitionServiceImpl.this.node.isMaster() && PartitionServiceImpl.this.node.isActive()) {
                PartitionStateGenerator psg = PartitionServiceImpl.this.partitionStateGenerator;
                Collection members = PartitionServiceImpl.this.node.getClusterService().getMembers();
                PartitionServiceImpl.this.lock.lock();
                try {
                    if (!PartitionServiceImpl.this.initialized) {
                        return;
                    }
                    PartitionImpl[] newState = psg.reArrange(PartitionServiceImpl.this.memberGroupFactory.createMemberGroups(members), PartitionServiceImpl.this.partitions);
                    int migrationCount = 0;
                    int lostCount = 0;
                    PartitionServiceImpl.this.lastRepartitionTime.set(Clock.currentTimeMillis());
                    for (PartitionImpl newPartition : newState) {
                        int partitionId = newPartition.getPartitionId();
                        PartitionImpl currentPartition = PartitionServiceImpl.this.partitions[partitionId];
                        Address currentOwner = currentPartition.getOwner();
                        Address newOwner = newPartition.getOwner();
                        if (currentOwner == null) {
                            ++lostCount;
                            currentPartition.setPartitionInfo(newPartition);
                            MigrationInfo migrationInfo = new MigrationInfo(partitionId, null, newOwner);
                            PartitionServiceImpl.this.sendMigrationEvent(migrationInfo, MigrationEvent.MigrationStatus.STARTED);
                            PartitionServiceImpl.this.sendMigrationEvent(migrationInfo, MigrationEvent.MigrationStatus.COMPLETED);
                            continue;
                        }
                        if (newOwner != null && !currentOwner.equals(newOwner)) {
                            ++migrationCount;
                            MigrationInfo info = new MigrationInfo(partitionId, currentOwner, newOwner);
                            Migrator migrator = new Migrator(info, new BackupMigrationTask(newPartition));
                            PartitionServiceImpl.this.migrationQueue.offer(migrator);
                            continue;
                        }
                        currentPartition.setPartitionInfo(newPartition);
                    }
                    PartitionServiceImpl.this.sendPartitionRuntimeState();
                    if (lostCount > 0) {
                        PartitionServiceImpl.this.logger.warning("Assigning new owners for " + lostCount + " LOST partitions!");
                    }
                    if (migrationCount > 0) {
                        PartitionServiceImpl.this.logger.info("Re-partitioning cluster data... Migration queue size: " + migrationCount);
                    } else {
                        PartitionServiceImpl.this.logger.info("Partition balance is ok, no need to re-partition cluster data... ");
                    }
                }
                finally {
                    PartitionServiceImpl.this.lock.unlock();
                }
            }
        }
    }

    private class SyncReplicaVersionTask
    implements Runnable {
        private SyncReplicaVersionTask() {
        }

        @Override
        public void run() {
            if (PartitionServiceImpl.this.node.isActive() && PartitionServiceImpl.this.migrationActive.get()) {
                Address thisAddress = PartitionServiceImpl.this.node.getThisAddress();
                for (PartitionImpl partition : PartitionServiceImpl.this.partitions) {
                    if (!thisAddress.equals(partition.getOwner()) || partition.getReplicaAddress(1) == null) continue;
                    SyncReplicaVersion op = new SyncReplicaVersion(1, null);
                    op.setService(PartitionServiceImpl.this);
                    op.setNodeEngine(PartitionServiceImpl.this.nodeEngine);
                    op.setResponseHandler(ResponseHandlerFactory.createErrorLoggingResponseHandler(PartitionServiceImpl.this.node.getLogger(SyncReplicaVersion.class)));
                    op.setPartitionId(partition.getPartitionId());
                    PartitionServiceImpl.this.nodeEngine.getOperationService().executeOperation(op);
                }
            }
        }
    }

    private class SendClusterStateTask
    implements Runnable {
        private SendClusterStateTask() {
        }

        @Override
        public void run() {
            if (PartitionServiceImpl.this.node.isMaster() && PartitionServiceImpl.this.node.isActive()) {
                if (!PartitionServiceImpl.this.migrationQueue.isEmpty() && PartitionServiceImpl.this.migrationActive.get()) {
                    PartitionServiceImpl.this.logger.info("Remaining migration tasks in queue => " + PartitionServiceImpl.this.migrationQueue.size());
                }
                PartitionServiceImpl.this.sendPartitionRuntimeState();
            }
        }
    }

    public static class AssignPartitions
    extends AbstractOperation {
        @Override
        public void run() {
            PartitionServiceImpl service = (PartitionServiceImpl)this.getService();
            service.firstArrangement();
        }

        @Override
        public boolean returnsResponse() {
            return true;
        }

        @Override
        public Object getResponse() {
            return Boolean.TRUE;
        }
    }

    private class PartitionReplicaVersions {
        final int partitionId;
        final long[] versions = new long[6];

        private PartitionReplicaVersions(int partitionId) {
            this.partitionId = partitionId;
        }

        long[] incrementAndGet(int backupCount) {
            int i = 0;
            while (i < backupCount) {
                int n = i++;
                this.versions[n] = this.versions[n] + 1L;
            }
            return this.versions;
        }

        long[] get() {
            return this.versions;
        }

        boolean update(long[] newVersions, int currentReplica) {
            boolean updated;
            int index = currentReplica - 1;
            long current = this.versions[index];
            long next = newVersions[index];
            boolean bl = updated = current == next - 1L;
            if (updated) {
                System.arraycopy(newVersions, 0, this.versions, 0, newVersions.length);
            }
            return updated;
        }

        void reset(long[] newVersions) {
            System.arraycopy(newVersions, 0, this.versions, 0, newVersions.length);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("PartitionReplicaVersions");
            sb.append("{partitionId=").append(this.partitionId);
            sb.append(", versions=").append(Arrays.toString(this.versions));
            sb.append('}');
            return sb.toString();
        }
    }

    private class ReplicaSyncEntryProcessor
    implements ScheduledEntryProcessor<Integer, ReplicaSyncInfo> {
        private ReplicaSyncEntryProcessor() {
        }

        @Override
        public void process(EntryTaskScheduler<Integer, ReplicaSyncInfo> scheduler, Collection<ScheduledEntry<Integer, ReplicaSyncInfo>> entries) {
            for (ScheduledEntry<Integer, ReplicaSyncInfo> entry : entries) {
                ReplicaSyncInfo syncInfo = entry.getValue();
                if (!PartitionServiceImpl.this.replicaSyncRequests.remove(entry.getKey(), syncInfo)) continue;
                PartitionServiceImpl.this.logger.info("Re-sending sync replica request for partition: " + syncInfo.partitionId + ", replica: " + syncInfo.replicaIndex);
                PartitionServiceImpl.this.syncPartitionReplica(syncInfo.partitionId, syncInfo.replicaIndex, false);
            }
        }
    }

    private class LocalPartitionListener
    implements PartitionListener {
        final Address thisAddress;

        private LocalPartitionListener(Address thisAddress) {
            this.thisAddress = thisAddress;
        }

        @Override
        public void replicaChanged(PartitionReplicaChangeEvent event) {
            if (event.getReplicaIndex() > 0) {
                if (this.thisAddress.equals(event.getOldAddress())) {
                    PartitionImpl partition = PartitionServiceImpl.this.partitions[event.getPartitionId()];
                    if (!partition.isOwnerOrBackup(this.thisAddress)) {
                        PartitionServiceImpl.this.clearPartitionReplica(event.getPartitionId(), event.getReplicaIndex());
                    }
                } else if (this.thisAddress.equals(event.getNewAddress())) {
                    PartitionServiceImpl.this.syncPartitionReplica(event.getPartitionId(), event.getReplicaIndex(), true);
                }
            }
            if (event.getReplicaIndex() == 0 && event.getNewAddress() == null && PartitionServiceImpl.this.node.isActive() && PartitionServiceImpl.this.node.joined()) {
                String warning = "Owner of partition is being removed! Possible data loss for partition[" + event.getPartitionId() + "]. " + event;
                PartitionServiceImpl.this.logger.warning(warning);
                PartitionServiceImpl.this.systemLogService.logWarningPartition(warning);
            }
            if (PartitionServiceImpl.this.node.isMaster()) {
                PartitionServiceImpl.this.stateVersion.incrementAndGet();
            }
        }
    }
}

