/*
 * Decompiled with CFR 0.152.
 */
package streams.esper;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventType;
import com.espertech.esper.client.soda.EPStatementObjectModel;
import com.espertech.esper.client.soda.Stream;
import com.espertech.esper.client.time.CurrentTimeEvent;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.Configurable;
import stream.Data;
import stream.annotations.Parameter;
import stream.io.Sink;
import stream.service.Service;
import streams.esper.EsperData;
import streams.esper.EsperStatementBean;
import streams.esper.EsperStatementSubscriber;
import streams.esper.EsperStreamEventTypeVisitor;
import streams.esper.EsperTrimmedStatementSubscriber;

/**
 * Service that owns an Esper service provider and feeds {@link Data} items into it.
 *
 * <p>Engines register themselves in a static registry under their id (see
 * {@link #setId(String)}) and can be looked up with {@link #getEsperEngine(String)}.
 * The engine is configured from XML ({@link #configure(Element)}), started with
 * {@link #init()} and receives items through {@link #write(Data)}.
 *
 * <p>Thread-safety: the static registry and the shutdown counter are guarded by
 * locks; all other state (current time, item counter, timestamp maps) is not
 * synchronized.
 */
public class EsperEngine implements Service, Configurable {
    private static final Logger _log = LoggerFactory.getLogger(EsperEngine.class);

    /** Tag name of the embedded Esper configuration element read by {@link #configure(Element)}. */
    public static final String ESPER_CONFIG_LOCAL_NAME = "esper-configuration";
    public static final String ESPER_NS = "http://www.espertech.com/schema/esper";
    public static final String EVENT_TYPE_KEY = "EPEventType";
    /** Tag name of the static statement elements read by {@link #configure(Element)}. */
    public static final String ESPER_STATEMENT_LOCAL_NAME = "statement";
    /** Id under which {@link #getEsperEngine(String)} lazily creates an engine. */
    public static final String DEFAULT_ID = "default";
    /** Initial value of the time tolerance in milliseconds. */
    public static final long DEFAULT_TIME_TOLERANCE = 1000L;

    /** All engines by id; every access must hold the monitor of this map. */
    private static final Map<String, EsperEngine> _registry = new HashMap<String, EsperEngine>();

    protected transient EPServiceProvider _epService;
    protected transient EPRuntime _epRuntime;
    /** Highest start timestamp seen so far; {@code Long.MIN_VALUE} until the first one arrives. */
    private long _currentTime;
    private long _initialTime;
    private final Configuration _configuration;
    /** Items written since the engine time was last advanced (used for debug logging only). */
    private long _itemsCounter;
    private long _timeTolerance = DEFAULT_TIME_TOLERANCE;
    private String _id;
    /** Event type name to class, in declaration order. */
    private final Map<String, Class<?>> _typesMap = new LinkedHashMap<String, Class<?>>();
    /** Event type name to the name of its start timestamp property (value may be null). */
    private final Map<String, String> _startTimestampMap;
    /** Event type name to the name of its end timestamp property (value may be null). */
    private final Map<String, String> _endTimestampMap;
    private String _epProviderURI;
    /** Statements collected by {@link #configure(Element)} and compiled by {@link #init()}. */
    private final List<EsperStatementBean> _staticStatements;
    /** Number of registered queries that still have to call {@link #notifyShutdown()}; guarded by {@code this}. */
    private int _shutdownCount;

    /**
     * Adds the engine to the registry under its current id.
     *
     * @throws IllegalArgumentException if an engine with the same id is already registered
     */
    private static void registerEngine(EsperEngine engine) {
        String id = engine.getId();
        synchronized (_registry) {
            if (_registry.containsKey(id)) {
                throw new IllegalArgumentException(String.format("Engine '%s' already registered!", id));
            }
            _log.info("Registering engine {}", id);
            _registry.put(id, engine);
        }
    }

    /**
     * Looks up a registered engine. When {@link #DEFAULT_ID} is requested and no such
     * engine exists yet, one is created, registered and initialized.
     *
     * @param id the engine id
     * @return the engine, or {@code null} if no engine with a non-default id is registered
     * @throws RuntimeException if initializing the default engine fails
     */
    public static EsperEngine getEsperEngine(String id) {
        // Lookup and lazy creation happen under one lock so that concurrent callers
        // cannot both create (and then fail to register) the default engine.
        synchronized (_registry) {
            EsperEngine result = _registry.get(id);
            if (result == null && DEFAULT_ID.equals(id)) {
                _log.info("Requesting default engine, but default engine was not yet registered. Creating it...");
                result = new EsperEngine();
                result.setId(DEFAULT_ID);
                try {
                    result.init();
                } catch (Exception e) {
                    // Do not keep handing out an engine that never finished initializing.
                    _registry.remove(DEFAULT_ID);
                    throw new RuntimeException(e);
                }
            }
            return result;
        }
    }

    /**
     * Resolves a class by name, trying the name as given and then prefixed with
     * {@code java.lang.}.
     *
     * @param name the fully qualified or {@code java.lang}-relative class name
     * @return the class, or {@code null} if neither candidate can be loaded
     */
    protected static Class<?> classForName(String name) {
        String[] pkgs = new String[]{"", "java.lang"};
        for (String pkg : pkgs) {
            String className = pkg.isEmpty() ? name : pkg + "." + name;
            try {
                return Class.forName(className);
            } catch (Exception ignored) {
                // This candidate does not resolve; fall through to the next prefix.
            }
        }
        return null;
    }

    public EsperEngine() {
        this._configuration = new Configuration();
        this._startTimestampMap = new HashMap<String, String>();
        this._endTimestampMap = new HashMap<String, String>();
        this._staticStatements = new ArrayList<EsperStatementBean>();
    }

    /**
     * Declares the configured event types, obtains the Esper service provider (default or
     * named, depending on {@link #getProviderUri()}), compiles the static statements and
     * fetches the runtime. Resets the shutdown counter, the item counter and the current time.
     *
     * NOTE(review): the current time is reset to {@code Long.MIN_VALUE} here, which discards
     * a value set earlier through {@link #setInitialTime(long)} - confirm this is intended.
     *
     * @throws Exception if Esper fails to create the provider or compile a statement
     */
    public void init() throws Exception {
        _log.info("Started initializing {} ...", this.getClass());
        synchronized (this) {
            this._shutdownCount = 0;
        }
        this._itemsCounter = 0L;
        this._currentTime = Long.MIN_VALUE;

        // Event types are added to the Configuration BEFORE the provider is obtained:
        // the provider is built from the configuration it is handed, so types added
        // afterwards would not be visible to it.
        _log.info("Declaring types to Esper service");
        for (Map.Entry<String, Class<?>> typesEntry : this._typesMap.entrySet()) {
            this._configuration.addEventType(typesEntry.getKey(), typesEntry.getValue());
        }
        _log.info("Finished declaring types to Esper service");

        String providerUri = this.getProviderUri();
        if (providerUri == null || providerUri.isEmpty()) {
            _log.debug("Creating new Esper service from default provider.");
            this._epService = EPServiceProviderManager.getDefaultProvider(this._configuration);
        } else {
            _log.debug("Creating new Esper service from named provider: {}", providerUri);
            this._epService = EPServiceProviderManager.getProvider(providerUri, this._configuration);
        }

        _log.info("Adding static queries");
        for (EsperStatementBean statement : this._staticStatements) {
            this.addEsperQuery(statement);
        }
        _log.info("Finished adding static queries");

        _log.debug("Creating Esper runtime...");
        this._epRuntime = this._epService.getEPRuntime();
        _log.debug("Finished creating Esper runtime...");
        _log.info("Finished initalizing {}.", this.getClass());
    }

    /**
     * Signals that one user of this engine has finished. Once the shutdown counter drops to
     * zero or below, the Esper service provider is destroyed.
     *
     * @throws Exception if destroying the provider fails (logged and rethrown)
     */
    public void notifyShutdown() throws Exception {
        boolean lastUser;
        synchronized (this) {
            this._shutdownCount--;
            lastUser = this._shutdownCount <= 0;
        }
        // Nothing to destroy if the engine was never initialized (same guard as reset()).
        if (lastUser && this._epService != null) {
            _log.info("Started destroying Esper engine...");
            try {
                this._epService.destroy();
            } catch (Exception e) {
                _log.error("Error destroying Esper engine!", e);
                throw e;
            }
            _log.info("Finished destroying Esper engine.");
        }
    }

    /**
     * Copies the timestamp properties of the event type from {@code input} into {@code event}
     * and advances the engine time when the start timestamp is newer than the current time.
     *
     * @param input            the original item; the start timestamp must be a {@link Number}
     * @param event            the event that will be sent to Esper
     * @param mapEventTypeName the Esper event type name of the item
     * @return {@code false} if the start timestamp lies more than the time tolerance behind
     *         the current time (the event must then be dropped), {@code true} otherwise
     */
    private boolean checkTimestamps(Data input, Data event, String mapEventTypeName) {
        String startTimeKey = this._startTimestampMap.get(mapEventTypeName);
        if (startTimeKey != null) {
            long dataStartTime = ((Number) input.get(startTimeKey)).longValue();
            event.put(startTimeKey, Long.valueOf(dataStartTime));
            if (dataStartTime > this._currentTime) {
                boolean firstTimestamp = this._currentTime == Long.MIN_VALUE;
                this._currentTime = dataStartTime;
                if (firstTimestamp) {
                    _log.debug("Setting start time: {}", this._currentTime);
                } else {
                    _log.debug("Sending time event new time: {}", this._currentTime);
                }
                _log.debug("Data items per time interval: {}", this._itemsCounter);
                this._itemsCounter = 0L;
                // The engine clock trails the newest timestamp by the tolerance so that
                // slightly late events are still in the engine's future.
                this._epRuntime.sendEvent(new CurrentTimeEvent(this._currentTime - this._timeTolerance));
            } else if (dataStartTime < this._currentTime - this._timeTolerance) {
                // Rejection must not depend on the log level: the event is too old either way.
                _log.debug("Time inconsistency! {} Tolerance: {}", this._currentTime - dataStartTime, this._timeTolerance);
                return false;
            }
        }
        String endTimeKey = this._endTimestampMap.get(mapEventTypeName);
        if (endTimeKey != null) {
            long dataEndTime = Long.parseLong(input.get(endTimeKey).toString());
            event.put(endTimeKey, Long.valueOf(dataEndTime));
        }
        return true;
    }

    /**
     * Sends a copy of the item (without the {@code @stream} and {@code @stream:id} keys) to
     * Esper. Items carrying an {@code @esperType} value are sent through the event sender of
     * that type, unless their timestamp check fails; all other items are sent untyped.
     *
     * @param input the item to send
     * @return always {@code true}
     * @throws Exception if Esper rejects the event
     */
    public boolean write(Data input) throws Exception {
        EsperData event = new EsperData(input.createCopy());
        event.remove("@stream");
        event.remove("@stream:id");
        Object mapEventTypeName = input.get("@esperType");
        if (mapEventTypeName == null) {
            this._epRuntime.sendEvent(event);
        } else {
            String typeName = (String) mapEventTypeName;
            if (this.checkTimestamps(input, event, typeName)) {
                this._epRuntime.getEventSender(typeName).sendEvent(event);
            }
        }
        ++this._itemsCounter;
        return true;
    }

    /**
     * Destroys the current service provider, if any, and runs {@link #init()} again.
     *
     * @throws Exception if re-initialization fails
     */
    public void reset() throws Exception {
        _log.info("Started resetting Esper engine '{}' ...", this.getProviderUri());
        if (this._epService != null) {
            this._epService.destroy();
        }
        this.init();
        _log.info("Finished resetting Esper engine '{}' .", this.getProviderUri());
    }

    /**
     * Reads the engine setup from XML: every {@code esper-configuration} descendant is handed
     * to the Esper configuration, and every {@code statement} descendant is stored as a static
     * statement (optional attributes {@code name} and {@code removeBackticks}; the trimmed
     * text content is the statement).
     *
     * @param document the element whose descendants are scanned
     * @throws RuntimeException if a configuration node or statement cannot be processed
     */
    public void configure(Element document) {
        NodeList esperConfigNodeList = document.getElementsByTagName(ESPER_CONFIG_LOCAL_NAME);
        for (int i = 0; i < esperConfigNodeList.getLength(); ++i) {
            _log.debug("Configuring Esper with xml node.");
            Node esperConfigNode = esperConfigNodeList.item(i);
            if (_log.isDebugEnabled()) {
                _log.debug("Esper xml configuration: {}", esperConfigNode.cloneNode(true));
            }
            try {
                // Esper is given a standalone document, so the node is imported into a fresh one.
                Document esperConfigDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
                esperConfigDocument.appendChild(esperConfigDocument.importNode(esperConfigNode, true));
                this._configuration.configure(esperConfigDocument);
            } catch (ParserConfigurationException e) {
                _log.debug("Error configuring Esper with xml node.");
                throw new RuntimeException(e);
            }
            _log.debug("Finished configuring Esper with xml node.");
        }

        NodeList esperStatementNodeList = document.getElementsByTagName(ESPER_STATEMENT_LOCAL_NAME);
        for (int i = 0; i < esperStatementNodeList.getLength(); ++i) {
            Element esperStatementNode = (Element) esperStatementNodeList.item(i);
            try {
                EsperStatementBean epStatement = new EsperStatementBean();
                if (esperStatementNode.hasAttribute("name")) {
                    epStatement.setName(esperStatementNode.getAttribute("name"));
                }
                if (esperStatementNode.hasAttribute("removeBackticks")) {
                    epStatement.setRemoveBackticks(Boolean.parseBoolean(esperStatementNode.getAttribute("removeBackticks")));
                }
                epStatement.setStatement(esperStatementNode.getTextContent().trim());
                this.addStaticEsperStatement(epStatement);
            } catch (Exception e) {
                throw new RuntimeException("Error reading Esper statement from configuration!", e);
            }
        }
    }

    /** Remembers a statement so that {@link #init()} compiles it. */
    private void addStaticEsperStatement(EsperStatementBean epStatement) {
        this._staticStatements.add(epStatement);
    }

    /**
     * Compiles and registers a statement, optionally counting it as a user of the engine
     * that must later call {@link #notifyShutdown()}.
     *
     * @param epStatement           the statement to add
     * @param increaseShutdownCount whether to increment the shutdown counter
     */
    public void addEsperQuery(EsperStatementBean epStatement, boolean increaseShutdownCount) {
        if (increaseShutdownCount) {
            synchronized (this) {
                this._shutdownCount++;
            }
        }
        this.addEsperQuery(epStatement);
    }

    /**
     * Compiles the statement, visits each stream of its FROM clause with an
     * {@link EsperStreamEventTypeVisitor}, creates the statement (named, if the bean has a
     * name) and, when the bean has output sinks, attaches a subscriber that forwards to them.
     * Finally refreshes the event type to timestamp property mapping.
     *
     * @param epStatement the statement to add
     */
    public void addEsperQuery(EsperStatementBean epStatement) {
        _log.info("Compiling statement {}", epStatement);
        _log.debug("Compiling Esper statement {}", epStatement.getStatement());
        EPAdministrator epAdmin = this._epService.getEPAdministrator();
        EPStatementObjectModel stmtModel = epAdmin.compileEPL(epStatement.getStatement());

        EsperStreamEventTypeVisitor esperStreamVisitor = new EsperStreamEventTypeVisitor(epAdmin.getConfiguration());
        // Statements without a FROM clause have no streams to visit.
        if (stmtModel.getFromClause() != null) {
            for (Stream s : stmtModel.getFromClause().getStreams()) {
                esperStreamVisitor.visitStream(s);
            }
        }

        String stmtName = epStatement.getName();
        EPStatement stmt = stmtName == null ? epAdmin.create(stmtModel) : epAdmin.create(stmtModel, stmtName);

        Sink[] sinksList = epStatement.getOutput();
        if (sinksList != null) {
            String[] propertyNames = stmt.getEventType().getPropertyNames();
            EsperStatementSubscriber subscriber = epStatement.isRemoveBackticks()
                    ? new EsperTrimmedStatementSubscriber(Arrays.asList(sinksList), propertyNames)
                    : new EsperStatementSubscriber(Arrays.asList(sinksList), propertyNames);
            _log.info("Adding subscriber {} to statement {}", (Object) sinksList, epStatement);
            stmt.setSubscriber(subscriber);
        }

        this.mapTimestampProperties();
        _log.info("Finished compiling statement {}", epStatement);
    }

    /**
     * Rebuilds the start/end timestamp property lookup for every event type currently known
     * to the engine. Types without a timestamp property are stored with a {@code null} value.
     */
    private void mapTimestampProperties() {
        _log.debug("Mapping event types to timestamp properties, if any");
        EPAdministrator epAdmin = this._epService.getEPAdministrator();
        for (EventType eventType : epAdmin.getConfiguration().getEventTypes()) {
            String startTimestampProperty = eventType.getStartTimestampPropertyName();
            String endTimestampProperty = eventType.getEndTimestampPropertyName();
            String eventTypeName = eventType.getName();
            // Only for the log line: prefer the end timestamp when both are defined.
            String timestampProperty = endTimestampProperty == null ? startTimestampProperty : endTimestampProperty;
            _log.debug("Timestamp property for event type {}: {}", eventTypeName, timestampProperty);
            this._startTimestampMap.put(eventTypeName, startTimestampProperty);
            this._endTimestampMap.put(eventTypeName, endTimestampProperty);
        }
        _log.debug("Finished mapping event types to timestamp properties, if any");
    }

    /** @return the Esper configuration object this engine was created with */
    public Configuration getConfiguration() {
        return this._configuration;
    }

    /** @return the provider URI, or {@code null}/empty for the default provider */
    @Parameter(defaultValue="", description="The URI of the Esper Runtime.", required=false)
    public String getProviderUri() {
        return this._epProviderURI;
    }

    public void setProviderUri(String providerUri) {
        this._epProviderURI = providerUri;
    }

    public long getInitialTime() {
        return this._initialTime;
    }

    /** Sets the initial time and also moves the current time to it. */
    @Parameter(required=false)
    public void setInitialTime(long initialTime) {
        this._initialTime = initialTime;
        this._currentTime = initialTime;
    }

    public long getTimeTolerance() {
        return this._timeTolerance;
    }

    // NOTE(review): the annotation advertises a default of 100000 while the field starts at
    // DEFAULT_TIME_TOLERANCE (1000) - confirm which value is intended.
    @Parameter(name="timeTolerance", defaultValue="100000", description="The tolerance how many msecs a time event may reach into the past to be still processed.", required=false)
    public void setTimeTolerance(long timeTolerance) {
        this._timeTolerance = timeTolerance;
    }

    /**
     * Assigns the id (once) and registers the engine under it.
     *
     * @param id the engine id
     * @throws IllegalArgumentException if this engine already has an id, or if another engine
     *         is already registered under {@code id}
     */
    public void setId(String id) {
        if (this._id != null) {
            String errorMessage = String.format("Parameter %s already defined.", "id");
            throw new IllegalArgumentException(errorMessage);
        }
        this._id = id;
        try {
            EsperEngine.registerEngine(this);
        } catch (IllegalArgumentException e) {
            // Registration failed: do not keep an id this engine is not registered under.
            this._id = null;
            throw e;
        }
    }

    public String getId() {
        return this._id;
    }

    /**
     * @return the declared types as {@code name:className} strings, in declaration order;
     *         each entry uses the format accepted by {@link #setTypes(String[])}
     */
    public String[] getTypes() {
        List<String> result = new ArrayList<String>(this._typesMap.size());
        for (Map.Entry<String, Class<?>> entry : this._typesMap.entrySet()) {
            // getName() rather than toString(): the latter prepends "class " and would not
            // be parseable by setTypes.
            result.add(String.format("%s:%s", entry.getKey(), entry.getValue().getName()));
        }
        return result.toArray(new String[result.size()]);
    }

    /**
     * Replaces the declared types. Each entry must look like {@code name:className}, where the
     * class name is fully qualified or relative to {@code java.lang}.
     *
     * @param types the type definitions; {@code null} clears all types
     * @throws IllegalArgumentException if an entry has no colon after a non-empty name, or
     *         its class cannot be found
     */
    @Parameter(required=false, description="Simple key:value mapping of properties")
    public void setTypes(String[] types) {
        this._typesMap.clear();
        if (types == null) {
            return;
        }
        for (String typeDefinition : types) {
            int idx = typeDefinition.indexOf(":");
            if (idx <= 0) {
                throw new IllegalArgumentException("Type definition contains no colon!");
            }
            String key = typeDefinition.substring(0, idx);
            String type = typeDefinition.substring(idx + 1);
            Class<?> clazz = EsperEngine.classForName(type);
            if (clazz == null) {
                String errorMessage = String.format("Failed to locate class for type '%s'!", type);
                throw new IllegalArgumentException(errorMessage);
            }
            _log.debug("Defining type class '{}' for key '{}'", clazz, key);
            this._typesMap.put(key, clazz);
        }
        _log.debug("Types: {}", Arrays.toString(types));
    }
}

