/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite.dag.scheduler;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the nodes of a {@link WorkflowDag} level by level on a small thread pool.
 *
 * <p>All nodes currently in the work queue are submitted to the executor together; their
 * child nodes are collected and become the next batch once every task of the current batch
 * has finished.
 */
public class DagScheduler {
    private static final Logger log = LoggerFactory.getLogger(DagScheduler.class);
    private final WorkflowDag workflowDag;
    private final ExecutionContext executionContext;

    /**
     * Creates a scheduler for the given DAG.
     *
     * @param workflowDag           the DAG whose node list is executed by {@link #schedule()}
     * @param hoodieTestSuiteWriter writer handed to the {@link ExecutionContext} shared by all nodes
     * @param deltaGenerator        generator handed to the {@link ExecutionContext} shared by all nodes
     */
    public DagScheduler(WorkflowDag workflowDag, HoodieTestSuiteWriter hoodieTestSuiteWriter, DeltaGenerator deltaGenerator) {
        this.workflowDag = workflowDag;
        // The first ExecutionContext argument is intentionally null, as in the original code.
        this.executionContext = new ExecutionContext(null, hoodieTestSuiteWriter, deltaGenerator);
    }

    /**
     * Executes the whole DAG on a fixed pool of two threads and shuts the pool down afterwards.
     *
     * @throws Exception if any node fails, a task does not finish within the per-task timeout,
     *                   or the waiting thread is interrupted
     */
    public void schedule() throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            this.execute(service, this.workflowDag.getNodeList());
            service.shutdown();
        } finally {
            // Only reached with the pool still running when execute() threw: stop it forcibly
            // so that worker threads do not keep the JVM alive.
            if (!service.isShutdown()) {
                log.info("Forcing shutdown of executor service, this might kill running tasks");
                service.shutdownNow();
            }
        }
    }

    /**
     * Submits the given nodes, waits for them, then repeats with their children until no
     * nodes remain.
     *
     * @param service executor that runs the individual nodes
     * @param nodes   initial nodes to run
     * @throws Exception propagated from {@link Future#get(long, TimeUnit)} when a node fails,
     *                   times out (one hour per task) or the wait is interrupted
     */
    private void execute(ExecutorService service, List<DagNode> nodes) throws Exception {
        // NOTE(review): PriorityQueue(Collection) relies on the natural ordering of DagNode - confirm DagNode is Comparable.
        PriorityQueue<DagNode> queue = new PriorityQueue<>(nodes);
        log.info("Running workloads");
        while (!queue.isEmpty()) {
            List<Future<?>> futures = new ArrayList<>();
            // A set, so a child shared by several nodes of this batch is queued only once.
            HashSet<DagNode> childNodes = new HashSet<>();
            while (!queue.isEmpty()) {
                final DagNode nodeToExecute = queue.poll();
                futures.add(service.submit(() -> this.executeNode(nodeToExecute)));
                childNodes.addAll(nodeToExecute.getChildNodes());
            }
            // Queue the next level before blocking on the current one.
            queue.addAll(childNodes);
            for (Future<?> future : futures) {
                future.get(1L, TimeUnit.HOURS);
            }
        }
        log.info("Finished workloads");
    }

    /**
     * Executes a single node with the shared execution context and marks it completed.
     *
     * @param node the node to run
     * @throws RuntimeException if the node has already been marked completed
     * @throws HoodieException  wrapping any exception thrown by the node's execution
     */
    private void executeNode(DagNode node) {
        if (node.isCompleted()) {
            throw new RuntimeException("DagNode already completed! Cannot re-execute");
        }
        try {
            node.execute(this.executionContext);
            node.setCompleted(true);
            log.info("Finished executing {}", node.getName());
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace is recorded at the failure site.
            log.error("Exception executing node", e);
            throw new HoodieException(e);
        }
    }
}

