/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.dataflow.runnables;

import com.espertech.esper.client.annotation.AuditEnum;
import com.espertech.esper.client.dataflow.EPDataFlowExceptionContext;
import com.espertech.esper.client.dataflow.EPDataFlowExceptionHandler;
import com.espertech.esper.client.dataflow.EPDataFlowSignal;
import com.espertech.esper.client.dataflow.EPDataFlowSignalFinalMarker;
import com.espertech.esper.dataflow.interfaces.DataFlowSourceOperator;
import com.espertech.esper.dataflow.runnables.BaseRunnable;
import com.espertech.esper.dataflow.runnables.CompletionListener;
import com.espertech.esper.dataflow.util.DataFlowSignalListener;
import com.espertech.esper.util.AuditPath;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GraphSourceRunnable
implements BaseRunnable,
DataFlowSignalListener {
    private static final Logger log = LoggerFactory.getLogger(GraphSourceRunnable.class);
    private final String engineURI;
    private final String statementName;
    private final DataFlowSourceOperator graphSource;
    private final String dataFlowName;
    private final String operatorName;
    private final int operatorNumber;
    private final String operatorPrettyPrint;
    private final EPDataFlowExceptionHandler optionalExceptionHandler;
    private final boolean audit;
    private boolean shutdown;
    private List<CompletionListener> completionListeners;

    public GraphSourceRunnable(String engineURI, String statementName, DataFlowSourceOperator graphSource, String dataFlowName, String operatorName, int operatorNumber, String operatorPrettyPrint, EPDataFlowExceptionHandler optionalExceptionHandler, boolean audit) {
        this.engineURI = engineURI;
        this.statementName = statementName;
        this.graphSource = graphSource;
        this.dataFlowName = dataFlowName;
        this.operatorName = operatorName;
        this.operatorNumber = operatorNumber;
        this.operatorPrettyPrint = operatorPrettyPrint;
        this.optionalExceptionHandler = optionalExceptionHandler;
        this.audit = audit;
    }

    @Override
    public void processSignal(EPDataFlowSignal signal) {
        if (signal instanceof EPDataFlowSignalFinalMarker) {
            this.shutdown = true;
        }
    }

    @Override
    public void run() {
        try {
            this.runLoop();
        }
        catch (InterruptedException ex) {
            log.debug("Interruped runnable: " + ex.getMessage(), (Throwable)ex);
        }
        catch (RuntimeException ex) {
            log.error("Exception encountered: " + ex.getMessage(), (Throwable)ex);
            this.handleException(ex);
        }
        this.invokeCompletionListeners();
    }

    public void runSync() throws InterruptedException {
        try {
            this.runLoop();
        }
        catch (InterruptedException ex) {
            log.debug("Interruped runnable: " + ex.getMessage(), (Throwable)ex);
            throw ex;
        }
        catch (RuntimeException ex) {
            log.error("Exception encountered: " + ex.getMessage(), (Throwable)ex);
            this.handleException(ex);
            throw ex;
        }
    }

    private void handleException(RuntimeException ex) {
        if (this.optionalExceptionHandler == null) {
            return;
        }
        this.optionalExceptionHandler.handle(new EPDataFlowExceptionContext(this.dataFlowName, this.operatorName, this.operatorNumber, this.operatorPrettyPrint, ex));
    }

    private void runLoop() throws InterruptedException {
        do {
            if (this.audit) {
                AuditPath.auditLog(this.engineURI, this.statementName, AuditEnum.DATAFLOW_SOURCE, "dataflow " + this.dataFlowName + " operator " + this.operatorName + "(" + this.operatorNumber + ") invoking source.next()");
            }
            this.graphSource.next();
        } while (!this.shutdown);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void invokeCompletionListeners() {
        GraphSourceRunnable graphSourceRunnable = this;
        synchronized (graphSourceRunnable) {
            if (this.completionListeners != null) {
                for (CompletionListener listener : this.completionListeners) {
                    listener.completed();
                }
            }
        }
    }

    public synchronized void addCompletionListener(CompletionListener completionListener) {
        if (this.completionListeners == null) {
            this.completionListeners = new ArrayList<CompletionListener>();
        }
        this.completionListeners.add(completionListener);
    }

    public void next() throws InterruptedException {
        this.graphSource.next();
    }

    @Override
    public void shutdown() {
        this.shutdown = true;
    }

    public boolean isShutdown() {
        return this.shutdown;
    }
}

