/*
 * Decompiled with CFR 0.152.
 */
package streams.runtime;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Process;
import stream.Processor;
import stream.ProcessorList;
import stream.io.Sink;
import stream.io.Source;
import stream.runtime.Monitor;
import stream.runtime.ProcessListener;
import streams.application.ComputeGraph;
import streams.runtime.Hook;

public class Supervisor
implements ProcessListener,
Hook {
    static Logger log = LoggerFactory.getLogger(Supervisor.class);
    List<Process> runningProcesses = new ArrayList<Process>();
    final AtomicInteger running = new AtomicInteger(0);
    final AtomicInteger errors = new AtomicInteger(0);
    final AtomicInteger finished = new AtomicInteger(0);
    ComputeGraph dependencies;
    Map<Process, Set<Sink>> processOutlets = new HashMap<Process, Set<Sink>>();
    final Object lock = new Object();

    public Supervisor(ComputeGraph graph) {
        this.dependencies = graph;
        log.debug("Creating supervisor for graph {}", (Object)graph);
        graph.printShutdownStrategy();
        Set srcs = graph.getSources();
        for (Object src : srcs) {
            if (!(src instanceof Source)) continue;
            Set ts = graph.getTargets(src);
            log.debug("  Source '{}'  is read from {} targets: {}", new Object[]{src, ts.size(), ts});
        }
        for (Process p : graph.processes().values()) {
            Set<Sink> outlets = this.collectSinks(p);
            log.debug("Process '{}' has {} outlets: {}", new Object[]{p, outlets.size(), outlets});
        }
    }

    @Override
    public synchronized void processStarted(Process p) {
        if (p instanceof Monitor) {
            log.debug("Monitor #{} started", (Object)p);
            return;
        }
        log.debug("Process  #{}  started.", (Object)p);
        int run2 = this.running.incrementAndGet();
        this.runningProcesses.add(p);
        Set<Sink> sinks = this.collectSinks(p);
        log.debug("   process #{} is writing to {} sinks", (Object)p, (Object)sinks.size());
        this.processOutlets.put(p, sinks);
        log.debug("{} processes running.", (Object)run2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void processError(Process p, Exception e) {
        log.debug("Process {} finished with error: {}", (Object)p, (Object)e.getMessage());
        this.errors.incrementAndGet();
        Object object = this.lock;
        synchronized (object) {
            this.lock.notify();
        }
        this.processFinished(p);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void processFinished(Process p) {
        log.debug("Process {} finished normally...", (Object)p);
        int run2 = this.running.decrementAndGet();
        this.finished.incrementAndGet();
        this.runningProcesses.remove(p);
        log.debug("Process  #{}  finished.", (Object)p);
        Set<Sink> outs = this.processOutlets.get(p);
        if (outs == null) {
            outs = new HashSet<Sink>();
        }
        log.debug("   process has {} outgoing targets: {}", (Object)outs.size(), outs);
        Set<Sink> outlets = this.processOutlets.remove(p);
        if (outlets != null) {
            for (Sink sink : outlets) {
                int refCount = 0;
                for (Process pr : this.processOutlets.keySet()) {
                    Set<Sink> prOuts;
                    if (pr == null || !(prOuts = this.processOutlets.get(pr)).contains(sink)) continue;
                    ++refCount;
                }
                if (refCount == 0) {
                    log.debug("Reference count of {} is 0, closing sink!", (Object)sink);
                    try {
                        sink.close();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    continue;
                }
                log.debug("Reference count for {} is: {}", (Object)sink, (Object)refCount);
            }
        }
        if (log.isTraceEnabled()) {
            this.printTargets(p, 0);
        }
        this.dependencies.remove((Object)p);
        Set srcs = this.dependencies.getRootSources();
        log.debug("{} root sources remaining:   {}", (Object)srcs.size(), (Object)srcs);
        Object object = this.lock;
        synchronized (object) {
            this.lock.notify();
        }
        log.debug("{} processes running.", (Object)run2);
    }

    public void printTargets(Object src, int depth) {
        String prefix = "";
        for (int i = 0; i < depth; ++i) {
            prefix = prefix + "  ";
        }
        Set outs = this.dependencies.getTargets(src);
        for (Object out : outs) {
            log.debug(prefix + " " + out);
            if (out instanceof Sink) break;
            this.printTargets(out, depth + 1);
        }
    }

    public int processesDone() {
        return this.finished.get() + this.errors.get();
    }

    public int processesRunning() {
        log.debug("Active processes: {}", this.runningProcesses);
        return this.running.get();
    }

    public Set<Sink> collectSinks(Object p) {
        HashSet<Sink> sinks = new HashSet<Sink>();
        Set outs = this.dependencies.getTargets(p);
        for (Object out : outs) {
            if (out instanceof Sink) {
                log.debug("Found sink '{}' referenced by {}", out, p);
                sinks.add((Sink)out);
                continue;
            }
            sinks.addAll(this.collectSinks(out));
        }
        if (p instanceof Process) {
            log.debug("Checking sinks referenced by elements of process '{}'", p);
            for (Processor pr : ((Process)p).getProcessors()) {
                Set<Sink> childSinks = this.collectSinks(pr);
                if (pr instanceof ProcessorList) {
                    ProcessorList pl = (ProcessorList)pr;
                    for (Processor proc : pl.getProcessors()) {
                        Set<Sink> found = this.collectSinks(proc);
                        log.debug("  Found {} sinks referenced by child '{}': {}", new Object[]{found.size(), proc, found});
                        childSinks.addAll(found);
                    }
                } else {
                    Set<Sink> found = this.collectSinks(pr);
                    log.debug("  Found {} sinks referenced by child '{}': {}", new Object[]{found.size(), pr, found});
                    childSinks.addAll(found);
                }
                sinks.addAll(childSinks);
            }
        }
        return sinks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForProcesses() {
        if (this.running.get() == 0) {
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            try {
                this.lock.wait();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void signal(int flags) {
        log.debug("Shutdown signal received: '{}'", (Object)flags);
        log.debug("Closing root sources: {}", (Object)this.dependencies.getRootSources());
        Object object = this.lock;
        synchronized (object) {
            this.lock.notify();
        }
        final Set roots = this.dependencies.getRootSources();
        if (roots.isEmpty()) {
            return;
        }
        Thread t = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Iterator it = roots.iterator();
                while (it.hasNext()) {
                    Source src = (Source)it.next();
                    Set consumers = Supervisor.this.dependencies.getTargets((Object)src);
                    log.info("The following consumers are attached to the root {}:  {}", (Object)src, (Object)consumers);
                    try {
                        src.close();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    Iterator iterator = consumers.iterator();
                    while (iterator.hasNext()) {
                        Object c;
                        Object e = c = iterator.next();
                        synchronized (e) {
                            try {
                                log.info("Notifying consumer {}", c);
                                c.notify();
                            }
                            catch (Exception e2) {
                                e2.printStackTrace();
                            }
                        }
                    }
                    it.remove();
                }
            }
        };
        t.start();
    }
}

