/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.dataflow.core;

import com.espertech.esper.client.annotation.AuditEnum;
import com.espertech.esper.client.dataflow.EPDataFlowCancellationException;
import com.espertech.esper.client.dataflow.EPDataFlowExecutionException;
import com.espertech.esper.client.dataflow.EPDataFlowInstance;
import com.espertech.esper.client.dataflow.EPDataFlowInstanceCaptive;
import com.espertech.esper.client.dataflow.EPDataFlowInstanceStatistics;
import com.espertech.esper.client.dataflow.EPDataFlowState;
import com.espertech.esper.collection.Pair;
import com.espertech.esper.dataflow.interfaces.DataFlowOpCloseContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpLifecycle;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOpenContext;
import com.espertech.esper.dataflow.ops.Emitter;
import com.espertech.esper.dataflow.runnables.CompletionListener;
import com.espertech.esper.dataflow.runnables.GraphSourceRunnable;
import com.espertech.esper.util.AuditPath;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A single instance of a data flow that tracks its own lifecycle state and drives its operators.
 * <p>
 * An instance can be executed in one of three ways: {@link #run()} executes the single source on the
 * calling thread, {@link #start()} creates one daemon thread per source, and {@link #startCaptive()}
 * opens the operators and hands the emitters and source runnables to the caller.
 * <p>
 * Operators are kept together with a flag that records whether the operator has already been closed,
 * so that repeated close requests invoke each operator's {@code close} at most once.
 */
public class EPDataFlowInstanceImpl
implements EPDataFlowInstance,
CompletionListener {
    private static final Log log = LogFactory.getLog(EPDataFlowInstanceImpl.class);
    private final String engineURI;
    private final String statementName;
    private final boolean audit;
    private final String dataFlowName;
    private final Object userObject;
    private final String instanceId;
    // Read without holding the instance monitor by join() and cancel(), hence volatile.
    private volatile EPDataFlowState state;
    private final List<GraphSourceRunnable> sourceRunnables;
    // Operator number -> (operator object, "already closed" flag).
    private final Map<Integer, Pair<Object, Boolean>> operators;
    private final Set<Integer> operatorBuildOrder;
    private final EPDataFlowInstanceStatistics statisticsProvider;
    private final Map<String, Object> parameters;
    // Latches of callers waiting in join() when no threads were created by start().
    private List<CountDownLatch> joinedThreadLatches;
    // Threads created by start(); remains null for run() and startCaptive().
    private List<Thread> threads;
    // Thread currently executing run(), kept so that cancel() can interrupt it.
    private Thread runCurrentThread;

    /**
     * Creates the instance and records the initial state.
     *
     * @param engineURI          engine URI, used for audit logging
     * @param statementName      statement name, used for audit logging
     * @param audit              when true, every state transition is written to the audit log
     * @param dataFlowName       name of the data flow, used in messages and thread names
     * @param userObject         object returned as-is by {@link #getUserObject()}
     * @param instanceId         identifier returned as-is by {@link #getInstanceId()}
     * @param state              initial state
     * @param sourceRunnables    runnables of the source operators; the list is retained (not copied)
     *                           and is cleared by {@link #cancel()}
     * @param operators          operator objects keyed by operator number
     * @param operatorBuildOrder operator numbers in the order in which open and close are invoked
     * @param statisticsProvider object returned as-is by {@link #getStatistics()}
     * @param parameters         map returned as-is by {@link #getParameters()}
     */
    public EPDataFlowInstanceImpl(String engineURI, String statementName, boolean audit, String dataFlowName, Object userObject, String instanceId, EPDataFlowState state, List<GraphSourceRunnable> sourceRunnables, Map<Integer, Object> operators, Set<Integer> operatorBuildOrder, EPDataFlowInstanceStatistics statisticsProvider, Map<String, Object> parameters) {
        this.engineURI = engineURI;
        this.statementName = statementName;
        this.audit = audit;
        this.dataFlowName = dataFlowName;
        this.userObject = userObject;
        this.instanceId = instanceId;
        this.sourceRunnables = sourceRunnables;
        // Pair every operator with a "closed" flag that starts out false.
        this.operators = new TreeMap<Integer, Pair<Object, Boolean>>();
        for (Map.Entry<Integer, Object> entry : operators.entrySet()) {
            this.operators.put(entry.getKey(), new Pair<Object, Boolean>(entry.getValue(), false));
        }
        this.operatorBuildOrder = operatorBuildOrder;
        this.statisticsProvider = statisticsProvider;
        this.setState(state);
        this.parameters = parameters;
    }

    /**
     * @return the data flow name passed to the constructor
     */
    @Override
    public String getDataFlowName() {
        return this.dataFlowName;
    }

    /**
     * @return the current lifecycle state
     */
    @Override
    public EPDataFlowState getState() {
        return this.state;
    }

    /**
     * @return the user object passed to the constructor
     */
    @Override
    public Object getUserObject() {
        return this.userObject;
    }

    /**
     * @return the instance id passed to the constructor
     */
    @Override
    public String getInstanceId() {
        return this.instanceId;
    }

    /**
     * @return the parameter map passed to the constructor (not copied)
     */
    @Override
    public Map<String, Object> getParameters() {
        return this.parameters;
    }

    /**
     * Moves the instance to RUNNING, opens all operators and returns the emitters and source
     * runnables so that the caller can drive the data flow itself.
     *
     * @return captive holder of all operators that are {@link Emitter} instances, keyed by emitter
     *         name, plus the source runnables
     * @throws IllegalStateException        if the instance is complete, cancelled or already running
     * @throws EPDataFlowExecutionException if an operator fails to open
     */
    @Override
    public synchronized EPDataFlowInstanceCaptive startCaptive() {
        this.checkExecCompleteState();
        this.checkExecCancelledState();
        this.checkExecRunningState();
        this.setState(EPDataFlowState.RUNNING);
        this.callOperatorOpen();
        Map<String, Emitter> emitters = new HashMap<String, Emitter>();
        for (Pair<Object, Boolean> operatorStatePair : this.operators.values()) {
            if (!(operatorStatePair.getFirst() instanceof Emitter)) {
                continue;
            }
            Emitter emitter = (Emitter) operatorStatePair.getFirst();
            emitters.put(emitter.getName(), emitter);
        }
        return new EPDataFlowInstanceCaptive(emitters, this.sourceRunnables);
    }

    /**
     * Executes the single source of the data flow on the calling thread and returns when it ends.
     * Operators are closed on every exit path after a successful open.
     *
     * @throws IllegalStateException           if the instance is complete, cancelled or already running
     * @throws UnsupportedOperationException   if the data flow does not have exactly one source
     * @throws EPDataFlowCancellationException if the source was interrupted
     * @throws EPDataFlowExecutionException    if opening an operator or running the source fails
     */
    @Override
    public synchronized void run() {
        this.checkExecCompleteState();
        this.checkExecCancelledState();
        this.checkExecRunningState();
        if (this.sourceRunnables.size() != 1) {
            throw new UnsupportedOperationException("The data flow '" + this.dataFlowName + "' has zero or multiple sources and requires the use of the start method instead");
        }
        this.callOperatorOpen();
        GraphSourceRunnable sourceRunnable = this.sourceRunnables.get(0);
        this.setState(EPDataFlowState.RUNNING);
        // Remember the executing thread so that cancel() can interrupt it.
        this.runCurrentThread = Thread.currentThread();
        try {
            sourceRunnable.runSync();
        }
        catch (InterruptedException ex) {
            this.callOperatorClose();
            this.setState(EPDataFlowState.CANCELLED);
            throw new EPDataFlowCancellationException("Data flow '" + this.dataFlowName + "' execution was cancelled", this.dataFlowName);
        }
        catch (Throwable t) {
            this.callOperatorClose();
            this.setState(EPDataFlowState.COMPLETE);
            throw new EPDataFlowExecutionException("Exception encountered running data flow '" + this.dataFlowName + "': " + t.getMessage(), t, this.dataFlowName);
        }
        this.callOperatorClose();
        // cancel() is not synchronized and may have set CANCELLED meanwhile; do not overwrite it.
        if (this.state != EPDataFlowState.CANCELLED) {
            this.setState(EPDataFlowState.COMPLETE);
        }
    }

    /**
     * Opens all operators and starts one daemon thread per source runnable, then moves the instance
     * to RUNNING. When the last source reports completion, {@link #completed()} is invoked.
     *
     * @throws IllegalStateException        if the instance is complete, cancelled or already running
     * @throws EPDataFlowExecutionException if an operator fails to open
     */
    @Override
    public synchronized void start() {
        this.checkExecCompleteState();
        this.checkExecCancelledState();
        this.checkExecRunningState();
        this.callOperatorOpen();
        // Counts the sources that are still running; the one that reaches zero completes the instance.
        final AtomicInteger countdown = new AtomicInteger(this.sourceRunnables.size());
        this.threads = new ArrayList<Thread>();
        for (int i = 0; i < this.sourceRunnables.size(); ++i) {
            GraphSourceRunnable runnable = this.sourceRunnables.get(i);
            String threadName = "esper." + this.dataFlowName + "-" + i;
            Thread thread = new Thread((Runnable) runnable, threadName);
            thread.setContextClassLoader(Thread.currentThread().getContextClassLoader());
            thread.setDaemon(true);
            runnable.addCompletionListener(new CompletionListener() {

                @Override
                public void completed() {
                    int remaining = countdown.decrementAndGet();
                    if (remaining == 0) {
                        EPDataFlowInstanceImpl.this.completed();
                    }
                }
            });
            this.threads.add(thread);
            thread.start();
        }
        this.setState(EPDataFlowState.RUNNING);
    }

    /**
     * Blocks until the data flow has finished.
     * <p>
     * When threads were created by {@link #start()}, joins each of them. Otherwise registers a latch
     * that {@link #completed()} counts down and waits on it unless the state is already COMPLETE.
     *
     * @throws IllegalStateException if the instance has not been executed or has been cancelled
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     */
    @Override
    public void join() throws InterruptedException {
        if (this.state == EPDataFlowState.INSTANTIATED) {
            throw new IllegalStateException("Data flow '" + this.dataFlowName + "' instance has not been executed, please use join after start or run");
        }
        if (this.state == EPDataFlowState.CANCELLED) {
            throw new IllegalStateException("Data flow '" + this.dataFlowName + "' instance has been cancelled and cannot be joined");
        }
        if (this.threads != null) {
            for (Thread thread : this.threads) {
                thread.join();
            }
        } else {
            CountDownLatch latch = new CountDownLatch(1);
            // Register under the same monitor that completed() holds while releasing latches.
            synchronized (this) {
                if (this.joinedThreadLatches == null) {
                    this.joinedThreadLatches = new ArrayList<CountDownLatch>();
                }
                this.joinedThreadLatches.add(latch);
            }
            // Check after registering: either completed() sees the latch or the state is already COMPLETE.
            if (this.state != EPDataFlowState.COMPLETE) {
                latch.await();
            }
        }
    }

    /**
     * Cancels the data flow. Does nothing when the instance is already COMPLETE or CANCELLED.
     * <p>
     * For an instance that was never executed the state is set to CANCELLED and operators are
     * closed. Otherwise the sources are shut down and their threads interrupted (for
     * {@link #start()}), or the thread executing {@link #run()} is interrupted, before operators are
     * closed and the state is set to CANCELLED. In all cancelling paths the source runnable list is
     * cleared.
     */
    @Override
    public void cancel() {
        if (this.state == EPDataFlowState.COMPLETE || this.state == EPDataFlowState.CANCELLED) {
            return;
        }
        if (this.state == EPDataFlowState.INSTANTIATED) {
            this.setState(EPDataFlowState.CANCELLED);
            this.sourceRunnables.clear();
            this.callOperatorClose();
            return;
        }
        if (this.threads != null) {
            for (GraphSourceRunnable runnable : this.sourceRunnables) {
                runnable.shutdown();
            }
            for (Thread thread : this.threads) {
                // Skip threads that already ended or already carry a pending interrupt.
                if (!thread.isAlive() || thread.isInterrupted()) {
                    continue;
                }
                thread.interrupt();
            }
        } else {
            if (this.runCurrentThread != null) {
                this.runCurrentThread.interrupt();
            }
            this.runCurrentThread = null;
        }
        this.callOperatorClose();
        this.setState(EPDataFlowState.CANCELLED);
        this.sourceRunnables.clear();
    }

    /**
     * Completion callback: marks the instance COMPLETE unless it was cancelled, closes the
     * operators and releases every caller waiting on a latch in {@link #join()}.
     */
    @Override
    public synchronized void completed() {
        if (this.state != EPDataFlowState.CANCELLED) {
            this.setState(EPDataFlowState.COMPLETE);
        }
        this.callOperatorClose();
        if (this.joinedThreadLatches != null) {
            for (CountDownLatch joinedThread : this.joinedThreadLatches) {
                joinedThread.countDown();
            }
        }
    }

    /**
     * @return the statistics provider passed to the constructor
     */
    @Override
    public EPDataFlowInstanceStatistics getStatistics() {
        return this.statisticsProvider;
    }

    /**
     * @throws IllegalStateException if the instance is in state COMPLETE
     */
    private void checkExecCompleteState() {
        if (this.state == EPDataFlowState.COMPLETE) {
            throw new IllegalStateException("Data flow '" + this.dataFlowName + "' instance has already completed, please use instantiate to run the data flow again");
        }
    }

    /**
     * @throws IllegalStateException if the instance is in state RUNNING
     */
    private void checkExecRunningState() {
        if (this.state == EPDataFlowState.RUNNING) {
            throw new IllegalStateException("Data flow '" + this.dataFlowName + "' instance is already running");
        }
    }

    /**
     * @throws IllegalStateException if the instance is in state CANCELLED
     */
    private void checkExecCancelledState() {
        if (this.state == EPDataFlowState.CANCELLED) {
            throw new IllegalStateException("Data flow '" + this.dataFlowName + "' instance has been cancelled and cannot be run or started");
        }
    }

    /**
     * Closes, in build order, every lifecycle operator that has not been closed yet.
     * A runtime exception thrown by an operator is logged and does not prevent the remaining
     * operators from being closed; the operator is marked closed either way.
     */
    private synchronized void callOperatorClose() {
        for (Integer opNum : this.operatorBuildOrder) {
            Pair<Object, Boolean> operatorStatePair = this.operators.get(opNum);
            boolean hasLifecycle = operatorStatePair.getFirst() instanceof DataFlowOpLifecycle;
            if (!hasLifecycle || operatorStatePair.getSecond().booleanValue()) {
                continue;
            }
            try {
                DataFlowOpLifecycle lf = (DataFlowOpLifecycle) operatorStatePair.getFirst();
                lf.close(new DataFlowOpCloseContext());
            }
            catch (RuntimeException ex) {
                log.error("Exception encountered closing data flow '" + this.dataFlowName + "': " + ex.getMessage(), ex);
            }
            operatorStatePair.setSecond(true);
        }
    }

    /**
     * Opens, in build order, every operator that implements {@link DataFlowOpLifecycle}.
     * The first failing operator aborts the sequence.
     *
     * @throws EPDataFlowExecutionException wrapping the runtime exception thrown by an operator
     */
    private void callOperatorOpen() {
        for (Integer opNum : this.operatorBuildOrder) {
            Pair<Object, Boolean> operatorStatePair = this.operators.get(opNum);
            if (!(operatorStatePair.getFirst() instanceof DataFlowOpLifecycle)) {
                continue;
            }
            try {
                DataFlowOpLifecycle lf = (DataFlowOpLifecycle) operatorStatePair.getFirst();
                lf.open(new DataFlowOpOpenContext());
            }
            catch (RuntimeException ex) {
                // Report the actual data flow name rather than a fixed literal.
                throw new EPDataFlowExecutionException("Exception encountered opening data flow '" + this.dataFlowName + "' in operator " + operatorStatePair.getFirst().getClass().getSimpleName() + ": " + ex.getMessage(), ex, this.dataFlowName);
            }
        }
    }

    /**
     * Records the new state, writing the transition to the audit log first when auditing is on.
     *
     * @param newState state to transition to
     */
    private void setState(EPDataFlowState newState) {
        if (this.audit) {
            AuditPath.auditLog(this.engineURI, this.statementName, AuditEnum.DATAFLOW_TRANSITION, "dataflow " + this.dataFlowName + " instance " + this.instanceId + " from state " + this.state + " to state " + newState);
        }
        this.state = newState;
    }
}

