/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.scheduler;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduler that spreads function instances over workers by repeatedly handing the
 * next instance to whichever worker currently holds the fewest instances.
 *
 * <p>The class keeps no state between calls; every invocation rebuilds its view of the
 * per-worker load from the arguments it receives.
 */
public class RoundRobinScheduler
implements IScheduler {
    private static final Logger log = LoggerFactory.getLogger(RoundRobinScheduler.class);

    /**
     * Assigns every unassigned instance to the least-loaded worker in {@code workers}.
     *
     * <p>Existing assignments are only used to seed the per-worker load counts. An existing
     * assignment whose worker id is not contained in {@code workers} is ignored, so such a
     * worker is never chosen as a target.
     *
     * @param unassignedFunctionInstances instances that still need a worker
     * @param currentAssignments assignments already in effect; may be {@code null}
     * @param workers ids of the workers that may receive instances
     * @return the newly created assignments only (existing ones are not repeated); empty when
     *         {@code workers} is empty
     */
    @Override
    public List<Function.Assignment> schedule(List<Function.Instance> unassignedFunctionInstances, List<Function.Assignment> currentAssignments, Set<String> workers) {
        Map<String, List<Function.Instance>> workerIdToAssignment = new HashMap<>();
        List<Function.Assignment> newAssignments = new ArrayList<>();
        for (String workerId : workers) {
            workerIdToAssignment.put(workerId, new LinkedList<>());
        }
        if (currentAssignments != null) {
            for (Function.Assignment existingAssignment : currentAssignments) {
                List<Function.Instance> workerLoad = workerIdToAssignment.get(existingAssignment.getWorkerId());
                if (workerLoad == null) {
                    // The assignment points at a worker outside the supplied set; it must not
                    // influence (or crash) the placement of new instances.
                    log.warn("Ignoring existing assignment for unknown worker {}", existingAssignment.getWorkerId());
                    continue;
                }
                workerLoad.add(existingAssignment.getInstance());
            }
        }
        if (workerIdToAssignment.isEmpty()) {
            // Without any worker there is no valid target id to put into an assignment.
            if (!unassignedFunctionInstances.isEmpty()) {
                log.warn("No workers available - {} function instance(s) left unscheduled", unassignedFunctionInstances.size());
            }
            return newAssignments;
        }
        for (Function.Instance unassignedFunctionInstance : unassignedFunctionInstances) {
            String workerId = this.findNextWorker(workerIdToAssignment);
            Function.Assignment newAssignment = Function.Assignment.newBuilder().setInstance(unassignedFunctionInstance).setWorkerId(workerId).build();
            // Record the placement so the next iteration sees the updated load.
            workerIdToAssignment.get(workerId).add(newAssignment.getInstance());
            newAssignments.add(newAssignment);
        }
        return newAssignments;
    }

    /**
     * Picks the worker with the fewest instances.
     *
     * <p>Ties go to the first such worker in the map's iteration order, because only a
     * strictly smaller count replaces the current candidate.
     *
     * @param workerIdToAssignment per-worker instance lists
     * @return the id of the least-loaded worker, or {@code null} when the map is empty
     */
    private String findNextWorker(Map<String, List<Function.Instance>> workerIdToAssignment) {
        String targetWorkerId = null;
        int least = Integer.MAX_VALUE;
        for (Map.Entry<String, List<Function.Instance>> entry : workerIdToAssignment.entrySet()) {
            int load = entry.getValue().size();
            if (load < least) {
                targetWorkerId = entry.getKey();
                least = load;
            }
        }
        return targetWorkerId;
    }

    /**
     * Moves instances from the most-loaded to the least-loaded worker until the two
     * extremes differ by at most one instance.
     *
     * <p>Workers that appear only in {@code currentAssignments} (and not in {@code workers})
     * are still included in the load map and therefore take part in the balancing.
     *
     * @param currentAssignments assignments already in effect; may be {@code null}
     * @param workers ids of the known workers
     * @return one assignment per move performed, in the order the moves were made; empty when
     *         nothing has to move or when there are neither workers nor assignments
     */
    @Override
    public List<Function.Assignment> rebalance(List<Function.Assignment> currentAssignments, Set<String> workers) {
        Map<String, List<Function.Instance>> workerToAssignmentMap = new HashMap<>();
        for (String workerId : workers) {
            workerToAssignmentMap.put(workerId, new LinkedList<>());
        }
        if (currentAssignments != null) {
            for (Function.Assignment assignment : currentAssignments) {
                workerToAssignmentMap.computeIfAbsent(assignment.getWorkerId(), id -> new LinkedList<>()).add(assignment.getInstance());
            }
        }
        List<Function.Assignment> newAssignments = new LinkedList<>();
        if (workerToAssignmentMap.isEmpty()) {
            // Nothing to balance; the min/max lookups below require at least one entry.
            return newAssignments;
        }
        int iterations = 0;
        while (true) {
            ++iterations;
            Map.Entry<String, List<Function.Instance>> mostAssignmentsWorker = this.findWorkerWithMostAssignments(workerToAssignmentMap);
            Map.Entry<String, List<Function.Instance>> leastAssignmentsWorker = this.findWorkerWithLeastAssignments(workerToAssignmentMap);
            // Balanced once the spread between the extremes is zero or one.
            if (mostAssignmentsWorker.getValue().size() - leastAssignmentsWorker.getValue().size() <= 1) {
                break;
            }
            String leastAssignmentsWorkerId = leastAssignmentsWorker.getKey();
            // The source list holds at least two entries here, so removing the head is safe.
            Function.Instance instance = mostAssignmentsWorker.getValue().remove(0);
            Function.Assignment newAssignment = Function.Assignment.newBuilder().setInstance(instance).setWorkerId(leastAssignmentsWorkerId).build();
            newAssignments.add(newAssignment);
            leastAssignmentsWorker.getValue().add(instance);
        }
        log.info("Rebalance - iterations: {}", iterations);
        return newAssignments;
    }

    /**
     * Returns the entry whose instance list is shortest.
     *
     * @param workerToAssignmentMap per-worker instance lists; must not be empty
     * @return the entry with the fewest instances
     */
    private Map.Entry<String, List<Function.Instance>> findWorkerWithLeastAssignments(Map<String, List<Function.Instance>> workerToAssignmentMap) {
        return workerToAssignmentMap.entrySet().stream().min(Comparator.comparingInt(entry -> entry.getValue().size())).get();
    }

    /**
     * Returns the entry whose instance list is longest.
     *
     * @param workerToAssignmentMap per-worker instance lists; must not be empty
     * @return the entry with the most instances
     */
    private Map.Entry<String, List<Function.Instance>> findWorkerWithMostAssignments(Map<String, List<Function.Instance>> workerToAssignmentMap) {
        return workerToAssignmentMap.entrySet().stream().max(Comparator.comparingInt(entry -> entry.getValue().size())).get();
    }
}

