/*
 * Decompiled with CFR 0.152.
 */
package com.staros.schedule;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.staros.exception.ExceptionCode;
import com.staros.exception.NoAliveWorkersException;
import com.staros.exception.NotExistStarException;
import com.staros.exception.ScheduleConflictStarException;
import com.staros.exception.StarException;
import com.staros.exception.WorkerNotHealthyStarException;
import com.staros.metrics.MetricsSystem;
import com.staros.proto.AddShardInfo;
import com.staros.proto.AddShardRequest;
import com.staros.proto.PlacementPolicy;
import com.staros.proto.RemoveShardRequest;
import com.staros.proto.ReplicaState;
import com.staros.replica.Replica;
import com.staros.schedule.ScheduleRequestContext;
import com.staros.schedule.ScheduleScorer;
import com.staros.schedule.Scheduler;
import com.staros.schedule.select.FirstNSelector;
import com.staros.schedule.select.Selector;
import com.staros.service.ServiceManager;
import com.staros.shard.Shard;
import com.staros.shard.ShardGroup;
import com.staros.shard.ShardManager;
import com.staros.shard.ShardPolicyFilter;
import com.staros.util.AbstractServer;
import com.staros.util.Config;
import com.staros.util.LockCloseable;
import com.staros.util.Utils;
import com.staros.worker.Worker;
import com.staros.worker.WorkerGroup;
import com.staros.worker.WorkerManager;
import io.prometheus.metrics.core.datapoints.CounterDataPoint;
import io.prometheus.metrics.core.metrics.Counter;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ShardSchedulerV2
extends AbstractServer
implements Scheduler {
    private static final Logger LOG = LogManager.getLogger(ShardSchedulerV2.class);
    // Dispatch-task priority levels.
    // NOTE(review): the methods in this (decompiled) file pass the literals 0 / 10 / 20 directly
    // rather than referencing these names — presumably the compiler inlined them; confirm.
    private static final int PRIORITY_LOW = 0;
    private static final int PRIORITY_MEDIUM = 10;
    private static final int PRIORITY_HIGH = 20;
    // Period and initial delay, in seconds, matching the 300L / 30L literals that doStart() uses
    // when it schedules adjustScheduleThreadPoolSize().
    private static final int ADJUST_THREAD_INTERVAL_SECS = 300;
    private static final int INIT_CHECK_THREAD_DELAY_SECS = 30;
    // Matches the 100L retry delay handed to submitCalcTaskInternal() after a schedule conflict;
    // that method schedules the delay in TimeUnit.MICROSECONDS.
    private static final int shortNap = 100;
    // Lock tried (tryLock(ctx, shardManager)) by the phase-1 detail methods before a request is
    // handed to the dispatch pool; a failed tryLock results in a ScheduleConflictStarException.
    private final ExclusiveLocker requestLocker = new ExclusiveLocker();
    // Selector passed to ScheduleScorer.selectHighEnd / selectLowEnd when choosing workers.
    private final Selector scoreSelector = new FirstNSelector();
    // NOTE(review): not referenced anywhere in the visible part of this file — confirm usage.
    private static final List<PlacementPolicy> conflictPolicies = Arrays.asList(PlacementPolicy.EXCLUDE, PlacementPolicy.PACK, PlacementPolicy.SPREAD);
    // Pool that runs the phase-1 (calculation) tasks, immediately or after a delay. Created in doStart().
    private ScheduledThreadPoolExecutor calculateExecutors;
    // Pool that runs DispatchTask instances; doStart() backs it with a PriorityBlockingQueue.
    private ThreadPoolExecutor dispatchExecutors;
    // Single-threaded pool that periodically runs adjustScheduleThreadPoolSize(). Created in doStart().
    private ScheduledThreadPoolExecutor adjustPoolExecutors;
    private final ServiceManager serviceManager;
    private final WorkerManager workerManager;
    // Counter labelled by "op"; the two data points below are incremented once per add-shard
    // request and once per remove-shard request sent to a worker.
    private static final Counter METRIC_WORKER_SHARD_COUNT = MetricsSystem.registerCounter((String)"starmgr_schedule_shard_ops", (String)"count of operations by adding/remove shards to/from worker", (List)Lists.newArrayList((Object[])new String[]{"op"}));
    private final CounterDataPoint addShardOpCounter = (CounterDataPoint)METRIC_WORKER_SHARD_COUNT.labelValues(new String[]{"add"});
    private final CounterDataPoint removeShardOpCounter = (CounterDataPoint)METRIC_WORKER_SHARD_COUNT.labelValues(new String[]{"remove"});

    /**
     * Builds a dispatch task that adds {@code shardIds} to worker {@code workerId} as non-temporary
     * replicas.
     *
     * <p>The task never throws: it yields {@code null} on success, the original
     * {@link StarException} on a scheduling failure, or a {@code SCHEDULE}-coded
     * {@link StarException} carrying the message of any other exception.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to add
     * @param workerId  target worker
     * @param priority  priority handed to the {@code DispatchTask}
     * @return the task, not yet submitted
     */
    private DispatchTask<StarException> dispatchTaskForAddToWorker(String serviceId, List<Long> shardIds, long workerId, int priority) {
        String description = String.format("[AddToWorker Task] serviceId: %s, workerId: %s, priority: %d", serviceId, workerId, priority);
        Callable<StarException> addShards = () -> {
            StarException failure = null;
            try {
                executeAddToWorker(serviceId, shardIds, false, workerId);
            } catch (StarException e) {
                failure = e;
            } catch (Exception e) {
                failure = new StarException(ExceptionCode.SCHEDULE, e.getMessage());
            }
            return failure;
        };
        return new DispatchTask<StarException>(addShards, priority, description);
    }

    /**
     * Wraps phase 2 of an add-to-group request ({@link #executeAddToGroupPhase2}) in a dispatch
     * task whose result value is {@code true}.
     *
     * @param ctx      request to execute
     * @param priority priority handed to the {@code DispatchTask}
     * @return the task, not yet submitted
     */
    private DispatchTask<Boolean> dispatchTaskForAddToGroup(ScheduleRequestContext ctx, int priority) {
        return new DispatchTask<Boolean>(
                () -> executeAddToGroupPhase2(ctx),
                true,
                priority,
                String.format("[AddToGroup Task] %s, priority: %d", ctx, priority));
    }

    /**
     * Wraps phase 2 of a remove-from-group request ({@link #executeRemoveFromGroupPhase2}) in a
     * dispatch task whose result value is {@code true}.
     *
     * @param ctx      request to execute
     * @param priority priority handed to the {@code DispatchTask}
     * @return the task, not yet submitted
     */
    private DispatchTask<Boolean> dispatchTaskForRemoveFromGroup(ScheduleRequestContext ctx, int priority) {
        return new DispatchTask<Boolean>(
                () -> executeRemoveFromGroupPhase2(ctx),
                true,
                priority,
                String.format("[RemoveFromGroup Task] %s, priority: %d", ctx, priority));
    }

    /**
     * Builds a dispatch task that removes {@code shardIds} from worker {@code workerId}.
     *
     * <p>The task never throws: it yields {@code null} on success, the original
     * {@link StarException} on a scheduling failure, or a {@code SCHEDULE}-coded
     * {@link StarException} carrying the message of any other exception.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to remove
     * @param workerId  worker to remove them from
     * @param priority  priority handed to the {@code DispatchTask}
     * @return the task, not yet submitted
     */
    private DispatchTask<StarException> dispatchTaskForRemoveFromWorker(String serviceId, List<Long> shardIds, long workerId, int priority) {
        String description = String.format("[RemoveFromWorker Task] serviceId: %s, workerId: %s, priority: %d", serviceId, workerId, priority);
        Callable<StarException> removeShards = () -> {
            StarException failure = null;
            try {
                executeRemoveFromWorker(serviceId, shardIds, workerId);
            } catch (StarException e) {
                failure = e;
            } catch (Exception e) {
                failure = new StarException(ExceptionCode.SCHEDULE, e.getMessage());
            }
            return failure;
        };
        return new DispatchTask<StarException>(removeShards, priority, description);
    }

    /**
     * Creates a scheduler bound to the given managers. The executors are not created here; they
     * are set up in {@link #doStart()}.
     *
     * @param serviceManager source of per-service {@code ShardManager} instances
     * @param workerManager  source of workers and worker groups
     */
    public ShardSchedulerV2(ServiceManager serviceManager, WorkerManager workerManager) {
        this.workerManager = workerManager;
        this.serviceManager = serviceManager;
    }

    /**
     * Single-shard convenience overload of {@link #scheduleAddToGroup(String, List, long)}.
     *
     * @param serviceId service owning the shard
     * @param shardId   shard to schedule
     * @param wgId      target worker group
     * @throws StarException if scheduling fails
     */
    @Override
    public void scheduleAddToGroup(String serviceId, long shardId, long wgId) throws StarException {
        List<Long> singleShard = Collections.singletonList(shardId);
        scheduleAddToGroup(serviceId, singleShard, wgId);
    }

    /**
     * Schedules every shard in {@code shardIds} into worker group {@code wgId} and blocks until all
     * of the requests have finished.
     *
     * <p>One {@link ScheduleRequestContext} is created per shard, all sharing a single
     * {@link CountDownLatch} sized to the number of shards (presumably counted down when a context
     * completes — confirm in {@code ScheduleRequestContext}). After the latch opens, the first
     * context that recorded an exception has it rethrown.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored before the
     * {@link StarException} is thrown, so callers further up can still observe the interruption.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to schedule
     * @param wgId      target worker group
     * @throws StarException if a task cannot be submitted, the wait is interrupted, or any
     *                       request finished with an error
     */
    @Override
    public void scheduleAddToGroup(String serviceId, List<Long> shardIds, long wgId) throws StarException {
        CountDownLatch latch = new CountDownLatch(shardIds.size());
        List<ScheduleRequestContext> ctxs = new ArrayList<>(shardIds.size());
        for (Long id : shardIds) {
            ScheduleRequestContext ctx = new ScheduleRequestContext(serviceId, id, wgId, latch);
            ctxs.add(ctx);
            this.submitCalcTaskInternal(() -> this.executeAddToGroupPhase1(ctx), 0L);
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
            throw new StarException(ExceptionCode.SCHEDULE, e.getMessage());
        }
        for (ScheduleRequestContext ctx : ctxs) {
            if (ctx.getException() != null) {
                throw ctx.getException();
            }
        }
    }

    /**
     * Queues an add-to-group request for one shard and returns without waiting for the result
     * (the request context is created with no latch).
     *
     * @param serviceId service owning the shard
     * @param shardId   shard to schedule
     * @param wgId      target worker group
     * @throws StarException if the task cannot be submitted
     */
    @Override
    public void scheduleAsyncAddToGroup(String serviceId, long shardId, long wgId) throws StarException {
        final ScheduleRequestContext request = new ScheduleRequestContext(serviceId, shardId, wgId, null);
        Runnable phase1 = () -> executeAddToGroupPhase1(request);
        submitCalcTaskInternal(phase1, 0L);
    }

    /**
     * Schedules {@code shardIds} into the service's default worker group and waits for completion.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to schedule
     * @throws StarException with code {@code NOT_EXIST} if the service has no default worker
     *                       group, or any error raised by
     *                       {@link #scheduleAddToGroup(String, List, long)}
     */
    @Override
    public void scheduleAddToDefaultGroup(String serviceId, List<Long> shardIds) throws StarException {
        WorkerGroup defaultGroup = workerManager.getDefaultWorkerGroup(serviceId);
        if (defaultGroup != null) {
            scheduleAddToGroup(serviceId, shardIds, defaultGroup.getGroupId());
            return;
        }
        String message = String.format("DefaultWorkerGroup not exist for service %s", serviceId);
        throw new StarException(ExceptionCode.NOT_EXIST, message);
    }

    /**
     * Queues a remove-from-group request for one shard and returns without waiting for the result
     * (the request context is created with no latch).
     *
     * @param serviceId     service owning the shard
     * @param shardId       shard whose replica should be removed
     * @param workerGroupId worker group to remove it from
     * @throws StarException if the task cannot be submitted
     */
    @Override
    public void scheduleAsyncRemoveFromGroup(String serviceId, long shardId, long workerGroupId) throws StarException {
        final ScheduleRequestContext request = new ScheduleRequestContext(serviceId, shardId, workerGroupId, null);
        Runnable phase1 = () -> executeRemoveFromGroupPhase1(request);
        submitCalcTaskInternal(phase1, 0L);
    }

    /**
     * Runs phase 1 of an add-to-group request and makes sure the context is always completed or
     * retried.
     *
     * <ul>
     *   <li>{@link ScheduleConflictStarException} (the request lock could not be taken): the same
     *       request is re-queued with a delay of 100.</li>
     *   <li>Any other {@link StarException}: the context is completed with it.</li>
     *   <li>Any other {@link Throwable}: the context is completed with a {@code SCHEDULE}-coded
     *       {@link StarException} carrying its message.</li>
     * </ul>
     *
     * <p>If re-queuing itself fails (for example the executor rejects the task), the context is
     * completed with that failure. Otherwise it would never be completed and a synchronous caller
     * would wait forever.
     *
     * @param ctx request being scheduled
     */
    private void executeAddToGroupPhase1(ScheduleRequestContext ctx) {
        try {
            this.executeAddToGroupPhase1Detail(ctx);
        } catch (ScheduleConflictStarException conflict) {
            // A request for the same shard is in progress: retry after a short delay.
            try {
                this.submitCalcTaskInternal(() -> this.executeAddToGroupPhase1(ctx), 100L);
            } catch (StarException submitFailure) {
                ctx.done(submitFailure);
            } catch (Throwable throwable) {
                ctx.done(new StarException(ExceptionCode.SCHEDULE, throwable.getMessage()));
            }
        } catch (StarException exception) {
            ctx.done(exception);
        } catch (Throwable throwable) {
            ctx.done(new StarException(ExceptionCode.SCHEDULE, throwable.getMessage()));
        }
    }

    /**
     * Hands a task to the calculation executor.
     *
     * @param run   task to run
     * @param delay {@code 0} to run as soon as possible; otherwise the delay, which is scheduled
     *              in {@link TimeUnit#MICROSECONDS}
     * @throws StarException with code {@code SCHEDULE} if the executor rejects the task; the
     *                       message is {@code "Scheduling shutdown!"} when this server is no
     *                       longer running, otherwise the rejection's own message
     */
    private void submitCalcTaskInternal(Runnable run, long delay) throws StarException {
        try {
            if (delay != 0L) {
                calculateExecutors.schedule(run, delay, TimeUnit.MICROSECONDS);
            } else {
                calculateExecutors.execute(run);
            }
        } catch (RejectedExecutionException rejected) {
            String reason = isRunning() ? rejected.getMessage() : "Scheduling shutdown!";
            throw new StarException(ExceptionCode.SCHEDULE, reason);
        }
    }

    /**
     * Single-shard convenience overload of {@link #scheduleAddToWorker(String, List, long)}.
     *
     * @param serviceId service owning the shard
     * @param shardId   shard to add
     * @param workerId  target worker
     * @throws StarException if the operation fails
     */
    @Override
    public void scheduleAddToWorker(String serviceId, long shardId, long workerId) throws StarException {
        List<Long> singleShard = Collections.singletonList(shardId);
        scheduleAddToWorker(serviceId, singleShard, workerId);
    }

    /**
     * Adds {@code shardIds} to {@code workerId} and waits for the result. The task is dispatched
     * with priority 20.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to add
     * @param workerId  target worker
     * @throws StarException if submission fails or the task reports an error
     */
    @Override
    public void scheduleAddToWorker(String serviceId, List<Long> shardIds, long workerId) throws StarException {
        submitDispatchTask(dispatchTaskForAddToWorker(serviceId, shardIds, workerId, 20), true);
    }

    /**
     * Queues adding {@code shardIds} to {@code workerId} without waiting for the result. The task
     * is dispatched with priority 10.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to add
     * @param workerId  target worker
     * @throws StarException if the task cannot be submitted
     */
    @Override
    public void scheduleAsyncAddToWorker(String serviceId, List<Long> shardIds, long workerId) throws StarException {
        submitDispatchTask(dispatchTaskForAddToWorker(serviceId, shardIds, workerId, 10), false);
    }

    /**
     * Single-shard convenience overload of {@link #scheduleRemoveFromWorker(String, List, long)}.
     *
     * @param serviceId service owning the shard
     * @param shardId   shard to remove
     * @param workerId  worker to remove it from
     * @throws StarException if the operation fails
     */
    @Override
    public void scheduleRemoveFromWorker(String serviceId, long shardId, long workerId) throws StarException {
        List<Long> singleShard = Collections.singletonList(shardId);
        scheduleRemoveFromWorker(serviceId, singleShard, workerId);
    }

    /**
     * Removes {@code shardIds} from {@code workerId} and waits for the result. The task is
     * dispatched with priority 10.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to remove
     * @param workerId  worker to remove them from
     * @throws StarException if submission fails or the task reports an error
     */
    @Override
    public void scheduleRemoveFromWorker(String serviceId, List<Long> shardIds, long workerId) throws StarException {
        submitDispatchTask(dispatchTaskForRemoveFromWorker(serviceId, shardIds, workerId, 10), true);
    }

    /**
     * Queues removing {@code shardIds} from {@code workerId} without waiting for the result. The
     * task is dispatched with priority 0.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to remove
     * @param workerId  worker to remove them from
     * @throws StarException if the task cannot be submitted
     */
    @Override
    public void scheduleAsyncRemoveFromWorker(String serviceId, List<Long> shardIds, long workerId) throws StarException {
        submitDispatchTask(dispatchTaskForRemoveFromWorker(serviceId, shardIds, workerId, 0), false);
    }

    /**
     * Submits a dispatch task and, optionally, waits for it and rethrows the failure it reports.
     *
     * <p>The task's result is itself an exception (or {@code null} on success). When waiting, a
     * non-null result is rethrown as-is if it is a {@link StarException}, otherwise wrapped in a
     * {@code SCHEDULE}-coded {@link StarException} carrying its message.
     *
     * <p>The result is fetched with a single {@code get()} call. If the wait is interrupted, the
     * thread's interrupt status is restored before the {@link StarException} is thrown.
     *
     * @param task task to run on the dispatch executor
     * @param wait {@code true} to block until the task finishes and propagate its failure
     * @param <T>  exception type produced by the task
     * @throws StarException if submission fails, the result cannot be obtained, or (when
     *                       waiting) the task reports a failure
     */
    private <T extends Exception> void submitDispatchTask(DispatchTask<T> task, boolean wait) throws StarException {
        try {
            this.dispatchExecutors.execute(task);
        } catch (Exception e) {
            LOG.error("Fail to submit schedule task {}", task, e);
            throw new StarException(ExceptionCode.SCHEDULE, e.getMessage());
        }
        if (!wait) {
            return;
        }
        Exception exception;
        try {
            exception = (Exception) task.get();
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Fail to get task result. task: {}", task, e);
            throw new StarException(ExceptionCode.SCHEDULE, e.getMessage());
        }
        if (exception == null) {
            return;
        }
        if (exception instanceof StarException) {
            throw (StarException) exception;
        }
        throw new StarException(ExceptionCode.SCHEDULE, exception.getMessage());
    }

    /**
     * Phase 1 of an add-to-group request: decides whether the shard needs more replicas in the
     * target worker group, picks the workers, and hands the request to the dispatch executor.
     *
     * <p>Steps:
     * <ol>
     *   <li>Resolve the shard manager, shard and worker group; complete the context with a
     *       not-exist error if any is missing.</li>
     *   <li>Count the shard's replicas that live on workers of the target group: replicas on dead
     *       but not-yet-expired workers, and replicas on alive workers that are not scaling
     *       in.</li>
     *   <li>If the count does not exceed the group's replica number, convert temporary replicas on
     *       alive workers to normal ones.</li>
     *   <li>If there is at least one valid replica and the count meets the replica number, the
     *       request is done.</li>
     *   <li>Otherwise try to take the request lock (throwing
     *       {@link ScheduleConflictStarException} on contention), filter and score candidate
     *       workers by the placement policies of the shard's groups, and submit a dispatch
     *       task.</li>
     * </ol>
     *
     * <p>Shard groups that can no longer be resolved are skipped when collecting placement
     * constraints, matching the remove-from-group path.
     *
     * @param ctx request being scheduled
     * @throws ScheduleConflictStarException if another request currently holds the lock
     */
    private void executeAddToGroupPhase1Detail(ScheduleRequestContext ctx) {
        ShardManager shardManager = this.serviceManager.getShardManager(ctx.getServiceId());
        if (shardManager == null) {
            ctx.done(new NotExistStarException("Service {} Not Exist", new Object[]{ctx.getServiceId()}));
            return;
        }
        Shard shard = shardManager.getShard(ctx.getShardId());
        if (shard == null) {
            ctx.done(new NotExistStarException("Shard {} Not Exist", new Object[]{ctx.getShardId()}));
            return;
        }
        WorkerGroup wg = this.workerManager.getWorkerGroupNoException(shard.getServiceId(), ctx.getWorkerGroupId());
        if (wg == null) {
            ctx.done(new NotExistStarException("WorkerGroup {} doesn't exist!", new Object[]{ctx.getWorkerGroupId()}));
            return;
        }

        int replicaNum = wg.getReplicaNumber();
        List<Long> existReplicas = new ArrayList<>();
        List<Long> tempReplica = new ArrayList<>();
        int validNumReplicas = 0;
        int deadReplicas = 0;
        for (Replica replica : shard.getReplica()) {
            Worker w = this.workerManager.getWorker(replica.getWorkerId());
            if (w == null || w.getGroupId() != ctx.getWorkerGroupId()) {
                // Replica is on an unknown worker or on a worker of another group.
                continue;
            }
            if (!w.isAlive()) {
                if (!w.replicaExpired()) {
                    ++deadReplicas;
                }
            } else if (replica.getState() != ReplicaState.REPLICA_SCALE_IN) {
                ++validNumReplicas;
                if (replica.getTempFlag()) {
                    tempReplica.add(replica.getWorkerId());
                }
            }
            // Every replica in this group, whatever its state, excludes its worker from selection.
            existReplicas.add(replica.getWorkerId());
        }

        if (validNumReplicas + deadReplicas <= replicaNum) {
            for (long workerId : tempReplica) {
                LOG.debug("Convert shard:{} temp replica:{} to normal replica.", shard.getShardId(), workerId);
                shardManager.convertTempShardReplicaToNormal(Collections.nCopies(1, shard.getShardId()), workerId);
            }
        }
        if (validNumReplicas > 0 && validNumReplicas + deadReplicas >= replicaNum) {
            LOG.debug("shard:{} replica stats in workerGroup:{}, validNumReplicas:{}, deadReplica:{}, replicaNum:{}", shard.getShardId(), ctx.getWorkerGroupId(), validNumReplicas, deadReplicas, replicaNum);
            ctx.done();
            return;
        }

        int desired = replicaNum - validNumReplicas - deadReplicas;
        // Reaching here with nothing missing means there is no valid replica (only dead ones fill
        // the quota), so the replica created now is flagged as temporary.
        ctx.setGenerateTempReplica(desired <= 0);
        if (desired < 1) {
            desired = 1;
        }
        // Priority: 20 when a caller is waiting, 0 when at most half the replicas are missing,
        // 10 otherwise.
        int priority = 10;
        if (ctx.isWaited()) {
            priority = 20;
        } else if (desired <= replicaNum / 2) {
            priority = 0;
        }

        HashSet<Long> wIds = new HashSet<>(wg.getAllWorkerIds(true));
        if (wIds.isEmpty()) {
            ctx.done(new NoAliveWorkersException("WorkerGroup {} doesn't have alive workers", new Object[]{ctx.getWorkerGroupId()}));
            return;
        }
        existReplicas.forEach(wIds::remove);

        if (!this.requestLocker.tryLock(ctx, shardManager)) {
            ctx.reset();
            throw new ScheduleConflictStarException();
        }
        // cleanOp runs ctx.getRunnable() when this scope exits, unless cancel() is called after
        // the request has been handed to the dispatch executor (presumably releasing the lock
        // taken above — confirm in ScheduleRequestContext/DeferOp).
        try (DeferOp cleanOp = new DeferOp(ctx.getRunnable())) {
            // For each placement policy, collect the candidate workers that already host other
            // shards of the same shard group.
            HashMap<PlacementPolicy, Collection<Long>> ppMap = new HashMap<>();
            for (Long gid : shard.getGroupIds()) {
                ShardGroup g = shardManager.getShardGroup(gid);
                if (g == null) {
                    // The group is no longer known to the shard manager; it contributes no constraint.
                    continue;
                }
                PlacementPolicy policy = g.getPlacementPolicy();
                List<Long> groupWorkerIds = new ArrayList<>();
                for (Long id : g.getShardIds()) {
                    if (id.longValue() == shard.getShardId()) {
                        continue;
                    }
                    Shard firstDegreeShard = shardManager.getShard(id);
                    if (firstDegreeShard == null) {
                        continue;
                    }
                    groupWorkerIds.addAll(firstDegreeShard.getReplicaWorkerIds().stream().filter(wIds::contains).collect(Collectors.toList()));
                }
                if (groupWorkerIds.isEmpty()) {
                    continue;
                }
                if (ppMap.containsKey(policy)) {
                    ppMap.get(policy).addAll(groupWorkerIds);
                } else {
                    ppMap.put(policy, groupWorkerIds);
                }
            }
            ShardPolicyFilter.filter(ppMap, wIds);
            if (wIds.isEmpty()) {
                ctx.done(new StarException(ExceptionCode.SCHEDULE, String.format("Can't find worker for request: %s", ctx)));
                return;
            }

            List<Long> workerIds;
            if (wIds.size() < desired) {
                // Fewer candidates than requested: take all of them, no scoring needed.
                LOG.debug("Schedule requests {} workers, but only {} available. {}", desired, wIds.size(), ctx);
                workerIds = new ArrayList<>(wIds);
            } else {
                ScheduleScorer scorer = new ScheduleScorer(wIds);
                ppMap.forEach(scorer::apply);
                scorer.apply(this.workerManager);
                LOG.debug("final scores for selection: {}, for request {}", scorer.getScores(), ctx);
                workerIds = scorer.selectHighEnd(this.scoreSelector, desired);
            }
            ctx.setWorkerIds(workerIds);
            LOG.debug("Schedule request {}, pending schedule to workerList: {}", ctx, ctx.getWorkerIds());

            DispatchTask<Boolean> task = this.dispatchTaskForAddToGroup(ctx, priority);
            try {
                this.dispatchExecutors.execute(task);
                // Phase 2 now owns the request; skip the cleanup in this scope.
                cleanOp.cancel();
            } catch (Throwable e) {
                LOG.error("Fail to add task {} into dispatchWorkerExecutors", task, e);
                ctx.done(new StarException(ExceptionCode.SCHEDULE, e.getMessage()));
            }
        }
    }

    /**
     * Phase 2 of an add-to-group request, run on the dispatch executor. Adds the shard to the
     * selected workers, or completes the context with a shutdown error when the server is no
     * longer running. The {@code DeferOp} built from {@code ctx.getRunnable()} is closed when the
     * method exits.
     *
     * @param ctx request whose worker ids were chosen in phase 1
     */
    private void executeAddToGroupPhase2(ScheduleRequestContext ctx) {
        try (DeferOp ignored = new DeferOp(ctx.getRunnable())) {
            if (!isRunning()) {
                ctx.done(new StarException(ExceptionCode.SCHEDULE, "Schedule shutdown in progress"));
                return;
            }
            executeAddToWorker(ctx);
        }
    }

    /**
     * Adds the request's shard to every worker selected for it, then completes the context.
     *
     * <p>A failure on one worker does not stop the remaining workers from being tried. The context
     * is completed with the last failure seen, or with {@code null} when every add succeeded.
     *
     * @param ctx request carrying the shard id and the selected worker ids
     */
    private void executeAddToWorker(ScheduleRequestContext ctx) {
        StarException lastFailure = null;
        for (long targetWorkerId : ctx.getWorkerIds()) {
            try {
                executeAddToWorker(ctx.getServiceId(), Collections.singletonList(ctx.getShardId()), ctx.isGenerateTempReplica(), targetWorkerId);
            } catch (Exception e) {
                lastFailure = (e instanceof StarException)
                        ? (StarException) e
                        : new StarException(ExceptionCode.SCHEDULE, e.getMessage());
            }
        }
        ctx.done(lastFailure);
    }

    /**
     * Sends add-shard requests for {@code shardIds} to one worker and records the new replicas.
     *
     * <p>The shards are split into batches of at most
     * {@code Config.SCHEDULER_MAX_BATCH_ADD_SHARD_SIZE}. After each batch is accepted by the
     * worker, the replicas are recorded through {@code scaleOutShardReplicas} when the worker
     * group has warmup enabled, otherwise through {@code addShardReplicas}.
     *
     * <p>Unknown shard ids are skipped silently, except when exactly one shard was requested, in
     * which case a {@code NOT_EXIST} error is thrown.
     *
     * @param serviceId     service owning the shards
     * @param shardIds      shards to add
     * @param isTempReplica whether the new replicas are recorded as temporary
     * @param workerId      target worker
     * @throws StarException if the service, worker, worker group or (single) shard does not
     *                       exist, the worker belongs to another service, or the worker call
     *                       fails; a {@link WorkerNotHealthyStarException} is rethrown unchanged
     */
    private void executeAddToWorker(String serviceId, List<Long> shardIds, boolean isTempReplica, long workerId) {
        ShardManager shardManager = serviceManager.getShardManager(serviceId);
        if (shardManager == null) {
            throw new StarException(ExceptionCode.NOT_EXIST, String.format("service %s not exists", serviceId));
        }
        Worker worker = workerManager.getWorker(workerId);
        if (worker == null) {
            throw new StarException(ExceptionCode.NOT_EXIST, String.format("worker %d not exists", workerId));
        }
        if (!worker.getServiceId().equals(serviceId)) {
            throw new StarException(ExceptionCode.INVALID_ARGUMENT, String.format("worker %d doesn't belong to service: %s", workerId, serviceId));
        }
        WorkerGroup wg = workerManager.getWorkerGroup(serviceId, worker.getGroupId());
        if (wg == null) {
            throw new NotExistStarException("workerGroup {} not exists", new Object[]{worker.getGroupId()});
        }

        // The group's worker ids are only passed along when replication is enabled.
        List<Long> groupWorkerIds;
        if (wg.replicationEnabled()) {
            groupWorkerIds = wg.getAllWorkerIds(false);
        } else {
            groupWorkerIds = Collections.emptyList();
        }
        boolean includeReplicaHash = wg.replicationEnabled();
        Replica newReplica = new Replica(workerId);
        newReplica.setReplicaState(wg.warmupEnabled() ? ReplicaState.REPLICA_SCALE_OUT : ReplicaState.REPLICA_OK);

        // Partition the shards into batches no larger than the configured maximum.
        List<List<AddShardInfo>> batches = new ArrayList<>();
        int remaining = shardIds.size();
        List<AddShardInfo> currentBatch = new ArrayList<>(Math.min(remaining, Config.SCHEDULER_MAX_BATCH_ADD_SHARD_SIZE));
        for (Long shardId : shardIds) {
            Shard shard = shardManager.getShard(shardId);
            --remaining;
            if (shard == null) {
                if (shardIds.size() == 1) {
                    throw new StarException(ExceptionCode.NOT_EXIST, String.format("shard %d not exists", shardId));
                }
                continue;
            }
            currentBatch.add(shard.getAddShardInfo(groupWorkerIds, newReplica, includeReplicaHash));
            if (currentBatch.size() >= Config.SCHEDULER_MAX_BATCH_ADD_SHARD_SIZE) {
                batches.add(currentBatch);
                currentBatch = new ArrayList<>(Math.min(remaining, Config.SCHEDULER_MAX_BATCH_ADD_SHARD_SIZE));
            }
        }
        if (!currentBatch.isEmpty()) {
            batches.add(currentBatch);
        }

        for (List<AddShardInfo> batch : batches) {
            AddShardRequest request = AddShardRequest.newBuilder()
                    .setServiceId(serviceId)
                    .setWorkerId(workerId)
                    .addAllShardInfo(batch)
                    .build();
            addShardOpCounter.inc();
            try {
                worker.addShard(request);
                List<Long> batchShardIds = request.getShardInfoList().stream().map(AddShardInfo::getShardId).collect(Collectors.toList());
                if (wg.warmupEnabled()) {
                    shardManager.scaleOutShardReplicas(batchShardIds, workerId, isTempReplica);
                } else {
                    shardManager.addShardReplicas(batchShardIds, workerId, isTempReplica);
                }
            } catch (WorkerNotHealthyStarException e) {
                throw e;
            } catch (StarException e) {
                throw new StarException(ExceptionCode.SCHEDULE, String.format("Schedule add shard task execution failed serviceId: %s, workerId: %d, shardIds: %s", serviceId, workerId, shardIds));
            }
        }
    }

    /**
     * Removes {@code shardIds} from one worker and drops the corresponding replica records.
     *
     * <p>The remove request is only sent when the worker exists, is alive and belongs to
     * {@code serviceId}; otherwise the reason is logged at debug level. Unless an exception is
     * thrown, the replica records are removed from the shard manager in every case.
     *
     * @param serviceId service owning the shards
     * @param shardIds  shards to remove
     * @param workerId  worker to remove them from
     * @throws StarException if the service does not exist or the worker call fails; a
     *                       {@link WorkerNotHealthyStarException} is rethrown unchanged
     */
    private void executeRemoveFromWorker(String serviceId, List<Long> shardIds, long workerId) {
        ShardManager shardManager = serviceManager.getShardManager(serviceId);
        if (shardManager == null) {
            throw new StarException(ExceptionCode.NOT_EXIST, String.format("service %s not exist!", serviceId));
        }

        Worker worker = workerManager.getWorker(workerId);
        boolean notifyWorker = false;
        if (worker == null) {
            LOG.debug("worker {} not exist when execute remove shards for service {}!", workerId, serviceId);
        } else if (!worker.isAlive()) {
            LOG.debug("worker {} dead when execute remove shards for service {}.", workerId, serviceId);
        } else if (!worker.getServiceId().equals(serviceId)) {
            LOG.debug("worker {} doesn't belong to service {}", workerId, serviceId);
        } else {
            notifyWorker = true;
        }

        if (notifyWorker) {
            RemoveShardRequest request = RemoveShardRequest.newBuilder()
                    .setServiceId(serviceId)
                    .setWorkerId(workerId)
                    .addAllShardIds(shardIds)
                    .build();
            removeShardOpCounter.inc();
            try {
                worker.removeShard(request);
            } catch (WorkerNotHealthyStarException e) {
                throw e;
            } catch (StarException e) {
                throw new StarException(ExceptionCode.SCHEDULE, String.format("Schedule remove shard task execution failed serviceId: %s, workerId: %d, shardIds: %s", serviceId, workerId, shardIds));
            }
        }
        shardManager.removeShardReplicas(shardIds, workerId);
    }

    /**
     * Runs phase 1 of a remove-from-group request and makes sure the context is always completed
     * or retried.
     *
     * <ul>
     *   <li>Server not running: the context is completed with a shutdown error.</li>
     *   <li>{@link ScheduleConflictStarException} (the request lock could not be taken): the same
     *       request is re-queued with a delay of 100.</li>
     *   <li>Any other {@link StarException}: the context is completed with it.</li>
     *   <li>Any other {@link Throwable}: the context is completed with a {@code SCHEDULE}-coded
     *       {@link StarException} carrying its message.</li>
     * </ul>
     *
     * <p>If re-queuing itself fails (for example the executor rejects the task), the context is
     * completed with that failure instead of being left unfinished.
     *
     * @param ctx request being scheduled
     */
    private void executeRemoveFromGroupPhase1(ScheduleRequestContext ctx) {
        if (!this.isRunning()) {
            ctx.done(new StarException(ExceptionCode.SCHEDULE, "Schedule in shutdown progress"));
            return;
        }
        try {
            this.executeRemoveFromGroupPhase1Detail(ctx);
        } catch (ScheduleConflictStarException conflict) {
            // A request for the same shard is in progress: retry after a short delay.
            try {
                this.submitCalcTaskInternal(() -> this.executeRemoveFromGroupPhase1(ctx), 100L);
            } catch (StarException submitFailure) {
                ctx.done(submitFailure);
            } catch (Throwable throwable) {
                ctx.done(new StarException(ExceptionCode.SCHEDULE, throwable.getMessage()));
            }
        } catch (StarException exception) {
            ctx.done(exception);
        } catch (Throwable throwable) {
            ctx.done(new StarException(ExceptionCode.SCHEDULE, throwable.getMessage()));
        }
    }

    /**
     * Phase 1 of a remove-from-group request: drops expired replica records, and if the shard
     * still has more replicas in the worker group than the group's replica number, picks one
     * replica to remove and hands the request to the dispatch executor.
     *
     * <p>Replicas are classified per worker: unknown or expired workers go to the expired list;
     * replicas on alive workers of the target group are split into scale-in, temporary and OK.
     * The replica to remove is chosen from the scale-in replicas if there are any, else from the
     * temporary ones, else from the OK ones, using the lowest score.
     *
     * @param ctx request being scheduled
     * @throws ScheduleConflictStarException if another request currently holds the lock
     */
    private void executeRemoveFromGroupPhase1Detail(ScheduleRequestContext ctx) {
        ShardManager shardManager = serviceManager.getShardManager(ctx.getServiceId());
        if (shardManager == null) {
            ctx.done(new StarException(ExceptionCode.NOT_EXIST, String.format("Service %s Not Exist", ctx.getServiceId())));
            return;
        }
        Shard shard = shardManager.getShard(ctx.getShardId());
        if (shard == null) {
            ctx.done(new StarException(ExceptionCode.NOT_EXIST, String.format("Shard %d Not Exist", ctx.getShardId())));
            return;
        }
        WorkerGroup wg = workerManager.getWorkerGroup(ctx.getServiceId(), ctx.getWorkerGroupId());
        if (wg == null) {
            ctx.done(new NotExistStarException("WorkerGroup {} Not Exist", new Object[]{ctx.getWorkerGroupId()}));
            return;
        }

        int replicaNum = wg.getReplicaNumber();
        List<Long> okWorkerIds = new ArrayList<>();
        List<Long> scaleInWorkerIds = new ArrayList<>();
        List<Long> expiredReplicas = new ArrayList<>();
        List<Long> tempReplicas = new ArrayList<>();
        for (Replica replica : shard.getReplica()) {
            long replicaWorkerId = replica.getWorkerId();
            Worker replicaWorker = workerManager.getWorker(replicaWorkerId);
            if (replicaWorker == null) {
                expiredReplicas.add(replicaWorkerId);
            } else if (replicaWorker.getGroupId() != ctx.getWorkerGroupId()) {
                // Replica belongs to another worker group; not our concern.
            } else if (replicaWorker.replicaExpired()) {
                expiredReplicas.add(replicaWorkerId);
            } else if (!replicaWorker.isAlive()) {
                // Dead but not expired yet: leave it alone.
            } else if (replica.getState() == ReplicaState.REPLICA_SCALE_IN) {
                scaleInWorkerIds.add(replicaWorkerId);
            } else if (replica.getTempFlag()) {
                tempReplicas.add(replicaWorkerId);
            } else if (replica.getState() == ReplicaState.REPLICA_OK) {
                okWorkerIds.add(replicaWorkerId);
            }
        }

        for (long expiredWorkerId : expiredReplicas) {
            shardManager.removeShardReplicas(Collections.singletonList(ctx.getShardId()), expiredWorkerId);
        }

        int liveReplicaCount = okWorkerIds.size() + scaleInWorkerIds.size() + tempReplicas.size();
        if (liveReplicaCount <= replicaNum) {
            LOG.debug("{}, Number of replicas (include dead ones) are less than expected replica. Skip it.", ctx);
            ctx.done();
            return;
        }

        if (!requestLocker.tryLock(ctx, shardManager)) {
            throw new ScheduleConflictStarException();
        }
        // cleanOp runs ctx.getRunnable() on scope exit unless cancelled after a successful hand-off.
        try (DeferOp cleanOp = new DeferOp(ctx.getRunnable())) {
            ScheduleScorer scorer;
            if (!scaleInWorkerIds.isEmpty()) {
                scorer = new ScheduleScorer(scaleInWorkerIds);
            } else if (!tempReplicas.isEmpty()) {
                scorer = new ScheduleScorer(tempReplicas);
            } else {
                scorer = new ScheduleScorer(okWorkerIds);
            }

            // Feed the scorer with the workers hosting the other shards of each shard group.
            for (long groupId : shard.getGroupIds()) {
                ShardGroup group = shardManager.getShardGroup(groupId);
                if (group == null) {
                    continue;
                }
                List<Long> siblingWorkerIds = new ArrayList<>();
                for (long sid : group.getShardIds()) {
                    if (sid == shard.getShardId()) {
                        continue;
                    }
                    Shard firstDegreeShard = shardManager.getShard(sid);
                    if (firstDegreeShard != null) {
                        siblingWorkerIds.addAll(firstDegreeShard.getReplicaWorkerIds());
                    }
                }
                scorer.apply(group.getPlacementPolicy(), siblingWorkerIds);
            }
            scorer.apply(workerManager);
            LOG.debug("final scores for selection: {}, for request {}", scorer.getScores(), ctx);

            List<Long> selected = scorer.selectLowEnd(scoreSelector, 1);
            LOG.debug("Final selection for remove-healthy shard, request:{} selection: {}", ctx, selected);
            Preconditions.checkState(selected.size() == 1, (Object) "Should only have one replica to remove!");
            ctx.setWorkerIds(selected);
            LOG.debug("Schedule request {}, pending schedule to workerList: {}", ctx, ctx.getWorkerIds());

            int priority = ctx.isWaited() ? 10 : 0;
            DispatchTask<Boolean> task = dispatchTaskForRemoveFromGroup(ctx, priority);
            try {
                dispatchExecutors.execute(task);
                cleanOp.cancel();
            } catch (Throwable e) {
                LOG.error("Fail to add task {} into dispatchWorkerExecutors", task, e);
                ctx.done(new StarException(ExceptionCode.SCHEDULE, e.getMessage()));
            }
        }
    }

    /**
     * Returns the worker with the smallest last-seen time; on ties the earliest one in the list
     * wins.
     *
     * @param workers candidates; must contain at least one element ({@code get(0)} is read
     *                unconditionally)
     * @return the worker with the smallest last-seen time
     */
    private Worker selectOldestWorkerLastSeen(List<Worker> workers) {
        Worker oldest = workers.get(0);
        for (Worker candidate : workers) {
            if (candidate.getLastSeenTime() < oldest.getLastSeenTime()) {
                oldest = candidate;
            }
        }
        return oldest;
    }

    /**
     * Phase 2 of a remove-from-group request, run on the dispatch executor. Removes the shard from
     * the selected worker, or completes the context with a shutdown error when the server is no
     * longer running. The {@code DeferOp} built from {@code ctx.getRunnable()} is closed when the
     * method exits.
     *
     * @param ctx request whose worker ids were chosen in phase 1
     */
    private void executeRemoveFromGroupPhase2(ScheduleRequestContext ctx) {
        try (DeferOp ignored = new DeferOp(ctx.getRunnable())) {
            if (!isRunning()) {
                ctx.done(new StarException(ExceptionCode.SCHEDULE, "Schedule shutdown in progress"));
                return;
            }
            executeRemoveFromWorker(ctx);
        }
    }

    /**
     * Removes the request's shard from every worker selected for it, then completes the context.
     *
     * <p>A failure on one worker does not stop the remaining workers from being processed. The
     * context is completed with the last failure seen, or with {@code null} when every removal
     * succeeded.
     *
     * @param ctx request carrying the shard id and the selected worker ids
     */
    private void executeRemoveFromWorker(ScheduleRequestContext ctx) {
        StarException lastFailure = null;
        for (long targetWorkerId : ctx.getWorkerIds()) {
            try {
                executeRemoveFromWorker(ctx.getServiceId(), Collections.singletonList(ctx.getShardId()), targetWorkerId);
            } catch (Exception e) {
                lastFailure = (e instanceof StarException)
                        ? (StarException) e
                        : new StarException(ExceptionCode.SCHEDULE, e.getMessage());
            }
        }
        ctx.done(lastFailure);
    }

    /**
     * Reports whether the scheduler has no outstanding work.
     *
     * @return {@code true} when the server is not running, or when both the calculation and the
     *         dispatch executors report as many completed tasks as scheduled tasks
     */
    @Override
    public boolean isIdle() {
        if (!isRunning()) {
            return true;
        }
        boolean calcDrained = calculateExecutors.getCompletedTaskCount() == calculateExecutors.getTaskCount();
        if (!calcDrained) {
            return false;
        }
        return dispatchExecutors.getCompletedTaskCount() == dispatchExecutors.getTaskCount();
    }

    /**
     * Creates the three executors used by the scheduler:
     * <ul>
     *   <li>a 2-thread scheduled pool for calculation (phase-1) tasks,</li>
     *   <li>a 4-thread pool backed by a {@link PriorityBlockingQueue} for dispatch tasks,</li>
     *   <li>a 1-thread scheduled pool that runs {@link #adjustScheduleThreadPoolSize()} every
     *       300 seconds after an initial delay of 30 seconds.</li>
     * </ul>
     */
    @Override
    public void doStart() {
        ScheduledThreadPoolExecutor calcPool = new ScheduledThreadPoolExecutor(2, Utils.namedThreadFactory("scheduler-calc-pool"));
        calcPool.setMaximumPoolSize(2);
        this.calculateExecutors = calcPool;

        this.dispatchExecutors = new ThreadPoolExecutor(
                4,
                4,
                0L,
                TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(),
                Utils.namedThreadFactory("scheduler-dispatch-pool"));

        ScheduledThreadPoolExecutor adjustPool = new ScheduledThreadPoolExecutor(1);
        adjustPool.setMaximumPoolSize(1);
        this.adjustPoolExecutors = adjustPool;
        adjustPool.scheduleAtFixedRate(this::adjustScheduleThreadPoolSize, 30L, 300L, TimeUnit.SECONDS);
    }

    /**
     * Resizes the calculate and dispatch thread pools according to the total number of
     * shards reported by all shard managers.
     *
     * <p>The target size for the calculate pool comes from
     * {@link #estimateNumOfCoreThreads(long)}; the dispatch pool gets twice that. Nothing
     * happens when the scheduler is not running or when the calculate pool already has the
     * target core size.
     *
     * <p>This method is the task handed to {@code scheduleAtFixedRate} in
     * {@code doStart()}. A periodic task that throws is never run again by the scheduled
     * executor, so any failure here is caught and logged instead of propagated; the next
     * period simply retries.
     */
    void adjustScheduleThreadPoolSize() {
        if (!this.isRunning()) {
            return;
        }
        try {
            long totalShards = 0L;
            for (String serviceId : this.serviceManager.getServiceIdSet()) {
                ShardManager shardManager = this.serviceManager.getShardManager(serviceId);
                if (shardManager == null) {
                    // No shard manager for this service id; nothing to count.
                    continue;
                }
                totalShards += (long)shardManager.getShardCount();
            }
            int numOfThreads = ShardSchedulerV2.estimateNumOfCoreThreads(totalShards);
            int currentThreads = this.calculateExecutors.getCorePoolSize();
            if (currentThreads == numOfThreads) {
                return;
            }
            LOG.info("Total number of shards in memory: {}, adjust scheduler core thread pool size from {} to {}", (Object)totalShards, (Object)currentThreads, (Object)numOfThreads);
            ShardSchedulerV2.resizeThreadPool(this.calculateExecutors, numOfThreads);
            ShardSchedulerV2.resizeThreadPool(this.dispatchExecutors, numOfThreads * 2);
        } catch (Exception e) {
            // Swallow on purpose: letting this escape would cancel all future periodic runs.
            LOG.warn("Fail to adjust scheduler thread pool size", e);
        }
    }

    /**
     * Sets both the core and maximum size of {@code pool} to {@code size}, ordering the two
     * setter calls so that the core size never exceeds the maximum size in between.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int size) {
        if (size > pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(size);
            pool.setCorePoolSize(size);
        } else {
            // Shrinking (or same ceiling): lower the core size first, then the ceiling.
            pool.setCorePoolSize(size);
            pool.setMaximumPoolSize(size);
        }
    }

    /**
     * Maps a shard count to a thread count.
     *
     * <pre>
     *   shards &lt; 5,000       -&gt; 1
     *   shards &lt; 10,000      -&gt; 2
     *   shards &lt; 100,000     -&gt; 4
     *   shards &lt; 1,000,000   -&gt; 8
     *   otherwise               -&gt; 16
     * </pre>
     *
     * @param numOfShards total number of shards
     * @return the thread count for that tier
     */
    static int estimateNumOfCoreThreads(long numOfShards) {
        // Exclusive upper bounds of each tier; the thread count doubles from one tier to the next.
        final long[] tierUpperBounds = {5000L, 10000L, 100000L, 1000000L};
        int threads = 1;
        for (long upperBound : tierUpperBounds) {
            if (numOfShards < upperBound) {
                return threads;
            }
            threads *= 2;
        }
        return threads;
    }

    /**
     * Shuts down the scheduler's executors via {@code Utils.shutdownExecutorService}, in
     * the order calculate, dispatch, adjust.
     */
    @Override
    public void doStop() {
        for (ThreadPoolExecutor pool : Arrays.asList(this.calculateExecutors, this.dispatchExecutors, this.adjustPoolExecutors)) {
            Utils.shutdownExecutorService(pool);
        }
    }

    /**
     * A {@link FutureTask} that carries an integer priority and a human-readable
     * description.
     *
     * <p>Ordering is by descending priority: {@link #compareTo} reports a task with a
     * larger {@code priority} as "smaller", so a priority queue that serves its least
     * element first hands out higher-priority tasks first. Tasks with equal priority
     * compare as equal; no further tie-breaking is applied. The ordering is not tied to
     * {@code equals}, which is not overridden.
     *
     * @param <T> result type of the task
     */
    private static class DispatchTask<T> extends FutureTask<T> implements Comparable<DispatchTask<T>> {
        private final int priority;
        private final String description;

        /**
         * @param runnable    work to run
         * @param result      value returned by {@code get()} once the runnable completes
         * @param priority    larger values are ordered first
         * @param description text returned by {@link #toString()}
         */
        public DispatchTask(Runnable runnable, T result, int priority, String description) {
            super(runnable, result);
            this.priority = priority;
            this.description = description;
        }

        /**
         * @param callable    work to run
         * @param priority    larger values are ordered first
         * @param description text returned by {@link #toString()}
         */
        public DispatchTask(Callable<T> callable, int priority, String description) {
            super(callable);
            this.priority = priority;
            this.description = description;
        }

        /**
         * Compares by priority in reverse numeric order.
         *
         * @return a negative value when this task has the larger priority, zero when the
         *         priorities are equal, a positive value otherwise
         */
        @Override
        public int compareTo(DispatchTask<T> o) {
            // Arguments are swapped on purpose to get descending order.
            return Integer.compare(o.priority, this.priority);
        }

        /** @return the description supplied at construction time */
        @Override
        public String toString() {
            return this.description;
        }
    }

    /**
     * Tracks which schedule requests are in flight and which shard groups they hold
     * exclusively, so that two requests touching the same conflicting shard group in the
     * same worker group do not run at the same time.
     *
     * <p>{@code exclusiveContexts} is a concurrent key set. {@code exclusiveShardGroups}
     * (worker group id -> ids of shard groups currently marked) is a plain {@link HashMap}
     * and is only touched inside a {@code LockCloseable} block on {@code exclusiveMapLock}.
     */
    private static class ExclusiveLocker {
        protected Set<ScheduleRequestContext> exclusiveContexts = ConcurrentHashMap.newKeySet();
        protected final ReentrantLock exclusiveMapLock = new ReentrantLock();
        protected final Map<Long, Set<Long>> exclusiveShardGroups = new HashMap<Long, Set<Long>>();

        private ExclusiveLocker() {
        }

        /**
         * Tries to register {@code ctx} as running and to mark its conflicting shard groups.
         *
         * <p>On success the context's runnable is set to the matching unlock action. On
         * failure, including when the shard-group check throws, the context is removed
         * from {@code exclusiveContexts} again so that a later attempt is not blocked by a
         * stale entry.
         *
         * @param ctx     the request to lock for
         * @param manager shard manager used to look up the shard and its groups
         * @return {@code true} if the lock was taken; {@code false} if the context was
         *         already registered or a conflicting shard group is currently marked
         */
        public boolean tryLock(ScheduleRequestContext ctx, ShardManager manager) {
            if (!this.exclusiveContexts.add(ctx)) {
                return false;
            }
            boolean locked = false;
            try {
                if (this.checkAndUpdateExclusiveShardGroups(ctx, manager)) {
                    ctx.setRunnable(() -> this.tryUnlock(ctx));
                    locked = true;
                }
                return locked;
            } finally {
                if (!locked) {
                    // Covers both the "conflict" result and an exception thrown above.
                    this.exclusiveContexts.remove(ctx);
                }
            }
        }

        /** Releases the shard-group marks held by {@code ctx}, then unregisters it. */
        private void tryUnlock(ScheduleRequestContext ctx) {
            this.cleanExclusiveShardGroup(ctx);
            this.exclusiveContexts.remove(ctx);
        }

        /**
         * Removes the context's exclusive group ids from its worker group's marker set and
         * drops the marker set from the map once it becomes empty.
         */
        private void cleanExclusiveShardGroup(ScheduleRequestContext ctx) {
            Collection<Long> exclusiveGroups = ctx.getExclusiveGroupIds();
            if (exclusiveGroups == null || exclusiveGroups.isEmpty()) {
                return;
            }
            try (LockCloseable ignored = new LockCloseable((Lock)this.exclusiveMapLock)) {
                Set<Long> groupMarker = this.exclusiveShardGroups.get(ctx.getWorkerGroupId());
                if (groupMarker != null) {
                    groupMarker.removeAll(exclusiveGroups);
                    if (groupMarker.isEmpty()) {
                        this.exclusiveShardGroups.remove(ctx.getWorkerGroupId());
                    }
                }
            }
        }

        /**
         * Checks whether any conflicting shard group of the context's shard is already
         * marked for the context's worker group and, if not, marks them.
         *
         * <p>A shard group counts as conflicting when its placement policy is contained in
         * {@code conflictPolicies}. If the shard is unknown or has no conflicting group,
         * nothing is marked and the check passes.
         *
         * @return {@code true} if the request may proceed; {@code false} if a conflicting
         *         group is already marked
         */
        private boolean checkAndUpdateExclusiveShardGroups(ScheduleRequestContext ctx, ShardManager shardManager) {
            Shard shard = shardManager.getShard(ctx.getShardId());
            if (shard == null) {
                return true;
            }
            Set<Long> exclusiveIds = shard.getGroupIds().stream()
                    .map(shardManager::getShardGroup)
                    .filter(y -> y != null && conflictPolicies.contains(y.getPlacementPolicy()))
                    .map(ShardGroup::getGroupId)
                    .collect(Collectors.toSet());
            if (exclusiveIds.isEmpty()) {
                return true;
            }
            try (LockCloseable ignored = new LockCloseable((Lock)this.exclusiveMapLock)) {
                // A freshly created marker set is empty and therefore can never conflict.
                Set<Long> groupMarker = this.exclusiveShardGroups.computeIfAbsent(ctx.getWorkerGroupId(), key -> new HashSet<Long>());
                if (exclusiveIds.stream().anyMatch(groupMarker::contains)) {
                    LOG.debug("Has conflict shardgroup running, retry later. {}", (Object)ctx);
                    return false;
                }
                groupMarker.addAll(exclusiveIds);
                ctx.setExclusiveGroupIds(exclusiveIds);
            }
            return true;
        }
    }

    /**
     * Runs a {@link Runnable} when closed, at most once, unless cancelled first.
     *
     * <p>Intended for use in a try-with-resources statement so that the runnable executes
     * when the block exits. The {@code done} flag is a plain field with no synchronization.
     */
    private static class DeferOp implements Closeable {
        private final Runnable runnable;
        private boolean done = false;

        /** @param runnable action to run on {@link #close()} */
        public DeferOp(Runnable runnable) {
            this.runnable = runnable;
        }

        /** Marks this operation as done so that a later {@link #close()} does nothing. */
        public void cancel() {
            this.done = true;
        }

        /** Runs the action on the first call; later calls, or calls after cancel, are no-ops. */
        @Override
        public void close() {
            if (this.done) {
                return;
            }
            // Flip the flag before running so a second close() never re-runs the action.
            this.done = true;
            this.runnable.run();
        }
    }
}

