/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esperio.kafka;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EPStatementStateListener;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.epl.annotation.AnnotationUtil;
import com.espertech.esperio.kafka.EsperIOKafkaInputAdapter;
import com.espertech.esperio.kafka.EsperIOKafkaOutputFlowController;
import com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt;
import com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerContext;
import com.espertech.esperio.kafka.KafkaOutputDefault;
import java.lang.annotation.Annotation;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output flow controller that attaches a {@code KafkaOutputDefaultListener} to every
 * statement carrying the {@link KafkaOutputDefault} annotation.
 *
 * <p>On {@link #initialize} it creates a Kafka producer from the context properties, reads
 * the comma-separated topic list from the {@code esperio.kafka.topics} property, processes
 * all statements currently known to the engine and registers a statement state listener.
 */
public class EsperIOKafkaOutputFlowControllerByAnnotatedStmt
implements EsperIOKafkaOutputFlowController {
    private static final Logger log = LoggerFactory.getLogger(EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class);

    /** Producer handed to each listener; stays {@code null} if construction failed in {@link #initialize}. */
    private KafkaProducer producer;
    private EPServiceProvider engine;
    /** Topic names in configuration order, trimmed, without blanks or duplicates. */
    private final Set<String> topics = new LinkedHashSet<>();

    /**
     * Sets up the producer and topic list, attaches listeners to existing annotated
     * statements and registers for statement state callbacks.
     *
     * <p>A failure to create the producer is logged and not rethrown, so the remaining
     * setup still runs with a {@code null} producer.
     *
     * @param context supplies the engine and the adapter properties
     */
    public void initialize(EsperIOKafkaOutputFlowControllerContext context) {
        this.engine = context.getEngine();
        try {
            this.producer = new KafkaProducer(context.getProperties());
        }
        catch (Throwable t) {
            // Best effort: report the failure (with stack trace as last argument) and carry on.
            log.error("Error obtaining Kafka producer for URI '{}': {}", new Object[]{context.getEngine().getURI(), t.getMessage(), t});
        }

        // NOTE(review): getRequiredProperty presumably fails when the property is absent - confirm.
        String topicsCSV = EsperIOKafkaInputAdapter.getRequiredProperty((Properties)context.getProperties(), (String)"esperio.kafka.topics");
        for (String rawTopic : topicsCSV.split(",")) {
            String topicName = rawTopic.trim();
            if (topicName.isEmpty()) {
                continue;
            }
            this.topics.add(topicName);
        }

        // Attach to statements that already exist at initialization time.
        for (String statementName : this.engine.getEPAdministrator().getStatementNames()) {
            this.processStatement(this.engine.getEPAdministrator().getStatement(statementName));
        }

        // NOTE(review): the decompiler could not recover this anonymous listener. The synthetic
        // accessors at the bottom of the class suggest it forwards to processStatement and
        // detachStatement - restore it from the original sources before compiling.
        this.engine.addStatementStateListener((EPStatementStateListener)new /* Unavailable Anonymous Inner Class!! */);
    }

    /**
     * Adds a Kafka output listener to the statement if it carries the
     * {@link KafkaOutputDefault} annotation; does nothing otherwise.
     *
     * @param statement statement to inspect, may be {@code null}
     */
    private void processStatement(EPStatement statement) {
        if (statement == null) {
            return;
        }
        Annotation annotation = AnnotationUtil.findAnnotation(statement.getAnnotations(), KafkaOutputDefault.class);
        if (annotation == null) {
            return;
        }
        KafkaOutputDefaultListener listener = new KafkaOutputDefaultListener(this.engine, statement, this.producer, this.topics);
        statement.addListener((UpdateListener)listener);
        log.info("Added Kafka-Output-Adapter listener to statement '{}' topics {}", (Object)statement.getName(), (Object)this.topics.toString());
    }

    /**
     * Removes the first {@code KafkaOutputDefaultListener} registered on the statement.
     * The removal is only logged when a listener was actually found and removed.
     *
     * @param statement statement to detach from, may be {@code null}
     */
    private void detachStatement(EPStatement statement) {
        if (statement == null) {
            return;
        }
        Iterator<UpdateListener> listeners = statement.getUpdateListeners();
        UpdateListener found = null;
        while (listeners.hasNext()) {
            UpdateListener listener = listeners.next();
            if (listener instanceof KafkaOutputDefaultListener) {
                found = listener;
                break;
            }
        }
        if (found == null) {
            // Nothing was attached by this controller; avoid a misleading "Removed" log line.
            return;
        }
        statement.removeListener(found);
        log.info("Removed Kafka-Output-Adapter listener from statement '{}'", (Object)statement.getName());
    }

    /**
     * Closes the Kafka producer, if one was successfully created.
     */
    public void close() {
        // The producer is null when its construction failed during initialize.
        if (this.producer != null) {
            this.producer.close();
        }
    }

    // Synthetic accessor (compiler-generated) giving the anonymous listener access to processStatement.
    static /* synthetic */ void access$000(EsperIOKafkaOutputFlowControllerByAnnotatedStmt x0, EPStatement x1) {
        x0.processStatement(x1);
    }

    // Synthetic accessor (compiler-generated) giving the anonymous listener access to detachStatement.
    static /* synthetic */ void access$100(EsperIOKafkaOutputFlowControllerByAnnotatedStmt x0, EPStatement x1) {
        x0.detachStatement(x1);
    }
}

