/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.dataflow.ops;

import com.espertech.esper.client.EPException;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.EventType;
import com.espertech.esper.client.dataflow.EPDataFlowEventBeanCollector;
import com.espertech.esper.client.dataflow.EPDataFlowEventBeanCollectorContext;
import com.espertech.esper.core.context.util.AgentInstanceContext;
import com.espertech.esper.core.context.util.EPStatementAgentInstanceHandle;
import com.espertech.esper.core.service.EPStatementHandleCallback;
import com.espertech.esper.core.service.StatementAgentInstanceFilterVersion;
import com.espertech.esper.dataflow.annotations.DataFlowContext;
import com.espertech.esper.dataflow.annotations.DataFlowOpParameter;
import com.espertech.esper.dataflow.annotations.DataFlowOperator;
import com.espertech.esper.dataflow.interfaces.DataFlowOpCloseContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializateContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializeResult;
import com.espertech.esper.dataflow.interfaces.DataFlowOpLifecycle;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOpenContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOutputPort;
import com.espertech.esper.dataflow.interfaces.DataFlowSourceOperator;
import com.espertech.esper.dataflow.interfaces.EPDataFlowEmitter;
import com.espertech.esper.epl.core.StreamTypeServiceImpl;
import com.espertech.esper.epl.expression.core.ExprNode;
import com.espertech.esper.epl.expression.core.ExprValidationException;
import com.espertech.esper.filter.FilterHandleCallback;
import com.espertech.esper.filter.FilterServiceEntry;
import com.espertech.esper.filter.FilterSpecCompiled;
import com.espertech.esper.filter.FilterSpecCompiler;
import com.espertech.esper.filter.FilterValueSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

@DataFlowOperator
public class EventBusSource
implements DataFlowSourceOperator,
DataFlowOpLifecycle,
FilterHandleCallback {
    @DataFlowOpParameter
    private ExprNode filter;
    @DataFlowOpParameter
    private EPDataFlowEventBeanCollector collector;
    @DataFlowContext
    private EPDataFlowEmitter graphContext;
    private EventType eventType;
    private AgentInstanceContext agentInstanceContext;
    private EPStatementHandleCallback callbackHandle;
    private FilterServiceEntry filterServiceEntry;
    private LinkedBlockingDeque<Object> emittables = new LinkedBlockingDeque();
    private boolean submitEventBean;
    private ThreadLocal<EPDataFlowEventBeanCollectorContext> collectorDataTL = new ThreadLocal<EPDataFlowEventBeanCollectorContext>(){

        @Override
        protected synchronized EPDataFlowEventBeanCollectorContext initialValue() {
            return null;
        }
    };

    @Override
    public DataFlowOpInitializeResult initialize(DataFlowOpInitializateContext context) throws Exception {
        if (context.getOutputPorts().size() != 1) {
            throw new IllegalArgumentException("EventBusSource operator requires one output stream but produces " + context.getOutputPorts().size() + " streams");
        }
        DataFlowOpOutputPort portZero = context.getOutputPorts().get(0);
        if (portZero.getOptionalDeclaredType() == null || portZero.getOptionalDeclaredType().getEventType() == null) {
            throw new IllegalArgumentException("EventBusSource operator requires an event type declated for the output stream");
        }
        if (!portZero.getOptionalDeclaredType().isUnderlying()) {
            this.submitEventBean = true;
        }
        this.eventType = portZero.getOptionalDeclaredType().getEventType();
        this.agentInstanceContext = context.getAgentInstanceContext();
        return new DataFlowOpInitializeResult();
    }

    @Override
    public void next() throws InterruptedException {
        Object next = this.emittables.take();
        this.graphContext.submit(next);
    }

    @Override
    public void matchFound(EventBean theEvent, Collection<FilterHandleCallback> allStmtMatches) {
        if (this.collector != null) {
            EPDataFlowEventBeanCollectorContext holder = this.collectorDataTL.get();
            if (holder == null) {
                holder = new EPDataFlowEventBeanCollectorContext(this.graphContext, this.submitEventBean, theEvent);
                this.collectorDataTL.set(holder);
            } else {
                holder.setEvent(theEvent);
            }
            this.collector.collect(holder);
        } else if (this.submitEventBean) {
            this.emittables.add(theEvent);
        } else {
            this.emittables.add(theEvent.getUnderlying());
        }
    }

    @Override
    public boolean isSubSelect() {
        return false;
    }

    @Override
    public String getStatementId() {
        return this.agentInstanceContext.getStatementId();
    }

    @Override
    public void open(DataFlowOpOpenContext openContext) {
        FilterValueSet valueSet;
        try {
            List<ExprNode> filters = Collections.emptyList();
            if (this.filter != null) {
                filters = Collections.singletonList(this.filter);
            }
            FilterSpecCompiled spec = FilterSpecCompiler.makeFilterSpec(this.eventType, this.eventType.getName(), filters, null, null, null, new StreamTypeServiceImpl(this.eventType, this.eventType.getName(), true, this.agentInstanceContext.getEngineURI()), null, this.agentInstanceContext.getStatementContext(), new ArrayList<Integer>());
            valueSet = spec.getValueSet(null, this.agentInstanceContext, null);
        }
        catch (ExprValidationException ex) {
            throw new EPException("Failed to open filter: " + ex.getMessage(), ex);
        }
        EPStatementAgentInstanceHandle handle = new EPStatementAgentInstanceHandle(this.agentInstanceContext.getStatementContext().getEpStatementHandle(), this.agentInstanceContext.getAgentInstanceLock(), 0, new StatementAgentInstanceFilterVersion());
        this.callbackHandle = new EPStatementHandleCallback(handle, this);
        this.filterServiceEntry = this.agentInstanceContext.getStatementContext().getFilterService().add(valueSet, this.callbackHandle);
    }

    @Override
    public synchronized void close(DataFlowOpCloseContext openContext) {
        if (this.callbackHandle != null) {
            this.agentInstanceContext.getStatementContext().getFilterService().remove(this.callbackHandle, this.filterServiceEntry);
            this.callbackHandle = null;
            this.filterServiceEntry = null;
        }
    }
}

