/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esperio.kafka;

import com.espertech.esper.client.ConfigurationException;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.core.service.EPServiceProviderSPI;
import com.espertech.esper.util.JavaClassHelper;
import com.espertech.esperio.kafka.EsperIOKafkaInputProcessor;
import com.espertech.esperio.kafka.EsperIOKafkaInputProcessorContext;
import com.espertech.esperio.kafka.EsperIOKafkaInputRunnable;
import com.espertech.esperio.kafka.EsperIOKafkaInputSubscriber;
import com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberContext;
import com.espertech.esperio.kafka.EsperIOKafkaInputThreadFactory;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Exception performing whole class analysis ignored.
 */
public class EsperIOKafkaInputAdapter {
    private static final Logger log = LoggerFactory.getLogger(EsperIOKafkaInputAdapter.class);
    private final Properties properties;
    private final String engineURI;
    private KafkaConsumer consumer;
    private ExecutorService executorService;
    private EsperIOKafkaInputRunnable runnable;
    private EsperIOKafkaInputProcessor processor;

    public EsperIOKafkaInputAdapter(Properties properties, String engineURI) {
        this.properties = properties;
        this.engineURI = engineURI;
    }

    public void start() {
        if (log.isInfoEnabled()) {
            log.info("Starting EsperIO Kafka Input Adapter for engine URI '{}'", (Object)this.engineURI);
        }
        Properties consumerProperties = new Properties();
        for (String propertyName : this.properties.stringPropertyNames()) {
            if (propertyName.startsWith("esperio")) continue;
            consumerProperties.put(propertyName, this.properties.getProperty(propertyName));
        }
        try {
            this.consumer = new KafkaConsumer(consumerProperties);
        }
        catch (Throwable t) {
            log.error("Error obtaining Kafka consumer for URI '{}': {}", new Object[]{this.engineURI, t.getMessage(), t});
        }
        EPServiceProviderSPI engine = (EPServiceProviderSPI)EPServiceProviderManager.getProvider(this.engineURI);
        String subscriberClassName = EsperIOKafkaInputAdapter.getRequiredProperty((Properties)this.properties, (String)"esperio.kafka.input.subscriber");
        try {
            EsperIOKafkaInputSubscriber subscriber = (EsperIOKafkaInputSubscriber)JavaClassHelper.instantiate(EsperIOKafkaInputSubscriber.class, subscriberClassName, engine.getEngineImportService().getClassForNameProvider());
            EsperIOKafkaInputSubscriberContext subscriberContext = new EsperIOKafkaInputSubscriberContext(this.consumer, (EPServiceProvider)engine, this.properties);
            subscriber.subscribe(subscriberContext);
        }
        catch (Throwable t) {
            throw new ConfigurationException("Unexpected exception invoking subscriber subscribe method on class " + subscriberClassName + " for engine URI '" + this.engineURI + "': " + t.getMessage(), t);
        }
        String processorClassName = EsperIOKafkaInputAdapter.getRequiredProperty((Properties)this.properties, (String)"esperio.kafka.input.processor");
        try {
            this.processor = (EsperIOKafkaInputProcessor)JavaClassHelper.instantiate(EsperIOKafkaInputProcessor.class, processorClassName, engine.getEngineImportService().getClassForNameProvider());
            EsperIOKafkaInputProcessorContext processorContext = new EsperIOKafkaInputProcessorContext(this.consumer, engine, this.properties, this);
            this.processor.init(processorContext);
        }
        catch (Throwable t) {
            throw new ConfigurationException("Unexpected exception invoking processor init method on class " + processorClassName + " for engine URI '" + this.engineURI + "': " + t.getMessage(), t);
        }
        this.executorService = Executors.newFixedThreadPool(1, (ThreadFactory)new EsperIOKafkaInputThreadFactory(this.engineURI));
        this.runnable = new EsperIOKafkaInputRunnable(this.consumer, this.processor);
        this.executorService.submit((Runnable)this.runnable);
        if (log.isInfoEnabled()) {
            log.info("Completed starting EsperIO Kafka Input Adapter for engine URI '{}'", (Object)this.engineURI);
        }
    }

    public void destroy() {
        if (log.isDebugEnabled()) {
            log.debug("Destroying Esper Kafka Input Adapter for engine URI '{}'", (Object)this.engineURI);
        }
        this.runnable.setShutdown(true);
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.processor.close();
        this.consumer.close();
    }

    protected static String getRequiredProperty(Properties properties, String config) {
        String value = properties.getProperty(config);
        if (value == null) {
            throw new ConfigurationException("Property '" + config + "' not provided");
        }
        return value;
    }
}

