/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.tsched.streaming.roundrobin;

import edu.iu.dsc.tws.api.compute.exceptions.TaskSchedulerException;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.Vertex;
import edu.iu.dsc.tws.api.compute.schedule.ITaskScheduler;
import edu.iu.dsc.tws.api.compute.schedule.elements.Resource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstanceId;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.tsched.spi.common.TaskSchedulerContext;
import edu.iu.dsc.tws.tsched.spi.taskschedule.TaskInstanceMapCalculation;
import edu.iu.dsc.tws.tsched.utils.TaskAttributes;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.logging.Logger;

public class RoundRobinTaskScheduler
implements ITaskScheduler {
    private static final Logger LOG = Logger.getLogger(RoundRobinTaskScheduler.class.getName());
    private Double instanceRAM;
    private Double instanceDisk;
    private Double instanceCPU;
    private Config config;
    private int workerId;

    public void initialize(Config cfg) {
        this.config = cfg;
        this.instanceRAM = TaskSchedulerContext.taskInstanceRam(this.config);
        this.instanceDisk = TaskSchedulerContext.taskInstanceDisk(this.config);
        this.instanceCPU = TaskSchedulerContext.taskInstanceCpu(this.config);
    }

    public void initialize(Config cfg, int workerid) {
        this.initialize(cfg);
        this.workerId = workerid;
    }

    public TaskSchedulePlan schedule(ComputeGraph computeGraph, WorkerPlan workerPlan) {
        LinkedHashSet<WorkerSchedulePlan> workerSchedulePlans = new LinkedHashSet<WorkerSchedulePlan>();
        LinkedHashSet<Vertex> taskVertexSet = new LinkedHashSet<Vertex>(computeGraph.getTaskVertexSet());
        Map<Integer, List<TaskInstanceId>> roundRobinContainerInstanceMap = this.roundRobinSchedulingAlgorithm(computeGraph, workerPlan.getNumberOfWorkers());
        TaskInstanceMapCalculation instanceMapCalculation = new TaskInstanceMapCalculation(this.instanceRAM, this.instanceCPU, this.instanceDisk);
        Map<Integer, Map<TaskInstanceId, Double>> instancesRamMap = instanceMapCalculation.getInstancesRamMapInContainer(roundRobinContainerInstanceMap, taskVertexSet);
        Map<Integer, Map<TaskInstanceId, Double>> instancesDiskMap = instanceMapCalculation.getInstancesDiskMapInContainer(roundRobinContainerInstanceMap, taskVertexSet);
        Map<Integer, Map<TaskInstanceId, Double>> instancesCPUMap = instanceMapCalculation.getInstancesCPUMapInContainer(roundRobinContainerInstanceMap, taskVertexSet);
        for (int containerId : roundRobinContainerInstanceMap.keySet()) {
            double containerRAMValue = TaskSchedulerContext.containerRamPadding(this.config);
            double containerDiskValue = TaskSchedulerContext.containerDiskPadding(this.config);
            double containerCpuValue = TaskSchedulerContext.containerCpuPadding(this.config);
            List<TaskInstanceId> taskTaskInstanceIds = roundRobinContainerInstanceMap.get(containerId);
            HashMap<TaskInstanceId, TaskInstancePlan> taskInstancePlanMap = new HashMap<TaskInstanceId, TaskInstancePlan>();
            for (TaskInstanceId id : taskTaskInstanceIds) {
                double instanceRAMValue = instancesRamMap.get(containerId).get(id);
                double instanceDiskValue = instancesDiskMap.get(containerId).get(id);
                double instanceCPUValue = instancesCPUMap.get(containerId).get(id);
                Resource instanceResource = new Resource(Double.valueOf(instanceRAMValue), Double.valueOf(instanceDiskValue), Double.valueOf(instanceCPUValue));
                taskInstancePlanMap.put(id, new TaskInstancePlan(id.getTaskName(), id.getTaskId(), id.getTaskIndex(), instanceResource));
                containerRAMValue += instanceRAMValue;
                containerDiskValue += instanceDiskValue;
                containerCpuValue += instanceDiskValue;
            }
            Worker worker = workerPlan.getWorker(containerId);
            Resource containerResource = worker != null && worker.getCpu() > 0 && worker.getDisk() > 0 && worker.getRam() > 0 ? new Resource(Double.valueOf(worker.getRam()), Double.valueOf(worker.getDisk()), Double.valueOf(worker.getCpu())) : new Resource(Double.valueOf(containerRAMValue), Double.valueOf(containerDiskValue), Double.valueOf(containerCpuValue));
            WorkerSchedulePlan taskWorkerSchedulePlan = new WorkerSchedulePlan(containerId, new LinkedHashSet(taskInstancePlanMap.values()), containerResource);
            workerSchedulePlans.add(taskWorkerSchedulePlan);
        }
        return new TaskSchedulePlan(0, workerSchedulePlans);
    }

    private Map<Integer, List<TaskInstanceId>> roundRobinSchedulingAlgorithm(ComputeGraph graph, int numberOfContainers) throws TaskSchedulerException {
        LinkedHashMap<Integer, List<TaskInstanceId>> roundrobinAllocation = new LinkedHashMap<Integer, List<TaskInstanceId>>();
        for (int i = 0; i < numberOfContainers; ++i) {
            roundrobinAllocation.put(i, new ArrayList());
        }
        LinkedHashSet taskVertexSet = new LinkedHashSet(graph.getTaskVertexSet());
        TreeSet<Vertex> orderedTaskSet = new TreeSet<Vertex>(new VertexComparator());
        orderedTaskSet.addAll(taskVertexSet);
        TaskAttributes taskAttributes = new TaskAttributes();
        int globalTaskIndex = 0;
        for (Vertex vertex : taskVertexSet) {
            int totalTaskInstances = !graph.getNodeConstraints().isEmpty() ? taskAttributes.getTotalNumberOfInstances(vertex, (Map<String, Map<String, String>>)graph.getNodeConstraints()) : taskAttributes.getTotalNumberOfInstances(vertex);
            if (!graph.getNodeConstraints().isEmpty()) {
                int instancesPerWorker = taskAttributes.getInstancesPerWorker(graph.getGraphConstraints());
                int maxTaskInstancesPerContainer = 0;
                for (int i = 0; i < totalTaskInstances; ++i) {
                    int containerIndex = i % numberOfContainers;
                    if (maxTaskInstancesPerContainer < instancesPerWorker) {
                        ((List)roundrobinAllocation.get(containerIndex)).add(new TaskInstanceId(vertex.getName(), globalTaskIndex, i));
                        ++maxTaskInstancesPerContainer;
                        continue;
                    }
                    throw new TaskSchedulerException("Task Scheduling couldn't be possible for the presentconfiguration, please check the number of workers, maximum instances per worker");
                }
            } else {
                String task = vertex.getName();
                for (int i = 0; i < totalTaskInstances; ++i) {
                    int containerIndex = i % numberOfContainers;
                    ((List)roundrobinAllocation.get(containerIndex)).add(new TaskInstanceId(task, globalTaskIndex, i));
                }
            }
            ++globalTaskIndex;
        }
        return roundrobinAllocation;
    }

    private static class VertexComparator
    implements Comparator<Vertex> {
        private VertexComparator() {
        }

        @Override
        public int compare(Vertex o1, Vertex o2) {
            return o1.getName().compareTo(o2.getName());
        }
    }
}

