/*
 * Decompiled with CFR 0.152.
 */
package com.staros.shard;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.staros.exception.CheckInterruptedException;
import com.staros.exception.StarException;
import com.staros.proto.PlacementPolicy;
import com.staros.replica.Replica;
import com.staros.schedule.ReplicaWorkerInvertIndex;
import com.staros.schedule.ScheduleScorer;
import com.staros.schedule.Scheduler;
import com.staros.schedule.select.FirstNSelector;
import com.staros.service.ServiceManager;
import com.staros.shard.Shard;
import com.staros.shard.ShardGroup;
import com.staros.shard.ShardManager;
import com.staros.util.AbstractServer;
import com.staros.util.Config;
import com.staros.worker.Worker;
import com.staros.worker.WorkerGroup;
import com.staros.worker.WorkerManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SplittableRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ShardChecker
extends AbstractServer {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LogManager.getLogger(ShardChecker.class);
    /** Supplies the set of service ids to check and the {@link ShardManager} of each service. */
    private final ServiceManager serviceManager;
    /** Used to look up workers and worker groups while evaluating replicas. */
    private final WorkerManager workerManager;
    /** Receives the add/remove replica requests produced by the checks. */
    private final Scheduler scheduler;
    /** Background thread that executes {@code runCheckThread}; created in the constructor, started in {@code doStart}. */
    private final Thread checkThread;
    /** Selector handed to {@link ScheduleScorer#selectHighEnd} when picking target workers. */
    private final FirstNSelector selector;
    /** Cool-down window in milliseconds; matches the 60 s window {@code sampleLogging} applies between messages. */
    private final long coolDownMs = 60000L;
    /** Earliest wall-clock time (epoch millis) at which {@code sampleLogging} may emit its next message. */
    private AtomicLong allows = new AtomicLong();
    /**
     * Statistics per worker group id. Replaced wholesale by {@code buildWorkerGroupStatsMap} and set to
     * {@code null} by the check loop; {@code null} therefore means "no statistics available".
     */
    private Map<Long, WorkerGroupStats> workerGroupStatsMap;
    /**
     * Not referenced in the visible part of this file; presumably the refresh interval used by
     * {@code WorkerGroupStats} - TODO confirm.
     */
    private static final long WORKER_GROUP_STAT_REFRESH_INTERVAL_MS = 5000L;

    /**
     * Builds a checker bound to the given managers and scheduler.
     *
     * <p>The background thread is only created here; it does not run until {@link #doStart()} is called.
     *
     * @param serviceManager source of service ids and per-service shard managers
     * @param workerManager  source of worker and worker group information
     * @param scheduler      scheduler that receives the resulting add/remove requests
     */
    public ShardChecker(ServiceManager serviceManager, WorkerManager workerManager, Scheduler scheduler) {
        this.selector = new FirstNSelector();
        this.scheduler = scheduler;
        this.workerManager = workerManager;
        this.serviceManager = serviceManager;
        // The thread body is the periodic check loop of this instance.
        this.checkThread = new Thread(() -> this.runCheckThread());
    }

    /** Launches the background check thread that was created in the constructor. */
    @Override
    public void doStart() {
        final Thread worker = this.checkThread;
        worker.start();
    }

    /**
     * Interrupts the background check thread and waits for it to terminate.
     *
     * <p>If the calling thread is itself interrupted while waiting, the wait is abandoned, a warning is
     * logged and the caller's interrupt status is restored so that code further up the stack can still
     * observe the interruption.
     */
    @Override
    public void doStop() {
        // Wake the checker up if it is currently sleeping between two rounds.
        this.checkThread.interrupt();
        try {
            this.checkThread.join();
        } catch (InterruptedException e) {
            LOG.warn("join shard checker thread failed! {}", e.getMessage());
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs {@code message} at {@code level}, but at most once per cool-down window.
     *
     * <p>The next permitted time is kept in {@code allows}; the compare-and-set makes sure that only one
     * caller wins when several race for the same window.
     *
     * @param level   log level to use
     * @param message message to emit
     */
    private void sampleLogging(Level level, String message) {
        final long nowMs = System.currentTimeMillis();
        final long notBefore = this.allows.get();
        if (nowMs <= notBefore) {
            // Still inside the cool-down window: stay silent.
            return;
        }
        if (this.allows.compareAndSet(notBefore, nowMs + this.coolDownMs)) {
            LOG.log(level, message);
        }
    }

    /**
     * Body of the background thread: repeats one check round, then sleeps for
     * {@code Config.SHARD_CHECKER_LOOP_INTERVAL_SEC} seconds, for as long as the server is running.
     *
     * <p>One round runs the replica health check for every service and then the shard group balance
     * check for every service. When {@code Config.DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK} is set the
     * round is skipped and a rate-limited warning is logged instead.
     *
     * <p>An unexpected runtime exception in a round is logged and the loop carries on with the next
     * round; otherwise a single failure would terminate the only check thread for good. The worker group
     * statistics are dropped at the end of every round, including interrupted or failed ones.
     */
    private void runCheckThread() {
        while (this.isRunning()) {
            if (Config.DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK) {
                this.sampleLogging(Level.WARN, "DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK is turned on. Disabling balance shards ability!");
            } else {
                try {
                    LOG.debug("running shard check once.");
                    for (String serviceId : this.serviceManager.getServiceIdSet()) {
                        this.shardHealthCheckForService(serviceId);
                    }
                    for (String serviceId : this.serviceManager.getServiceIdSet()) {
                        this.shardGroupBalanceCheckForService(serviceId);
                    }
                } catch (CheckInterruptedException exception) {
                    LOG.info("Check interrupted: {}", exception.getMessage());
                } catch (RuntimeException exception) {
                    // Keep the checker alive: an escaped exception would end this thread silently.
                    LOG.warn("Unexpected exception during shard check round, retry in next round.", exception);
                } finally {
                    // Statistics are only meaningful within the round that built them.
                    this.workerGroupStatsMap = null;
                }
            }
            try {
                Thread.sleep((long) Config.SHARD_CHECKER_LOOP_INTERVAL_SEC * 1000L);
            } catch (InterruptedException e) {
                // Interruption is used to cut the sleep short; the loop condition decides whether to exit.
                LOG.info("shard checker thread interrupted! {}", e.getMessage());
            }
        }
    }

    /**
     * Runs {@link #shardHealthCheck(Shard)} for every shard of one service.
     *
     * <p>A {@link StarException} raised for a single shard is logged and the walk continues with the
     * next shard. A {@link ConcurrentModificationException} raised while walking the shard iterator ends
     * the walk for this service with an info log.
     *
     * @param serviceId id of the service whose shards are checked
     * @throws CheckInterruptedException if {@code Config.DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK} is
     *         found set after a shard has been processed
     */
    protected void shardHealthCheckForService(String serviceId) {
        LOG.debug("Start shard replica health check for service: {}", serviceId);
        final ShardManager manager = this.serviceManager.getShardManager(serviceId);
        if (manager == null) {
            LOG.info("ShardManager not exist for service {}, skip the healthy check.", serviceId);
            return;
        }
        final Iterator<Map.Entry<Long, Shard>> shardIt = manager.getShardIterator();
        try {
            while (shardIt.hasNext()) {
                final long currentShardId = shardIt.next().getKey();
                // Look the shard up again by id instead of trusting the iterator's value.
                final Shard current = manager.getShard(currentShardId);
                if (current != null) {
                    try {
                        this.shardHealthCheck(current);
                    } catch (StarException failure) {
                        LOG.info("Got exception during processing service:{}, shard:{} health check, skip it.", serviceId, currentShardId, failure);
                    }
                    if (Config.DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK) {
                        // The switch was flipped while the round was in progress: abort the round.
                        throw new CheckInterruptedException("Interrupted shard health check.");
                    }
                }
            }
        } catch (ConcurrentModificationException cme) {
            LOG.info("Fail to check shards due to ConcurrentModificationException. Error: {}", cme.getMessage());
        }
    }

    /**
     * Checks the replicas of one shard and asks the scheduler to repair what is off.
     *
     * <p>Replicas are tallied per worker group into {@link ReplicaStat} counters:
     * <ul>
     *   <li>replicas whose worker is unknown, or dead and reported as expired by
     *       {@code Worker.replicaExpired()}, are scheduled for removal;</li>
     *   <li>scale-out replicas on a worker without warmup, or whose state has timed out
     *       ({@code Config.SHARD_REPLICA_SCALE_OUT_TIMEOUT_SECS}), are marked done and counted as OK;</li>
     *   <li>for every tallied worker group that still exists, a new replica is requested when there is
     *       no live replica or live + dead replicas are fewer than the group's replica number, and a
     *       removal is requested when OK + scale-in replicas exceed that number.</li>
     * </ul>
     *
     * @param shard the shard to check
     * @throws StarException propagated from the shard manager when completing a scale-out; scheduler
     *         failures are caught and logged here
     */
    protected void shardHealthCheck(Shard shard) throws StarException {
        String serviceId = shard.getServiceId();
        long shardId = shard.getShardId();
        ShardManager shardManager = this.serviceManager.getShardManager(serviceId);

        Map<Long, ReplicaStat> replicaStatsMap = new HashMap<>();
        // Group 0 always gets an entry, so it is evaluated even when the shard has no replica in it.
        // NOTE(review): 0 is presumably the default worker group id - confirm.
        replicaStatsMap.put(0L, new ReplicaStat());
        List<Long> expiredReplicas = new ArrayList<>();

        for (Replica replica : shard.getReplica()) {
            long workerId = replica.getWorkerId();
            Worker worker = this.workerManager.getWorker(workerId);
            if (worker == null) {
                expiredReplicas.add(workerId);
                continue;
            }
            ReplicaStat stat = replicaStatsMap.computeIfAbsent(worker.getGroupId(), k -> new ReplicaStat());
            if (worker.isShutdown()) {
                ++stat.shutdownReplicas;
                continue;
            }
            if (!worker.isAlive()) {
                if (worker.replicaExpired()) {
                    expiredReplicas.add(workerId);
                } else {
                    ++stat.deadReplica;
                }
                continue;
            }
            switch (replica.getState()) {
                case REPLICA_OK: {
                    ++stat.okReplicas;
                    break;
                }
                case REPLICA_SCALE_IN: {
                    ++stat.scaleinReplicas;
                    break;
                }
                case REPLICA_SCALE_OUT: {
                    boolean scaleOutFinished = !worker.warmupEnabled()
                            || replica.stateTimedOut(1000L * (long) Config.SHARD_REPLICA_SCALE_OUT_TIMEOUT_SECS);
                    if (scaleOutFinished) {
                        List<Long> shardIds = new ArrayList<>(1);
                        shardIds.add(shardId);
                        shardManager.scaleOutShardReplicasDone(shardIds, replica.getWorkerId());
                        ++stat.okReplicas;
                    } else {
                        ++stat.scaleoutReplicas;
                    }
                    break;
                }
                default: {
                    // Other states are not counted.
                    break;
                }
            }
        }

        for (long workerId : expiredReplicas) {
            try {
                this.scheduler.scheduleRemoveFromWorker(serviceId, shardId, workerId);
            } catch (StarException exception) {
                LOG.debug("Fail to remove shard replica for service:{}, shard:{}, worker:{}, error:", serviceId, shardId, workerId, exception);
            }
        }

        for (Map.Entry<Long, ReplicaStat> entry : replicaStatsMap.entrySet()) {
            long workerGroupId = entry.getKey();
            ReplicaStat stat = entry.getValue();
            WorkerGroup wg = this.workerManager.getWorkerGroupNoException(serviceId, workerGroupId);
            if (wg == null) {
                continue;
            }
            long expectedNum = wg.getReplicaNumber();
            try {
                long liveReplicas = stat.okReplicas + stat.scaleoutReplicas;
                // Dead (not yet expired) replicas count towards the quota, unless nothing is live at all.
                if (liveReplicas == 0L || liveReplicas + (long) stat.deadReplica < expectedNum) {
                    LOG.debug("Request schedule new replicas for service:{}, shard:{}, workerGroup:{}, expected num: {}, actual: live replicas = {}, dead replicas = {}", serviceId, shardId, workerGroupId, expectedNum, liveReplicas, stat.deadReplica);
                    this.scheduler.scheduleAsyncAddToGroup(serviceId, shardId, workerGroupId);
                }
                int servingReplicas = stat.okReplicas + stat.scaleinReplicas;
                if ((long) servingReplicas > expectedNum) {
                    LOG.debug("Remove redundant replicas for service:{}, shard:{}, workerGroup:{}, expected num: {}, actual num: {}", serviceId, shardId, workerGroupId, expectedNum, servingReplicas);
                    this.scheduler.scheduleAsyncRemoveFromGroup(serviceId, shardId, workerGroupId);
                }
            } catch (StarException exception) {
                LOG.info("Fail to schedule tasks to scheduler. error:", exception);
            }
        }
    }

    /**
     * Rebuilds {@code workerGroupStatsMap} for the given worker groups of one service.
     *
     * <p>Does nothing when {@code Config.ENABLE_BALANCE_SHARD_NUM_BETWEEN_WORKERS} is off. Groups the
     * worker manager does not return are left out. The previous map is replaced as a whole.
     *
     * @param serviceId      service owning the worker groups
     * @param workerGroupIds ids of the worker groups to collect statistics for
     */
    void buildWorkerGroupStatsMap(String serviceId, List<Long> workerGroupIds) {
        if (Config.ENABLE_BALANCE_SHARD_NUM_BETWEEN_WORKERS) {
            Map<Long, WorkerGroupStats> rebuilt = new ConcurrentHashMap<>();
            for (Long id : workerGroupIds) {
                WorkerGroup wg = this.workerManager.getWorkerGroup(serviceId, id);
                if (wg != null) {
                    WorkerGroupStats fresh = new WorkerGroupStats(wg);
                    fresh.refresh();
                    rebuilt.put(id, fresh);
                }
            }
            this.workerGroupStatsMap = rebuilt;
        }
    }

    /**
     * Looks up the statistics collected for a worker group.
     *
     * @param groupId worker group id
     * @return the statistics, or {@code null} when the feature switch
     *         {@code Config.ENABLE_BALANCE_SHARD_NUM_BETWEEN_WORKERS} is off, no statistics map is
     *         currently set, or the map has no entry for {@code groupId}
     */
    WorkerGroupStats getWorkerGroupStats(long groupId) {
        final Map<Long, WorkerGroupStats> current = this.workerGroupStatsMap;
        final boolean unavailable = !Config.ENABLE_BALANCE_SHARD_NUM_BETWEEN_WORKERS || current == null;
        return unavailable ? null : current.get(groupId);
    }

    /**
     * Tells whether the statistics of a worker group ask for balancing between its workers.
     *
     * @param groupId worker group id
     * @return {@code false} when no statistics are available for the group, otherwise the answer of
     *         {@code WorkerGroupStats.needBalance()}
     */
    boolean needBalanceBetweenWorkers(long groupId) {
        final WorkerGroupStats groupStats = this.getWorkerGroupStats(groupId);
        return groupStats != null && groupStats.needBalance();
    }

    /**
     * Asks the statistics of a worker group whether moving a replica from {@code srcWorkerId} to
     * {@code destWorkerId} goes in the preferred direction.
     *
     * @param groupId      worker group id
     * @param srcWorkerId  worker the replica would leave
     * @param destWorkerId worker the replica would move to
     * @return {@code false} when no statistics are available for the group, otherwise the answer of
     *         {@code WorkerGroupStats.isPreferredBalanceDirection}
     */
    boolean isPreferredBalanceDirection(long groupId, long srcWorkerId, long destWorkerId) {
        final WorkerGroupStats groupStats = this.getWorkerGroupStats(groupId);
        return groupStats != null && groupStats.isPreferredBalanceDirection(srcWorkerId, destWorkerId);
    }

    /**
     * Runs the placement-policy specific balance routine for every non-empty shard group of a service.
     *
     * <p>Nothing is done when the service has no shard manager or no worker groups. Worker group
     * statistics are rebuilt before the groups are walked. A {@link StarException} raised for one shard
     * group is logged and the walk continues.
     *
     * @param serviceId id of the service whose shard groups are checked
     * @throws CheckInterruptedException if {@code Config.DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK} is
     *         found set after a shard group has been processed
     */
    protected void shardGroupBalanceCheckForService(String serviceId) {
        LOG.debug("Start shard group balance health check for service: {}", serviceId);
        final ShardManager manager = this.serviceManager.getShardManager(serviceId);
        if (manager == null) {
            LOG.info("ShardManager not exist for service {}, skip the shard group balance check.", serviceId);
            return;
        }
        final List<Long> workerGroupIds = this.workerManager.getAllWorkerGroupIds(serviceId);
        if (workerGroupIds.isEmpty()) {
            return;
        }
        this.buildWorkerGroupStatsMap(serviceId, workerGroupIds);

        for (long shardGroupId : manager.getAllShardGroupIds()) {
            final ShardGroup shardGroup = manager.getShardGroup(shardGroupId);
            if (shardGroup == null) {
                continue;
            }
            if (shardGroup.getShardIds().isEmpty()) {
                LOG.debug("empty shard group {} in service {}. skip it!", shardGroupId, serviceId);
                continue;
            }
            try {
                switch (shardGroup.getPlacementPolicy()) {
                    case PACK:
                        this.balancePackShardGroup(manager, shardGroup);
                        break;
                    case SPREAD:
                        this.balanceSpreadShardGroup(manager, shardGroup);
                        break;
                    case EXCLUDE:
                        this.balanceExcludeShardGroup(manager, shardGroup);
                        break;
                    default:
                        // Other policies need no balancing here.
                        break;
                }
            } catch (StarException failure) {
                LOG.info("Got exception during processing service:{}, shardgroup:{} balance check, skip it.", serviceId, shardGroupId, failure);
            }
            if (Config.DISABLE_BACKGROUND_SHARD_SCHEDULE_CHECK) {
                // The switch was flipped while the round was in progress: abort the round.
                throw new CheckInterruptedException("Interrupted shardgroup balance check.");
            }
        }
    }

    /**
     * Balances an EXCLUDE shard group: within each worker group, moves one replica from workers that
     * hold several replicas of the group to workers that hold none.
     *
     * <p>For every worker group referenced by the group's replicas (with at least two workers returned
     * by {@code getAllWorkerIds(true)}), workers are split into sources (holding replicas) and targets
     * (holding none). Sources are visited from most to least loaded; each one hands a single replica to
     * the best-scored remaining target. Processing of a worker group stops as soon as targets run out or
     * the current source holds at most one replica (all later sources hold no more than that).
     *
     * <p>A worker group without any empty worker is skipped; the remaining worker groups are still
     * processed.
     *
     * @param shardManager shard manager of the service the group belongs to
     * @param group        the EXCLUDE shard group to balance
     */
    private void balanceExcludeShardGroup(ShardManager shardManager, ShardGroup group) {
        ReplicaWorkerInvertIndex index = new ReplicaWorkerInvertIndex();
        index.buildFrom(this.workerManager, shardManager, group);
        String serviceId = group.getServiceId();

        for (long workerGroupId : index.getAllWorkerGroupIds()) {
            WorkerGroup workerGroup = this.workerManager.getWorkerGroupNoException(serviceId, workerGroupId);
            if (workerGroup == null) {
                continue;
            }
            List<Long> workerIds = workerGroup.getAllWorkerIds(true);
            if (workerIds.size() < 2) {
                continue;
            }
            List<Long> srcWorkerIds = new ArrayList<>();
            List<Long> tgtWorkerIds = new ArrayList<>();
            for (long workerId : workerIds) {
                if (index.getReplicaShardList(workerId).isEmpty()) {
                    tgtWorkerIds.add(workerId);
                } else {
                    srcWorkerIds.add(workerId);
                }
            }
            if (tgtWorkerIds.isEmpty()) {
                // Nothing to move to in THIS worker group; other worker groups may still need balancing.
                continue;
            }
            // Most loaded source first.
            srcWorkerIds.sort((a, b) -> Integer.compare(index.getReplicaShardList(b).size(), index.getReplicaShardList(a).size()));
            ScheduleScorer tgtScore = new ScheduleScorer(tgtWorkerIds);
            tgtScore.apply(this.workerManager);

            for (long srcWorkerId : srcWorkerIds) {
                if (tgtScore.isEmpty()) {
                    break;
                }
                List<Long> tgtIdList = tgtScore.selectHighEnd(this.selector, 1);
                if (tgtIdList.isEmpty()) {
                    break;
                }
                long tgtWorkerId = tgtIdList.get(0);
                List<Long> candidates = new ArrayList<>(index.getReplicaShardList(srcWorkerId));
                if (candidates.size() <= 1) {
                    // Sources are sorted by load, so no later source has more than one replica either.
                    break;
                }
                long selected = candidates.get(0);
                try {
                    LOG.debug("[ExcludeGroup] Try to balance shard:{} (service:{}) replica from worker:{} => worker:{}", selected, serviceId, srcWorkerId, tgtWorkerId);
                    this.scheduler.scheduleAddToWorker(serviceId, selected, tgtWorkerId);
                    // The target now holds a replica and must not be picked again.
                    tgtScore.remove(tgtWorkerId);
                    this.scheduler.scheduleAsyncRemoveFromWorker(serviceId, Collections.singletonList(selected), srcWorkerId);
                } catch (Exception exception) {
                    LOG.info("[ExcludeGroup] Fail to balance shard:{} in service:{}, form worker:{} to worker:{}. Error:", selected, serviceId, srcWorkerId, tgtWorkerId, exception);
                }
            }
        }
    }

    /**
     * Balances a PACK shard group together with every PACK group it is connected to through shared
     * shards.
     *
     * <p>The connected groups are processed as one unit, and only when {@code group} has the smallest id
     * among them, so that the unit is handled once per round. For each worker group referenced by the
     * collected shards that has at least two workers and at least as many workers as its replica number,
     * the work is delegated to {@code balancePackGroupInSingleWorkerGroup}.
     *
     * @param shardManager shard manager of the service the group belongs to
     * @param group        the PACK shard group to balance
     */
    private void balancePackShardGroup(ShardManager shardManager, ShardGroup group) {
        final List<ShardGroup> connectedGroups = new ArrayList<>();
        final List<Shard> connectedShards = new ArrayList<>();
        this.collectAllShardsAndGroupsRecursively(shardManager, group, connectedGroups, connectedShards);

        final Optional<Long> smallestGroupId = connectedGroups.stream()
                .map(ShardGroup::getGroupId)
                .min(Comparator.naturalOrder());
        Preconditions.checkState(smallestGroupId.isPresent());
        if (smallestGroupId.get() < group.getGroupId()) {
            // A connected group with a smaller id is responsible for this unit.
            return;
        }

        final ReplicaWorkerInvertIndex invertIndex = new ReplicaWorkerInvertIndex();
        for (Shard packed : connectedShards) {
            invertIndex.addReplicas(this.workerManager, packed);
        }

        for (long workerGroupId : invertIndex.getAllWorkerGroupIds()) {
            final WorkerGroup wg = this.workerManager.getWorkerGroupNoException(group.getServiceId(), workerGroupId);
            if (wg == null) {
                continue;
            }
            final List<Long> candidateWorkerIds = wg.getAllWorkerIds(true);
            if (candidateWorkerIds.size() < 2) {
                continue;
            }
            if (candidateWorkerIds.size() >= wg.getReplicaNumber()) {
                this.balancePackGroupInSingleWorkerGroup(candidateWorkerIds, group, wg.getReplicaNumber(), connectedGroups, invertIndex);
            } else {
                LOG.debug("Worker group:{} only has {} alive workers. Shard in PACK shard group requires {} replicas. Skip it.", workerGroupId, candidateWorkerIds.size(), wg.getReplicaNumber());
            }
        }
    }

    /**
     * Collects, breadth first, all PACK shard groups reachable from {@code startGroup} through shared
     * shards, together with the shards of those groups.
     *
     * <p>Every group taken from the work list is appended to {@code groups} (once). Each of its shards
     * is inspected: all PACK groups the shard belongs to are queued for a later visit; if the shard also
     * belongs to a group whose policy number is greater than PACK's and which has more than one shard,
     * the shard is put aside and NOT added to {@code shards}; otherwise it is appended to {@code shards}.
     *
     * @param shardManager shard manager used to resolve shard and group ids
     * @param startGroup   group the traversal starts from
     * @param groups       output list receiving the visited groups; existing entries are treated as
     *                     already visited
     * @param shards       output list receiving the collected shards; existing entries are skipped
     * @throws StarException declared by the signature; not thrown directly by this method
     */
    private void collectAllShardsAndGroupsRecursively(ShardManager shardManager, ShardGroup startGroup, List<ShardGroup> groups, List<Shard> shards) throws StarException {
        // Single growing work list: appending keeps the same visiting order as level-by-level processing.
        final List<ShardGroup> pending = new ArrayList<>();
        pending.add(startGroup);
        final List<Shard> setAside = new ArrayList<>();

        for (int cursor = 0; cursor < pending.size(); ++cursor) {
            final ShardGroup visiting = pending.get(cursor);
            if (groups.contains(visiting)) {
                continue;
            }
            groups.add(visiting);

            for (long memberId : visiting.getShardIds()) {
                final Shard member = shardManager.getShard(memberId);
                final boolean alreadySeen = member == null || shards.contains(member) || setAside.contains(member);
                if (alreadySeen) {
                    continue;
                }
                boolean ownedByStrongerGroup = false;
                for (long linkedId : member.getGroupIds()) {
                    final ShardGroup linked = shardManager.getShardGroup(linkedId);
                    if (linked == null) {
                        continue;
                    }
                    final PlacementPolicy linkedPolicy = linked.getPlacementPolicy();
                    if (linkedPolicy == PlacementPolicy.PACK) {
                        pending.add(linked);
                    } else if (linkedPolicy.getNumber() > PlacementPolicy.PACK.getNumber() && linked.getShardIds().size() > 1) {
                        // Keep scanning: remaining PACK groups of this shard must still be queued.
                        ownedByStrongerGroup = true;
                    }
                }
                if (ownedByStrongerGroup) {
                    setAside.add(member);
                } else {
                    shards.add(member);
                }
            }
        }
    }

    /**
     * Packs the replicas of a set of connected PACK shard groups onto {@code replicaNum} workers of one
     * worker group.
     *
     * <p>Nothing is done when the replicas already sit on exactly {@code replicaNum} workers. Otherwise
     * {@code replicaNum} target workers are selected by score, every shard of each pack group that
     * already has a replica in this worker group is added to the targets that miss it, and finally the
     * replicas on workers that were not selected are submitted for asynchronous removal. If adding to a
     * target fails, the method returns without removing anything.
     *
     * @param workerIds    candidate workers of the worker group
     * @param currentGroup the group being balanced; provides the service id and is used in log messages
     * @param replicaNum   number of workers the replicas should be packed on
     * @param groups       all connected PACK groups
     * @param index        replica/worker index built from the shards of {@code groups}
     */
    private void balancePackGroupInSingleWorkerGroup(List<Long> workerIds, ShardGroup currentGroup, int replicaNum, List<ShardGroup> groups, ReplicaWorkerInvertIndex index) {
        ScheduleScorer scorer = new ScheduleScorer(workerIds);
        List<Long> workersWithReplicas = new ArrayList<>();
        for (long workerId : workerIds) {
            int numReplicas = index.getReplicaShardList(workerId).size();
            if (numReplicas == 0) {
                continue;
            }
            workersWithReplicas.add(workerId);
            // One score contribution per replica already on the worker.
            scorer.apply(PlacementPolicy.PACK, Collections.nCopies(numReplicas, workerId));
        }
        if (workersWithReplicas.size() == replicaNum) {
            return;
        }
        scorer.apply(this.workerManager);
        List<Long> selectedWorkers = scorer.selectHighEnd(this.selector, replicaNum);
        if (selectedWorkers.size() != replicaNum) {
            LOG.info("Failed to select {} workers from candidates while doing shard group:{} balance check, skip it!", replicaNum, currentGroup.getGroupId());
            return;
        }
        LOG.debug("[PackGroup] shardGroup: {}. Existing workers with replica: {}, selected tgargetWorkers: {}", currentGroup.getGroupId(), workersWithReplicas, selectedWorkers);

        // Shards that currently have a replica somewhere in this worker group.
        List<Long> existShardIds = new ArrayList<>();
        for (long workerId : workersWithReplicas) {
            existShardIds.addAll(index.getReplicaShardList(workerId));
        }
        // Only pack groups already present in this worker group contribute shards to schedule.
        List<Long> validTargetShardIdList = new ArrayList<>();
        for (ShardGroup packGroup : groups) {
            if (packGroup.getShardIds().stream().anyMatch(existShardIds::contains)) {
                validTargetShardIdList.addAll(packGroup.getShardIds());
            }
        }

        for (long wid : selectedWorkers) {
            Collection<Long> existingIdList = index.getReplicaShardList(wid);
            List<Long> todoIdList = validTargetShardIdList.stream()
                    .filter(x -> !existingIdList.contains(x))
                    .collect(Collectors.toList());
            if (todoIdList.isEmpty()) {
                continue;
            }
            try {
                this.scheduler.scheduleAddToWorker(currentGroup.getServiceId(), todoIdList, wid);
            } catch (StarException exception) {
                LOG.info("Fail to schedule new replicas of shard:{} to worker:{}, error:", todoIdList, wid, exception);
                // Do not remove old replicas when the new ones could not be placed.
                return;
            }
        }

        workersWithReplicas.removeIf(selectedWorkers::contains);
        for (long wid : workersWithReplicas) {
            Collection<Long> todoList = index.getReplicaShardList(wid);
            if (todoList.isEmpty()) {
                continue;
            }
            LOG.debug("Submit async task to remove shard replicas:{} from worker:{}", todoList, wid);
            this.scheduler.scheduleAsyncRemoveFromWorker(currentGroup.getServiceId(), new ArrayList<>(todoList), wid);
        }
    }

    /**
     * Balances a SPREAD shard group by handling each worker group that holds replicas of the group
     * separately.
     *
     * @param shardManager shard manager of the service the group belongs to
     * @param group        the SPREAD shard group to balance
     */
    private void balanceSpreadShardGroup(ShardManager shardManager, ShardGroup group) {
        final ReplicaWorkerInvertIndex replicaIndex = new ReplicaWorkerInvertIndex();
        replicaIndex.buildFrom(this.workerManager, shardManager, group);
        // One independent balancing pass per worker group found in the index.
        for (long id : replicaIndex.getAllWorkerGroupIds()) {
            this.balanceSpreadGroupInSingleWorkerGroup(shardManager, group, replicaIndex, id);
        }
    }

    /**
     * Evens out the replicas of a SPREAD shard group across the workers of one worker group.
     *
     * <p>Each iteration scores the workers, takes the lowest-scored worker as source and the
     * highest-scored one as target, and moves replicas from source to target:
     * <ol>
     *   <li>If the source exceeds the target by no more than {@code Config.SCHEDULER_BALANCE_MAX_SKEW}
     *       replicas, the loop ends unless - once per call - the worker group statistics both prefer
     *       this direction and report that balancing between workers is needed.</li>
     *   <li>Candidates are shards on the source that are not on the target. A random shard that is not
     *       bound by a higher-precedence shard group is preferred; otherwise a set of packed shards is
     *       looked for via {@code selectPackedShardsToMove}.</li>
     *   <li>The selected shards are added to the target. On failure the target is dropped from the
     *       candidate workers and the next iteration starts. On success the replicas are taken off the
     *       source (scale-in when warmup is enabled, otherwise a scheduled removal).</li>
     * </ol>
     * The loop ends when nothing more can be moved or fewer than two candidate workers remain.
     *
     * @param shardManager  shard manager of the service
     * @param group         the SPREAD shard group being balanced
     * @param index         replica/worker index of {@code group}; updated as replicas are moved
     * @param workerGroupId worker group to balance within
     */
    private void balanceSpreadGroupInSingleWorkerGroup(ShardManager shardManager, ShardGroup group, ReplicaWorkerInvertIndex index, long workerGroupId) {
        WorkerGroup workerGroup = this.workerManager.getWorkerGroupNoException(group.getServiceId(), workerGroupId);
        if (workerGroup == null) {
            return;
        }
        boolean warmupEnabled = workerGroup.warmupEnabled();
        // Private copy: failed targets are removed from this list below.
        List<Long> workerIds = new ArrayList<>(workerGroup.getAllWorkerIds(true));
        boolean hasConsideredWorkersImbalance = false;

        while (workerIds.size() > 1) {
            ScheduleScorer scorer = new ScheduleScorer(workerIds);
            for (long workerId : workerIds) {
                int numReplicas = index.getReplicaShardList(workerId).size();
                if (numReplicas == 0) {
                    continue;
                }
                scorer.apply(group.getPlacementPolicy(), Collections.nCopies(numReplicas, workerId));
            }
            scorer.apply(this.workerManager);
            // Worker ids ordered by ascending score.
            List<Long> rankedWorkerIds = scorer.getScores().entrySet().stream()
                    .sorted(Map.Entry.comparingByValue(Comparator.naturalOrder()))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            long srcWorkerId = rankedWorkerIds.get(0);
            long targetWorkerId = rankedWorkerIds.get(rankedWorkerIds.size() - 1);
            Collection<Long> srcList = index.getReplicaShardList(srcWorkerId);
            Collection<Long> tgtList = index.getReplicaShardList(targetWorkerId);

            if (srcList.size() <= tgtList.size() + Config.SCHEDULER_BALANCE_MAX_SKEW) {
                // Within the tolerated skew: only continue for the one-off worker-level imbalance case.
                if (srcList.size() == tgtList.size()
                        || hasConsideredWorkersImbalance
                        || !this.isPreferredBalanceDirection(workerGroupId, srcWorkerId, targetWorkerId)) {
                    return;
                }
                hasConsideredWorkersImbalance = true;
                if (!this.needBalanceBetweenWorkers(workerGroupId)) {
                    return;
                }
            }

            List<Shard> relaxCandidates = srcList.stream()
                    .filter(x -> !tgtList.contains(x))
                    .map(shardManager::getShard)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            if (relaxCandidates.isEmpty()) {
                return;
            }
            List<Shard> candidates = relaxCandidates.stream()
                    .filter(x -> !this.hasPrecedenceShardGroup(shardManager, group.getPlacementPolicy(), x.getGroupIds()))
                    .collect(Collectors.toList());

            List<Shard> finalSelects = new ArrayList<>();
            if (!candidates.isEmpty()) {
                int pickIndex = ThreadLocalRandom.current().nextInt(candidates.size());
                finalSelects.add(candidates.get(pickIndex));
            } else {
                finalSelects.addAll(this.selectPackedShardsToMove(shardManager, relaxCandidates, workerIds, srcWorkerId, targetWorkerId, workerGroupId));
                if (finalSelects.isEmpty()) {
                    return;
                }
            }

            List<Long> allTodoIds = finalSelects.stream().map(Shard::getShardId).collect(Collectors.toList());
            Preconditions.checkState(!allTodoIds.isEmpty());
            boolean addSuccess = false;
            try {
                LOG.debug("Try to balance shard:{} (service:{}) replica from worker:{} => worker:{}", allTodoIds, group.getServiceId(), srcWorkerId, targetWorkerId);
                this.scheduler.scheduleAddToWorker(group.getServiceId(), allTodoIds, targetWorkerId);
                addSuccess = true;
                finalSelects.forEach(x -> {
                    if (x.getGroupIds().contains(group.getGroupId())) {
                        index.addReplica(x.getShardId(), targetWorkerId, workerGroupId);
                    }
                });
            } catch (Exception exception) {
                LOG.info("Fail to balance shard:{} in service:{}, form worker:{} to worker:{}. Error:", allTodoIds, group.getServiceId(), srcWorkerId, targetWorkerId, exception);
                // Remove by value (not by index): this target is not usable in this pass.
                workerIds.remove(Long.valueOf(targetWorkerId));
            }
            if (!addSuccess) {
                continue;
            }
            try {
                if (warmupEnabled) {
                    shardManager.scaleInShardReplicas(allTodoIds, srcWorkerId);
                } else if (allTodoIds.size() > 1) {
                    this.scheduler.scheduleRemoveFromWorker(group.getServiceId(), allTodoIds, srcWorkerId);
                } else {
                    this.scheduler.scheduleAsyncRemoveFromWorker(group.getServiceId(), allTodoIds, srcWorkerId);
                }
                finalSelects.forEach(x -> {
                    if (x.getGroupIds().contains(group.getGroupId())) {
                        index.removeReplica(x.getShardId(), srcWorkerId);
                    }
                });
            } catch (Exception exception) {
                LOG.debug("Fail to remove shard:{} replica from worker:{}", allTodoIds, srcWorkerId, exception);
            }
        }
    }

    /**
     * Looks for a set of packed shards that can be moved together from {@code srcWorkerId} to
     * {@code targetWorkerId}.
     *
     * <p>For each candidate that belongs to a PACK group, all shards connected through PACK groups are
     * collected and reduced to those having a replica on one of {@code workerIds}. The move is then
     * simulated on a temporary index for every non-PACK group these shards belong to, comparing
     * {@code calculateScore} before and after. The set is accepted when at least one group's score
     * decreases by more than a small epsilon and no group's score increases by more than it.
     *
     * @return the shards of the first accepted set, or an empty list when no candidate qualifies
     */
    private List<Shard> selectPackedShardsToMove(ShardManager shardManager, List<Shard> relaxCandidates, List<Long> workerIds, long srcWorkerId, long targetWorkerId, long workerGroupId) {
        final double epsilon = 0.001;
        for (Shard current : relaxCandidates) {
            ShardGroup startGroup = null;
            for (long currentShardGroupId : current.getGroupIds()) {
                ShardGroup tmpGroup = shardManager.getShardGroup(currentShardGroupId);
                if (tmpGroup != null && tmpGroup.getPlacementPolicy() == PlacementPolicy.PACK) {
                    startGroup = tmpGroup;
                    break;
                }
            }
            if (startGroup == null) {
                continue;
            }
            List<Shard> packedShards = new ArrayList<>();
            List<ShardGroup> packGroups = new ArrayList<>();
            this.collectAllShardsAndGroupsRecursively(shardManager, startGroup, packGroups, packedShards);

            List<Shard> effectiveShards = new ArrayList<>();
            List<ShardGroup> effectiveShardGroups = new ArrayList<>();
            for (Shard shard : packedShards) {
                if (!shard.getReplicaWorkerIds().stream().anyMatch(workerIds::contains)) {
                    continue;
                }
                effectiveShards.add(shard);
                for (long groupId : shard.getGroupIds()) {
                    ShardGroup groupObj = shardManager.getShardGroup(groupId);
                    if (groupObj == null || groupObj.getPlacementPolicy() == PlacementPolicy.PACK || effectiveShardGroups.contains(groupObj)) {
                        continue;
                    }
                    effectiveShardGroups.add(groupObj);
                }
            }

            int agree = 0;
            boolean rejected = false;
            for (ShardGroup affectGroup : effectiveShardGroups) {
                ReplicaWorkerInvertIndex tempIndex = new ReplicaWorkerInvertIndex();
                tempIndex.buildFrom(this.workerManager, shardManager, affectGroup);
                double scoreBeforeMove = this.calculateScore(tempIndex, workerIds, affectGroup.getPlacementPolicy());
                for (Shard toMove : effectiveShards) {
                    if (!toMove.getGroupIds().contains(affectGroup.getGroupId())) {
                        continue;
                    }
                    tempIndex.removeReplica(toMove.getShardId(), srcWorkerId);
                    tempIndex.addReplica(toMove.getShardId(), targetWorkerId, workerGroupId);
                }
                double scoreAfterMove = this.calculateScore(tempIndex, workerIds, affectGroup.getPlacementPolicy());
                // Written as !(x > eps) so that a NaN difference counts as "no change".
                if (!(Math.abs(scoreAfterMove - scoreBeforeMove) > epsilon)) {
                    continue;
                }
                if (scoreAfterMove > scoreBeforeMove) {
                    // One group getting worse vetoes the whole move.
                    rejected = true;
                    break;
                }
                if (scoreAfterMove < scoreBeforeMove) {
                    ++agree;
                }
            }
            if (agree > 0 && !rejected) {
                return effectiveShards;
            }
        }
        return Collections.emptyList();
    }

    /**
     * Scores the replica layout recorded in {@code index} across {@code workerIds}
     * according to the placement policy.
     *
     * <ul>
     *   <li>{@code SPREAD}: the value returned by {@link #calculateDeviation}.</li>
     *   <li>{@code EXCLUDE}: the fraction of workers for which
     *       {@code index.getReplicaShardList(workerId)} is empty.</li>
     *   <li>any other policy: {@code 0.0}.</li>
     * </ul>
     *
     * @param index     replica-to-worker index queried per worker id
     * @param workerIds worker ids to evaluate; for {@code EXCLUDE} an empty list yields {@code NaN} (0/0)
     * @param policy    placement policy selecting the scoring formula
     * @return the score as described above
     */
    private double calculateScore(ReplicaWorkerInvertIndex index, List<Long> workerIds, PlacementPolicy policy) {
        double score = 0.0;
        switch (policy) {
            case SPREAD: {
                score = this.calculateDeviation(index, workerIds);
                break;
            }
            case EXCLUDE: {
                // Count the workers that currently have nothing listed in the index.
                long emptyWorkers = 0L;
                for (long workerId : workerIds) {
                    if (index.getReplicaShardList(workerId).isEmpty()) {
                        ++emptyWorkers;
                    }
                }
                score = (double)emptyWorkers / (double)workerIds.size();
                break;
            }
            default: {
                break;
            }
        }
        return score;
    }

    /**
     * Computes the population variance, {@code E[n^2] - E[n]^2}, of the per-worker
     * entry counts, where a worker's count is the size of
     * {@code index.getReplicaShardList(workerId)}.
     *
     * <p>The squares are accumulated in {@code double}: squaring the count as an
     * {@code int} wraps around once a single worker holds more than 46340 entries,
     * which would corrupt the result.
     *
     * @param index     replica-to-worker index queried per worker id
     * @param workerIds worker ids to include; an empty list yields {@code NaN} (0/0)
     * @return the variance of the per-worker counts
     */
    private double calculateDeviation(ReplicaWorkerInvertIndex index, List<Long> workerIds) {
        double sum = 0.0;
        double sumOfSquares = 0.0;
        for (long id : workerIds) {
            // Widen before multiplying so the square cannot overflow int.
            double n = index.getReplicaShardList(id).size();
            sum += n;
            sumOfSquares += n * n;
        }
        double workerCount = workerIds.size();
        double mean = sum / workerCount;
        return sumOfSquares / workerCount - mean * mean;
    }

    /**
     * Tells whether any of the given shard groups takes precedence over {@code policy}.
     *
     * <p>A group qualifies when it can be resolved through {@code shardManager}, its
     * placement policy number is strictly greater than {@code policy}'s number, and it
     * contains more than one shard id.
     *
     * @param shardManager used to look up each shard group by id
     * @param policy       the placement policy to compare against
     * @param groupIds     ids of the shard groups to inspect
     * @return {@code true} if at least one group qualifies, {@code false} otherwise
     */
    private boolean hasPrecedenceShardGroup(ShardManager shardManager, PlacementPolicy policy, List<Long> groupIds) {
        for (long groupId : groupIds) {
            ShardGroup candidate = shardManager.getShardGroup(groupId);
            if (candidate == null) {
                // Unknown group id: nothing to compare.
                continue;
            }
            boolean outranks = candidate.getPlacementPolicy().getNumber() > policy.getNumber();
            if (outranks && candidate.getShardIds().size() > 1) {
                return true;
            }
        }
        return false;
    }

    /**
     * Mutable bag of replica counters. Every counter starts at zero (the Java default
     * for {@code int} fields) and is updated directly by code in this package.
     *
     * <p>NOTE(review): the field names suggest one counter per replica state
     * (shutdown, dead, ok, scale-out, scale-in) - confirm against the code that fills them.
     */
    static class ReplicaStat {
        int shutdownReplicas;
        int deadReplica;
        int okReplicas;
        int scaleoutReplicas;
        int scaleinReplicas;
    }

    /**
     * Cached shard-count statistics of one {@link WorkerGroup}.
     *
     * <p>{@link #refresh()} walks the group's workers, records the average number of
     * shards per worker and derives whether the group counts as balanced. The cached
     * values are reused by {@link #needBalance()} until they are older than 5000 ms.
     */
    static class WorkerGroupStats {
        WorkerGroup workerGroup;
        // Wall-clock time (ms) of the last refresh(); 0 forces a refresh on first use.
        long lastUpdateTimeStamp = 0L;
        // False only when the last refresh() found the spread above the configured threshold.
        boolean balanced = true;
        SplittableRandom randomGen = new SplittableRandom();
        // Probability with which needBalance() answers true while the group is unbalanced.
        double shardGroupBalanceProbability = 0.0;
        // Average number of shards per resolvable worker, as of the last refresh().
        double averageNum = 0.0;

        /**
         * @param workerGroup the worker group these statistics describe
         */
        public WorkerGroupStats(WorkerGroup workerGroup) {
            this.workerGroup = workerGroup;
        }

        /**
         * Decides whether a balance attempt should be made now.
         *
         * <p>Refreshes the cached statistics first if they are older than 5000 ms
         * (the same value as {@code ShardChecker.WORKER_GROUP_STAT_REFRESH_INTERVAL_MS}).
         *
         * @return {@code true} only when the group is unbalanced and a random draw
         *         falls below {@code shardGroupBalanceProbability}
         */
        public boolean needBalance() {
            if (this.lastUpdateTimeStamp + 5000L < System.currentTimeMillis()) {
                this.refresh();
            }
            return !this.balanced && this.randomGen.nextDouble() < this.shardGroupBalanceProbability;
        }

        /**
         * Tells whether moving a shard from {@code srcWorkerId} to {@code targetWorkerId}
         * goes in the preferred direction.
         *
         * @param srcWorkerId    id of the worker the shard would leave
         * @param targetWorkerId id of the worker the shard would move to
         * @return {@code true} when both workers exist in the group, the source is above
         *         the cached average, the target is below it, and the source holds more
         *         than two shards more than the target; {@code false} otherwise
         */
        public boolean isPreferredBalanceDirection(long srcWorkerId, long targetWorkerId) {
            Worker srcWorker = this.workerGroup.getWorker(srcWorkerId);
            Worker tgtWorker = this.workerGroup.getWorker(targetWorkerId);
            if (srcWorker == null || tgtWorker == null) {
                return false;
            }
            // Read each counter once so all three comparisons see the same values.
            long srcShards = srcWorker.getNumOfShards();
            long tgtShards = tgtWorker.getNumOfShards();
            return (double)srcShards > this.averageNum && (double)tgtShards < this.averageNum && srcShards > tgtShards + 2L;
        }

        /**
         * Recomputes the cached statistics from the current workers of the group.
         *
         * <p>With no resolvable worker the group is considered balanced and the balance
         * probability is reset to 0. Otherwise the group is flagged unbalanced when
         * {@code (max - min) / average} exceeds
         * {@code Config.BALANCE_WORKER_SHARDS_THRESHOLD_IN_PERCENT}; the balance
         * probability then becomes the smaller of that ratio divided by 5 and half the
         * threshold, and the random generator is reseeded with the current time.
         */
        public void refresh() {
            long total = 0L;
            long count = 0L;
            long min = Long.MAX_VALUE;
            long max = 0L;
            for (Long id : this.workerGroup.getAllWorkerIds(true)) {
                Worker worker = this.workerGroup.getWorker(id);
                if (worker == null) continue;
                // Single read per worker: min, max and total must be derived from the same sample.
                long num = worker.getNumOfShards();
                max = Math.max(max, num);
                min = Math.min(min, num);
                total += num;
                ++count;
            }
            this.balanced = true;
            this.lastUpdateTimeStamp = System.currentTimeMillis();
            if (count == 0L) {
                this.shardGroupBalanceProbability = 0.0;
                return;
            }
            this.averageNum = (double)total / (double)count;
            // When every worker holds 0 shards this is 0/0 = NaN; the comparison below is
            // then false and the group stays balanced.
            double spreadRatio = (double)(max - min) / this.averageNum;
            if (spreadRatio > Config.BALANCE_WORKER_SHARDS_THRESHOLD_IN_PERCENT) {
                this.shardGroupBalanceProbability = Double.min(spreadRatio / 5.0, Config.BALANCE_WORKER_SHARDS_THRESHOLD_IN_PERCENT / 2.0);
                this.randomGen = new SplittableRandom(System.currentTimeMillis());
                this.balanced = false;
                LOG.debug("Balance Probability for workerGroup:{}: {}", (Object)this.workerGroup.getGroupId(), (Object)this.shardGroupBalanceProbability);
            }
        }
    }
}

