/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.dataflow.ops;

import com.espertech.esper.client.EPException;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.StatementAwareUpdateListener;
import com.espertech.esper.client.dataflow.EPDataFlowEPStatementFilter;
import com.espertech.esper.client.dataflow.EPDataFlowIRStreamCollector;
import com.espertech.esper.client.dataflow.EPDataFlowIRStreamCollectorContext;
import com.espertech.esper.client.dataflow.EPDataFlowSignal;
import com.espertech.esper.core.service.StatementLifecycleEvent;
import com.espertech.esper.core.service.StatementLifecycleObserver;
import com.espertech.esper.core.service.StatementLifecycleSvc;
import com.espertech.esper.dataflow.annotations.DataFlowContext;
import com.espertech.esper.dataflow.annotations.DataFlowOpParameter;
import com.espertech.esper.dataflow.annotations.DataFlowOperator;
import com.espertech.esper.dataflow.interfaces.DataFlowOpCloseContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializateContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializeResult;
import com.espertech.esper.dataflow.interfaces.DataFlowOpLifecycle;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOpenContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOutputPort;
import com.espertech.esper.dataflow.interfaces.DataFlowSourceOperator;
import com.espertech.esper.dataflow.interfaces.EPDataFlowEmitter;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataFlowOperator
public class EPStatementSource
implements DataFlowSourceOperator,
DataFlowOpLifecycle,
StatementLifecycleObserver {
    private static final Logger log = LoggerFactory.getLogger(EPStatementSource.class);
    @DataFlowOpParameter
    private String statementName;
    @DataFlowOpParameter
    private EPDataFlowEPStatementFilter statementFilter;
    @DataFlowOpParameter
    private EPDataFlowIRStreamCollector collector;
    @DataFlowContext
    private EPDataFlowEmitter graphContext;
    private StatementLifecycleSvc statementLifecycleSvc;
    private Map<EPStatement, StatementAwareUpdateListener> listeners = new HashMap<EPStatement, StatementAwareUpdateListener>();
    private LinkedBlockingQueue<Object> emittables = new LinkedBlockingQueue();
    private boolean submitEventBean;
    private ThreadLocal<EPDataFlowIRStreamCollectorContext> collectorDataTL = new ThreadLocal<EPDataFlowIRStreamCollectorContext>(){

        @Override
        protected synchronized EPDataFlowIRStreamCollectorContext initialValue() {
            return null;
        }
    };

    @Override
    public DataFlowOpInitializeResult initialize(DataFlowOpInitializateContext context) throws Exception {
        if (context.getOutputPorts().size() != 1) {
            throw new IllegalArgumentException("EPStatementSource operator requires one output stream but produces " + context.getOutputPorts().size() + " streams");
        }
        if (this.statementName == null && this.statementFilter == null) {
            throw new EPException("Failed to find required 'statementName' or 'statementFilter' parameter");
        }
        if (this.statementName != null && this.statementFilter != null) {
            throw new EPException("Both 'statementName' or 'statementFilter' parameters were provided, only either one is expected");
        }
        DataFlowOpOutputPort portZero = context.getOutputPorts().get(0);
        if (portZero != null && portZero.getOptionalDeclaredType() != null && portZero.getOptionalDeclaredType().isWildcard()) {
            this.submitEventBean = true;
        }
        this.statementLifecycleSvc = context.getServicesContext().getStatementLifecycleSvc();
        return null;
    }

    @Override
    public void next() throws InterruptedException {
        Object next = this.emittables.take();
        if (next instanceof EPDataFlowSignal) {
            EPDataFlowSignal signal = (EPDataFlowSignal)next;
            this.graphContext.submitSignal(signal);
        } else if (next instanceof PortAndMessagePair) {
            PortAndMessagePair pair = (PortAndMessagePair)next;
            this.graphContext.submitPort(pair.getPort(), pair.getMessage());
        } else {
            this.graphContext.submit(next);
        }
    }

    @Override
    public synchronized void open(DataFlowOpOpenContext openContext) {
        this.statementLifecycleSvc.addObserver(this);
        if (this.statementName != null) {
            EPStatement stmt = this.statementLifecycleSvc.getStatementByName(this.statementName);
            if (stmt != null) {
                this.addStatement(stmt);
            }
        } else {
            String[] statements;
            for (String name : statements = this.statementLifecycleSvc.getStatementNames()) {
                EPStatement stmt = this.statementLifecycleSvc.getStatementByName(name);
                if (!this.statementFilter.pass(stmt)) continue;
                this.addStatement(stmt);
            }
        }
    }

    @Override
    public synchronized void observe(StatementLifecycleEvent theEvent) {
        EPStatement stmt = theEvent.getStatement();
        if (theEvent.getEventType() == StatementLifecycleEvent.LifecycleEventType.STATECHANGE) {
            StatementAwareUpdateListener listener;
            if ((theEvent.getStatement().isStopped() || theEvent.getStatement().isDestroyed()) && (listener = this.listeners.remove(stmt)) != null) {
                stmt.removeListener(listener);
            }
            if (theEvent.getStatement().isStarted()) {
                if (this.statementFilter == null) {
                    if (theEvent.getStatement().getName().equals(this.statementName)) {
                        this.addStatement(stmt);
                    }
                } else if (this.statementFilter.pass(stmt)) {
                    this.addStatement(stmt);
                }
            }
        }
    }

    @Override
    public void close(DataFlowOpCloseContext openContext) {
        for (Map.Entry<EPStatement, StatementAwareUpdateListener> entry : this.listeners.entrySet()) {
            try {
                entry.getKey().removeListener(entry.getValue());
            }
            catch (Exception ex) {
                log.debug("Exception encountered removing listener: " + ex.getMessage(), (Throwable)ex);
            }
        }
        this.listeners.clear();
    }

    private void addStatement(EPStatement stmt) {
        StatementAwareUpdateListener listener;
        if (this.listeners.containsKey(stmt)) {
            return;
        }
        if (this.collector == null) {
            listener = new EmitterUpdateListener(this.emittables, this.submitEventBean);
        } else {
            LocalEmitter emitterForCollector = new LocalEmitter(this.emittables);
            listener = new EmitterCollectorUpdateListener(this.collector, emitterForCollector, this.collectorDataTL, this.submitEventBean);
        }
        stmt.addListener(listener);
        this.listeners.put(stmt, listener);
    }

    public static class PortAndMessagePair {
        private final int port;
        private final Object message;

        public PortAndMessagePair(int port, Object message) {
            this.port = port;
            this.message = message;
        }

        public int getPort() {
            return this.port;
        }

        public Object getMessage() {
            return this.message;
        }
    }

    public static class LocalEmitter
    implements EPDataFlowEmitter {
        private final LinkedBlockingQueue<Object> queue;

        public LocalEmitter(LinkedBlockingQueue<Object> queue) {
            this.queue = queue;
        }

        @Override
        public void submit(Object object) {
            this.queue.add(object);
        }

        @Override
        public void submitSignal(EPDataFlowSignal signal) {
            this.queue.add(signal);
        }

        @Override
        public void submitPort(int portNumber, Object object) {
            this.queue.add(object);
        }
    }

    public static class EmitterCollectorUpdateListener
    implements StatementAwareUpdateListener {
        private final EPDataFlowIRStreamCollector collector;
        private final LocalEmitter emitterForCollector;
        private final ThreadLocal<EPDataFlowIRStreamCollectorContext> collectorDataTL;
        private final boolean submitEventBean;

        public EmitterCollectorUpdateListener(EPDataFlowIRStreamCollector collector, LocalEmitter emitterForCollector, ThreadLocal<EPDataFlowIRStreamCollectorContext> collectorDataTL, boolean submitEventBean) {
            this.collector = collector;
            this.emitterForCollector = emitterForCollector;
            this.collectorDataTL = collectorDataTL;
            this.submitEventBean = submitEventBean;
        }

        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider) {
            EPDataFlowIRStreamCollectorContext holder = this.collectorDataTL.get();
            if (holder == null) {
                holder = new EPDataFlowIRStreamCollectorContext(this.emitterForCollector, this.submitEventBean, newEvents, oldEvents, statement, epServiceProvider);
                this.collectorDataTL.set(holder);
            } else {
                holder.setEpServiceProvider(epServiceProvider);
                holder.setStatement(statement);
                holder.setOldEvents(oldEvents);
                holder.setNewEvents(newEvents);
            }
            this.collector.collect(holder);
        }
    }

    public static class EmitterUpdateListener
    implements StatementAwareUpdateListener {
        private final Queue<Object> queue;
        private final boolean submitEventBean;

        public EmitterUpdateListener(Queue<Object> queue, boolean submitEventBean) {
            this.queue = queue;
            this.submitEventBean = submitEventBean;
        }

        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider) {
            if (newEvents != null) {
                for (EventBean newEvent : newEvents) {
                    if (this.submitEventBean) {
                        this.queue.add(newEvent);
                        continue;
                    }
                    Object underlying = newEvent.getUnderlying();
                    this.queue.add(underlying);
                }
            }
        }
    }
}

