/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.transports;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Named;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transport that reads raw messages from Kafka through the ZooKeeper-coordinated
 * consumer connector and passes each payload to the owning {@link MessageInput}.
 *
 * <p>One consumer task is started per Kafka stream returned for the configured topic
 * filter. The tasks block while the server lifecycle is paused and while the transport
 * is throttled, and terminate once {@link #doStop()} has been called.
 */
public class KafkaTransport
extends ThrottleableTransport {
    public static final String GROUP_ID = "graylog2";
    public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
    public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
    public static final String CK_ZOOKEEPER = "zookeeper";
    public static final String CK_TOPIC_FILTER = "topic_filter";
    public static final String CK_THREADS = "threads";
    public static final String CK_OFFSET_RESET = "offset_reset";
    /** Allowed values for {@link #CK_OFFSET_RESET}, mapped to their human readable description. */
    private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(
            "largest", "Automatically reset the offset to the largest offset",
            "smallest", "Automatically reset the offset to the smallest offset");
    private static final String DEFAULT_OFFSET_RESET = "largest";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);
    private final Configuration configuration;
    private final MetricRegistry localRegistry;
    private final NodeId nodeId;
    private final EventBus serverEventBus;
    private final ServerStatus serverStatus;
    private final ScheduledExecutorService scheduler;
    private final MetricRegistry metricRegistry;
    private final AtomicLong totalBytesRead = new AtomicLong(0L);
    /** Bytes read during the last completed one-second window (exposed as a gauge). */
    private final AtomicLong lastSecBytesRead = new AtomicLong(0L);
    /** Bytes read during the current, still running one-second window. */
    private final AtomicLong lastSecBytesReadTmp = new AtomicLong(0L);
    private volatile boolean stopped = false;
    private volatile boolean paused = true;
    /** Consumer tasks wait on this latch while {@link #paused} is set. */
    private volatile CountDownLatch pausedLatch = new CountDownLatch(1);
    /** Counted down once by every consumer task when it terminates. */
    private CountDownLatch stopLatch;
    private ConsumerConnector cc;
    /** Handle of the per-second throughput task so that it can be cancelled on stop. */
    private volatile ScheduledFuture<?> bytesReadUpdater;

    /**
     * Creates the transport and registers its throughput gauges in the local registry.
     *
     * @param configuration  input configuration holding the {@code CK_*} settings
     * @param localRegistry  registry receiving the gauges; also returned by {@link #getMetricSet()}
     * @param nodeId         id of this node, used as part of the Kafka client id
     * @param serverEventBus event bus on which lifecycle changes are received
     * @param serverStatus   used to get notified once the server is running
     * @param scheduler      scheduler running the per-second throughput task
     */
    @AssistedInject
    public KafkaTransport(@Assisted Configuration configuration, LocalMetricRegistry localRegistry, NodeId nodeId, EventBus serverEventBus, ServerStatus serverStatus, @Named(value="daemonScheduler") ScheduledExecutorService scheduler) {
        super(serverEventBus, configuration);
        this.configuration = configuration;
        this.localRegistry = localRegistry;
        this.nodeId = nodeId;
        this.serverEventBus = serverEventBus;
        this.serverStatus = serverStatus;
        this.scheduler = scheduler;
        this.metricRegistry = localRegistry;
        localRegistry.register("read_bytes_1sec", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return lastSecBytesRead.get();
            }
        });
        // This transport only reads, so both "written" gauges constantly report zero.
        localRegistry.register("written_bytes_1sec", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return 0L;
            }
        });
        localRegistry.register("read_bytes_total", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return totalBytesRead.get();
            }
        });
        localRegistry.register("written_bytes_total", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return 0L;
            }
        });
    }

    /**
     * Pauses or resumes message consumption according to the server lifecycle.
     *
     * <p>{@code PAUSED}, {@code FAILED} and {@code HALTING} pause the consumers, every other
     * state resumes them.
     *
     * @param lifecycle the new lifecycle state
     */
    @Subscribe
    public void lifecycleStateChange(Lifecycle lifecycle) {
        LOG.debug("Lifecycle changed to {}", lifecycle);
        switch (lifecycle) {
            case PAUSED:
            case FAILED:
            case HALTING:
                // Only replace a latch that has already been released. Consumers may still be
                // blocked on an unreleased latch; swapping it out (e.g. PAUSED followed by
                // HALTING) would leave them waiting on an instance nobody counts down anymore.
                if (pausedLatch.getCount() == 0L) {
                    pausedLatch = new CountDownLatch(1);
                }
                paused = true;
                break;
            default:
                paused = false;
                pausedLatch.countDown();
                break;
        }
    }

    /** Does nothing; the aggregator passed in is ignored by this transport. */
    @Override
    public void setMessageAggregator(CodecAggregator ignored) {
    }

    /**
     * Connects to Kafka and starts one consumer task per message stream.
     *
     * @param input the input receiving every consumed message
     * @throws MisfireException declared by the transport contract; not thrown directly here
     */
    @Override
    public void doLaunch(final MessageInput input) throws MisfireException {
        serverStatus.awaitRunning(new Runnable() {
            @Override
            public void run() {
                lifecycleStateChange(Lifecycle.RUNNING);
            }
        });
        // Listen for lifecycle changes.
        serverEventBus.register(this);

        final Properties props = consumerProperties(input);
        final int numThreads = configuration.getInt(CK_THREADS);
        final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        cc = connector;

        final TopicFilter filter = new Whitelist(configuration.getString(CK_TOPIC_FILTER));
        final List<KafkaStream<byte[], byte[]>> streams = connector.createMessageStreamsByFilter(filter, numThreads);
        final ExecutorService executor = executorService(numThreads);

        // doStop() waits on this latch so that the consumer tasks can finish before the
        // connection is shut down.
        final CountDownLatch latch = new CountDownLatch(streams.size());
        stopLatch = latch;

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // The task works on its own references to the connector and the latch:
                    // doStop() resets the fields to null after its timeout, possibly while
                    // this task is still running.
                    try {
                        consume(stream, input);
                        commitOffsets(connector);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        // No further tasks are submitted. Already submitted tasks keep running, and the
        // pool threads terminate once the last consumer task has returned instead of
        // idling forever.
        executor.shutdown();

        bytesReadUpdater = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                lastSecBytesRead.set(lastSecBytesReadTmp.getAndSet(0L));
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    /** Builds the Kafka consumer properties from the input configuration. */
    private Properties consumerProperties(MessageInput input) {
        final Properties props = new Properties();
        props.put("group.id", GROUP_ID);
        props.put("client.id", "gl2-" + nodeId + "-" + input.getId());
        props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
        props.put("fetch.wait.max.ms", String.valueOf(configuration.getInt(CK_FETCH_WAIT_MAX)));
        props.put("zookeeper.connect", configuration.getString(CK_ZOOKEEPER));
        props.put("auto.offset.reset", configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET));
        props.put("auto.commit.interval.ms", "1000");
        // With a consumer timeout the iterator raises ConsumerTimeoutException instead of
        // blocking indefinitely, which lets consume() re-check the stopped flag.
        props.put("consumer.timeout.ms", "1000");
        return props;
    }

    /** Reads messages from the stream and passes them to the input until stopped or failed. */
    private void consume(KafkaStream<byte[], byte[]> stream, MessageInput input) {
        final ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
        boolean retry;
        do {
            retry = false;
            try {
                while (consumerIterator.hasNext()) {
                    if (paused) {
                        LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                        Uninterruptibles.awaitUninterruptibly(pausedLatch);
                    }
                    // Check for being stopped before taking the next message off the iterator.
                    if (stopped) {
                        break;
                    }
                    if (isThrottled()) {
                        blockUntilUnthrottled();
                    }
                    final MessageAndMetadata<byte[], byte[]> message = consumerIterator.next();
                    final byte[] bytes = message.message();
                    if (bytes == null) {
                        continue;
                    }
                    totalBytesRead.addAndGet(bytes.length);
                    lastSecBytesReadTmp.addAndGet(bytes.length);
                    input.processRawMessage(new RawMessage(bytes));
                }
            } catch (ConsumerTimeoutException e) {
                // Nothing arrived within the consumer timeout; check again unless stopped.
                retry = true;
            } catch (Exception e) {
                LOG.error("Kafka consumer error, stopping consumer thread.", e);
            }
        } while (retry && !stopped);
    }

    /** Commits the consumed offsets, logging instead of propagating a failure. */
    private void commitOffsets(ConsumerConnector connector) {
        try {
            connector.commitOffsets();
        } catch (Exception e) {
            LOG.warn("Unable to commit Kafka offsets while stopping consumer thread.", e);
        }
    }

    /** Creates the instrumented fixed-size thread pool running the consumer tasks. */
    private ExecutorService executorService(int numThreads) {
        final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-transport-%d").build();
        return new InstrumentedExecutorService(
                Executors.newFixedThreadPool(numThreads, threadFactory),
                metricRegistry,
                MetricRegistry.name(this.getClass(), "executor-service"));
    }

    /**
     * Stops the consumer tasks, waits up to five seconds for them to finish and then
     * shuts down the Kafka connection.
     */
    @Override
    public void doStop() {
        stopped = true;
        serverEventBus.unregister(this);

        final CountDownLatch latch = stopLatch;
        if (latch != null) {
            try {
                // Release consumers blocked on the pause latch so they can see the stopped flag.
                pausedLatch.countDown();
                final boolean allStoppedOrderly = latch.await(5L, TimeUnit.SECONDS);
                stopLatch = null;
                if (!allStoppedOrderly) {
                    LOG.info("Stopping Kafka input timed out (waited 5 seconds for consumer threads to stop). Forcefully closing connection now. This is usually harmless when stopping the input.");
                }
            } catch (InterruptedException e) {
                LOG.debug("Interrupted while waiting to stop input.");
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
        }

        final ScheduledFuture<?> updater = bytesReadUpdater;
        if (updater != null) {
            // Without cancelling, the task would keep running on the shared scheduler.
            updater.cancel(false);
            bytesReadUpdater = null;
            // The gauge is no longer refreshed, so do not leave a stale rate behind.
            lastSecBytesRead.set(0L);
        }

        if (cc != null) {
            cc.shutdown();
            cc = null;
        }
    }

    /** Returns the local registry holding this transport's gauges. */
    @Override
    public MetricSet getMetricSet() {
        return localRegistry;
    }

    /** Describes the configuration fields requested by the Kafka transport. */
    @ConfigClass
    public static class Config
    extends ThrottleableTransport.Config {
        /**
         * Adds the Kafka specific fields to the configuration requested by the parent class.
         *
         * @return the parent's configuration request extended by the Kafka fields
         */
        @Override
        public ConfigurationRequest getRequestedConfiguration() {
            final ConfigurationRequest cr = super.getRequestedConfiguration();
            cr.addField(new TextField(CK_ZOOKEEPER, "ZooKeeper address", "127.0.0.1:2181", "Host and port of the ZooKeeper that is managing your Kafka cluster.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new TextField(CK_TOPIC_FILTER, "Topic filter regex", "^your-topic$", "Every topic that matches this regular expression will be consumed.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new NumberField(CK_FETCH_MIN_BYTES, "Fetch minimum bytes", 5, "Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new NumberField(CK_FETCH_WAIT_MAX, "Fetch maximum wait time (ms)", 100, "Wait for this time or the configured minimum size of a message batch before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new NumberField(CK_THREADS, "Processor threads", 2, "Number of processor threads to spawn. Use one thread per Kafka topic partition.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new DropdownField(CK_OFFSET_RESET, "Auto offset reset", DEFAULT_OFFSET_RESET, OFFSET_RESET_VALUES, "What to do when there is no initial offset in ZooKeeper or if an offset is out of range", ConfigurationField.Optional.OPTIONAL));
            return cr;
        }
    }

    /** Factory used to create {@link KafkaTransport} instances and their configuration description. */
    @FactoryClass
    public interface Factory
    extends Transport.Factory<KafkaTransport> {
        @Override
        KafkaTransport create(Configuration configuration);

        @Override
        Config getConfig();
    }
}

