/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite.dag.scheduler;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the nodes of a {@link WorkflowDag} on a small thread pool, one "level" at a time,
 * and repeats the whole DAG for the number of rounds the workflow declares.
 *
 * <p>All nodes that are in the queue at the same moment are submitted to the executor
 * together; their child nodes are only queued once every node of the current batch has
 * been submitted, and the next batch starts only after every submitted task has finished.
 */
public class DagScheduler {
    private static final Logger log = LoggerFactory.getLogger(DagScheduler.class);

    private final WorkflowDag workflowDag;
    private final ExecutionContext executionContext;

    /**
     * Creates a scheduler for the given DAG.
     *
     * @param workflowDag   the DAG whose nodes will be executed
     * @param writerContext writer context wrapped into the {@link ExecutionContext} handed to every node
     * @param jsc           Spark context wrapped into the {@link ExecutionContext} handed to every node
     */
    public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) {
        this.workflowDag = workflowDag;
        this.executionContext = new ExecutionContext(jsc, writerContext);
    }

    /**
     * Executes the workflow on a fixed pool of two threads and always releases the pool afterwards.
     *
     * @throws Exception if any node fails, or a node does not finish within the per-task timeout
     */
    public void schedule() throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            this.execute(service, this.workflowDag);
            service.shutdown();
        } finally {
            // Only reached with the pool still open when execute() threw: interrupt whatever is left.
            if (!service.isShutdown()) {
                log.info("Forcing shutdown of executor service, this might kill running tasks");
                service.shutdownNow();
            }
        }
    }

    /**
     * Runs every round of the DAG, batch by batch.
     *
     * @param service     executor the node tasks are submitted to
     * @param workflowDag the DAG providing the node list, the round count and the inter-round delay
     * @throws Exception if a node task fails or does not complete within one hour
     */
    private void execute(ExecutorService service, WorkflowDag workflowDag) throws Exception {
        log.info("Running workloads");
        List<? extends DagNode> nodes = workflowDag.getNodeList();
        int curRound = 1;
        do {
            log.warn("===================================================================");
            log.warn("Running workloads for round num " + curRound);
            log.warn("===================================================================");
            // Each round works on fresh clones so that completion state does not leak between rounds.
            PriorityQueue<DagNode> queue = new PriorityQueue<>();
            for (DagNode dagNode : nodes) {
                queue.add(dagNode.clone());
            }
            do {
                List<Future<?>> futures = new ArrayList<>();
                HashSet<DagNode> childNodes = new HashSet<>();
                while (!queue.isEmpty()) {
                    DagNode nodeToExecute = queue.poll();
                    log.warn("Executing node \"" + nodeToExecute.getConfig().getOtherConfigs().get("config") + "\" :: " + nodeToExecute.getConfig());
                    // Lambdas may only capture effectively-final locals, and curRound is mutated by the outer loop.
                    final int finalCurRound = curRound;
                    futures.add(service.submit(() -> this.executeNode(nodeToExecute, finalCurRound)));
                    // Adding an empty child list is a no-op, so no size guard is needed.
                    childNodes.addAll(nodeToExecute.getChildNodes());
                }
                // Children become the next batch; the set removes duplicates shared by several parents.
                queue.addAll(childNodes);
                childNodes.clear();
                // Block until the whole batch is done before starting the next one.
                for (Future<?> future : futures) {
                    future.get(1L, TimeUnit.HOURS);
                }
            } while (!queue.isEmpty());
            log.info("Finished workloads for round num " + curRound);
            // Delay only between rounds, never after the last one.
            if (curRound < workflowDag.getRounds()) {
                new DelayNode(workflowDag.getIntermittentDelayMins()).execute(this.executionContext, curRound);
            }
        } while (curRound++ < workflowDag.getRounds());
        log.info("Finished workloads");
    }

    /**
     * Executes a single node as many times as its configured repeat count and marks it completed.
     *
     * @param node     the node to run; must not already be completed
     * @param curRound the 1-based round number passed through to the node
     * @throws RuntimeException if the node is already marked completed
     * @throws HoodieException  wrapping any exception raised while the node executes
     */
    protected void executeNode(DagNode node, int curRound) {
        if (node.isCompleted()) {
            throw new RuntimeException("DagNode already completed! Cannot re-execute");
        }
        try {
            for (int repeatCount = node.getConfig().getRepeatCount(); repeatCount > 0; --repeatCount) {
                node.execute(this.executionContext, curRound);
                log.info("Finished executing {}", node.getName());
            }
            node.setCompleted(true);
        } catch (Exception e) {
            log.error("Exception executing node", e);
            throw new HoodieException(e);
        }
    }
}

