/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.transports;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import jakarta.inject.Named;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.graylog.shaded.kafka09.consumer.Consumer;
import org.graylog.shaded.kafka09.consumer.ConsumerConfig;
import org.graylog.shaded.kafka09.consumer.ConsumerIterator;
import org.graylog.shaded.kafka09.consumer.ConsumerTimeoutException;
import org.graylog.shaded.kafka09.consumer.KafkaStream;
import org.graylog.shaded.kafka09.consumer.TopicFilter;
import org.graylog.shaded.kafka09.consumer.Whitelist;
import org.graylog.shaded.kafka09.javaapi.consumer.ConsumerConnector;
import org.graylog.shaded.kafka09.message.MessageAndMetadata;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.BooleanField;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTransport
extends ThrottleableTransport {
    public static final String CK_LEGACY = "legacy_mode";
    public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
    public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
    public static final String CK_ZOOKEEPER = "zookeeper";
    public static final String CK_BOOTSTRAP = "bootstrap_server";
    public static final String CK_TOPIC_FILTER = "topic_filter";
    public static final String CK_THREADS = "threads";
    public static final String CK_OFFSET_RESET = "offset_reset";
    public static final String CK_GROUP_ID = "group_id";
    public static final String CK_CUSTOM_PROPERTIES = "custom_properties";
    private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of((Object)"largest", (Object)"Automatically reset the offset to the latest offset", (Object)"smallest", (Object)"Automatically reset the offset to the earliest offset");
    private static final String DEFAULT_OFFSET_RESET = "largest";
    private static final String DEFAULT_GROUP_ID = "graylog2";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);
    private final Configuration configuration;
    private final MetricRegistry localRegistry;
    private final NodeId nodeId;
    private final EventBus serverEventBus;
    private final ServerStatus serverStatus;
    private final ScheduledExecutorService scheduler;
    private final MetricRegistry metricRegistry;
    private final AtomicLong totalBytesRead = new AtomicLong(0L);
    private final AtomicLong lastSecBytesRead = new AtomicLong(0L);
    private final AtomicLong lastSecBytesReadTmp = new AtomicLong(0L);
    private final ExecutorService executor;
    private volatile boolean stopped = false;
    private volatile boolean paused = true;
    private volatile CountDownLatch pausedLatch = new CountDownLatch(1);
    private CountDownLatch stopLatch;
    private ConsumerConnector cc;

    @AssistedInject
    public KafkaTransport(@Assisted Configuration configuration, LocalMetricRegistry localRegistry, NodeId nodeId, EventBus serverEventBus, ServerStatus serverStatus, @Named(value="daemonScheduler") ScheduledExecutorService scheduler) {
        super(serverEventBus, configuration);
        this.configuration = configuration;
        this.localRegistry = localRegistry;
        this.nodeId = nodeId;
        this.serverEventBus = serverEventBus;
        this.serverStatus = serverStatus;
        this.scheduler = scheduler;
        this.metricRegistry = localRegistry;
        int numThreads = configuration.getInt(CK_THREADS);
        this.executor = this.executorService(numThreads);
        localRegistry.register("read_bytes_1sec", (Metric)new Gauge<Long>(){

            public Long getValue() {
                return KafkaTransport.this.lastSecBytesRead.get();
            }
        });
        localRegistry.register("written_bytes_1sec", (Metric)new Gauge<Long>(){

            public Long getValue() {
                return 0L;
            }
        });
        localRegistry.register("read_bytes_total", (Metric)new Gauge<Long>(){

            public Long getValue() {
                return KafkaTransport.this.totalBytesRead.get();
            }
        });
        localRegistry.register("written_bytes_total", (Metric)new Gauge<Long>(){

            public Long getValue() {
                return 0L;
            }
        });
    }

    @Subscribe
    public void lifecycleStateChange(Lifecycle lifecycle) {
        LOG.debug("Lifecycle changed to {}", (Object)lifecycle);
        switch (lifecycle) {
            case PAUSED: 
            case FAILED: 
            case HALTING: {
                this.pausedLatch = new CountDownLatch(1);
                this.paused = true;
                break;
            }
            default: {
                this.paused = false;
                this.pausedLatch.countDown();
            }
        }
    }

    @Override
    public void setMessageAggregator(CodecAggregator ignored) {
    }

    @Override
    public void doLaunch(MessageInput input) {
        boolean legacyMode = this.configuration.getBoolean(CK_LEGACY, true);
        if (legacyMode) {
            String zooKeper = this.configuration.getString(CK_ZOOKEEPER);
            if (Strings.isNullOrEmpty((String)zooKeper)) {
                throw new IllegalArgumentException("ZooKeeper configuration setting cannot be empty");
            }
        } else {
            String bootStrap = this.configuration.getString(CK_BOOTSTRAP);
            if (Strings.isNullOrEmpty((String)bootStrap)) {
                throw new IllegalArgumentException("Bootstrap server configuration setting cannot be empty");
            }
        }
        this.serverStatus.awaitRunning(() -> this.lifecycleStateChange(Lifecycle.RUNNING));
        this.serverEventBus.register((Object)this);
        if (legacyMode) {
            this.doLaunchLegacy(input);
        } else {
            this.doLaunchConsumer(input);
        }
        this.scheduler.scheduleAtFixedRate(() -> this.lastSecBytesRead.set(this.lastSecBytesReadTmp.getAndSet(0L)), 1L, 1L, TimeUnit.SECONDS);
    }

    private void doLaunchConsumer(MessageInput input) {
        Properties props = new Properties();
        props.put("group.id", this.configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
        props.put("fetch.min.bytes", String.valueOf(this.configuration.getInt(CK_FETCH_MIN_BYTES)));
        props.put("fetch.max.wait.ms", String.valueOf(this.configuration.getInt(CK_FETCH_WAIT_MAX)));
        props.put("bootstrap.servers", this.configuration.getString(CK_BOOTSTRAP));
        String resetValue = this.configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET);
        props.put("auto.offset.reset", resetValue.equals(DEFAULT_OFFSET_RESET) ? "latest" : "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        this.insertCustomProperties(props);
        int numThreads = this.configuration.getInt(CK_THREADS);
        this.stopLatch = new CountDownLatch(numThreads);
        IntStream.range(0, numThreads).forEach(i -> this.executor.submit(new ConsumerRunnable(props, input, i)));
    }

    private void doLaunchLegacy(final MessageInput input) {
        Properties props = new Properties();
        props.put("group.id", this.configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
        props.put("client.id", "gl2-" + this.nodeId.getShortNodeId() + "-" + input.getId());
        props.put("fetch.min.bytes", String.valueOf(this.configuration.getInt(CK_FETCH_MIN_BYTES)));
        props.put("fetch.wait.max.ms", String.valueOf(this.configuration.getInt(CK_FETCH_WAIT_MAX)));
        props.put("zookeeper.connect", this.configuration.getString(CK_ZOOKEEPER));
        props.put("auto.offset.reset", this.configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET));
        props.put("auto.commit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "1000");
        this.insertCustomProperties(props);
        int numThreads = this.configuration.getInt(CK_THREADS);
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        this.cc = Consumer.createJavaConsumerConnector((ConsumerConfig)consumerConfig);
        Whitelist filter = new Whitelist(this.configuration.getString(CK_TOPIC_FILTER));
        List streams = this.cc.createMessageStreamsByFilter((TopicFilter)filter, numThreads);
        this.stopLatch = new CountDownLatch(streams.size());
        for (final KafkaStream stream : streams) {
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    boolean retry;
                    ConsumerIterator consumerIterator = stream.iterator();
                    block3: do {
                        retry = false;
                        try {
                            while (consumerIterator.hasNext()) {
                                MessageAndMetadata message;
                                byte[] bytes;
                                if (KafkaTransport.this.paused) {
                                    LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                                    Uninterruptibles.awaitUninterruptibly((CountDownLatch)KafkaTransport.this.pausedLatch);
                                }
                                if (KafkaTransport.this.stopped) continue block3;
                                if (KafkaTransport.this.isThrottled()) {
                                    KafkaTransport.this.blockUntilUnthrottled();
                                }
                                if ((bytes = (byte[])(message = consumerIterator.next()).message()) == null) continue;
                                KafkaTransport.this.totalBytesRead.addAndGet(bytes.length);
                                KafkaTransport.this.lastSecBytesReadTmp.addAndGet(bytes.length);
                                RawMessage rawMessage = new RawMessage(bytes);
                                input.processRawMessage(rawMessage);
                            }
                        }
                        catch (ConsumerTimeoutException e) {
                            retry = true;
                        }
                        catch (Exception e) {
                            LOG.error("Kafka consumer error, stopping consumer thread.", (Throwable)e);
                        }
                    } while (retry && !KafkaTransport.this.stopped);
                    KafkaTransport.this.cc.commitOffsets();
                    KafkaTransport.this.stopLatch.countDown();
                }
            });
        }
    }

    private void insertCustomProperties(Properties props) {
        try {
            Properties customProperties = new Properties();
            customProperties.load(new ByteArrayInputStream(this.configuration.getString(CK_CUSTOM_PROPERTIES, "").getBytes(StandardCharsets.UTF_8)));
            props.putAll((Map<?, ?>)customProperties);
        }
        catch (IOException e) {
            LOG.error("Failed to read custom properties", (Throwable)e);
        }
    }

    private ExecutorService executorService(int numThreads) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-transport-%d").build();
        return new InstrumentedExecutorService(Executors.newFixedThreadPool(numThreads, threadFactory), this.metricRegistry, MetricRegistry.name(this.getClass(), (String[])new String[]{"executor-service"}));
    }

    @Override
    public void doStop() {
        this.stopped = true;
        this.serverEventBus.unregister((Object)this);
        if (this.stopLatch != null) {
            try {
                if (this.pausedLatch != null && this.pausedLatch.getCount() > 0L) {
                    this.pausedLatch.countDown();
                }
                boolean allStoppedOrderly = this.stopLatch.await(5L, TimeUnit.SECONDS);
                this.stopLatch = null;
                if (!allStoppedOrderly) {
                    LOG.info("Stopping Kafka input timed out (waited 5 seconds for consumer threads to stop). Forcefully closing connection now. This is usually harmless when stopping the input.");
                }
            }
            catch (InterruptedException e) {
                LOG.debug("Interrupted while waiting to stop input.");
            }
        }
        if (this.cc != null) {
            this.cc.shutdown();
            this.cc = null;
        }
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted in transport executor shutdown.");
        }
    }

    @Override
    public MetricSet getMetricSet() {
        return this.localRegistry;
    }

    private class ConsumerRunnable
    implements Runnable {
        private final MessageInput input;
        private final KafkaConsumer<byte[], byte[]> consumer;

        public ConsumerRunnable(Properties props, MessageInput input, int threadId) {
            this.input = input;
            Properties nprops = (Properties)props.clone();
            nprops.put("client.id", "gl2-" + KafkaTransport.this.nodeId.getShortNodeId() + "-" + input.getId() + "-" + threadId);
            this.consumer = new KafkaConsumer(nprops);
            this.consumer.subscribe(Pattern.compile(KafkaTransport.this.configuration.getString(KafkaTransport.CK_TOPIC_FILTER)));
        }

        private void consumeRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
            for (ConsumerRecord record : consumerRecords) {
                byte[] bytes;
                if (KafkaTransport.this.paused) {
                    LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                    Uninterruptibles.awaitUninterruptibly((CountDownLatch)KafkaTransport.this.pausedLatch);
                }
                if (KafkaTransport.this.stopped) break;
                if (KafkaTransport.this.isThrottled()) {
                    KafkaTransport.this.blockUntilUnthrottled();
                }
                if ((bytes = (byte[])record.value()) == null) continue;
                KafkaTransport.this.totalBytesRead.addAndGet(bytes.length);
                KafkaTransport.this.lastSecBytesReadTmp.addAndGet(bytes.length);
                RawMessage rawMessage = new RawMessage(bytes);
                this.input.processRawMessage(rawMessage);
            }
        }

        private Optional<ConsumerRecords<byte[], byte[]>> tryPoll() {
            try {
                ConsumerRecords consumerRecords = this.consumer.poll(Duration.ofSeconds(1L));
                return Optional.of(consumerRecords);
            }
            catch (WakeupException e) {
                LOG.error("WakeupException in poll.");
            }
            catch (InvalidOffsetException | AuthorizationException e) {
                LOG.error("Exception in poll.", e);
            }
            return Optional.empty();
        }

        @Override
        public void run() {
            while (!KafkaTransport.this.stopped) {
                Optional<ConsumerRecords<byte[], byte[]>> consumerRecords;
                try {
                    consumerRecords = this.tryPoll();
                    if (!consumerRecords.isPresent()) {
                        LOG.error("Caught recoverable exception. Retrying");
                        Thread.sleep(2000L);
                        continue;
                    }
                }
                catch (InterruptedException | KafkaException e) {
                    LOG.error("Caught unrecoverable exception in poll. Stopping input", e);
                    KafkaTransport.this.stopped = true;
                    break;
                }
                try {
                    this.consumeRecords(consumerRecords.get());
                }
                catch (Exception e) {
                    LOG.error("Exception in consumer thread. Stopping input", (Throwable)e);
                    KafkaTransport.this.stopped = true;
                    break;
                }
            }
            this.consumer.commitAsync();
            KafkaTransport.this.stopLatch.countDown();
            this.consumer.close(Duration.ofSeconds(5L));
        }
    }

    @ConfigClass
    public static class Config
    extends ThrottleableTransport.Config {
        @Override
        public ConfigurationRequest getRequestedConfiguration() {
            ConfigurationRequest cr = super.getRequestedConfiguration();
            cr.addField(new BooleanField(KafkaTransport.CK_LEGACY, "Legacy mode", true, "Use old ZooKeeper-based consumer API. (Used before Graylog 3.3)", 10));
            cr.addField(new TextField(KafkaTransport.CK_BOOTSTRAP, "Bootstrap Servers", "127.0.0.1:9092", "Comma separated list of one or more Kafka brokers. (Format: \"host1:port1,host2:port2\").Not used in legacy mode.", ConfigurationField.Optional.OPTIONAL, 11, new TextField.Attribute[0]));
            cr.addField(new TextField(KafkaTransport.CK_ZOOKEEPER, "ZooKeeper address (legacy mode only)", "127.0.0.1:2181", "Host and port of the ZooKeeper that is managing your Kafka cluster. Not used in consumer API (non-legacy) mode.", ConfigurationField.Optional.OPTIONAL, 12, new TextField.Attribute[0]));
            cr.addField(new TextField(KafkaTransport.CK_TOPIC_FILTER, "Topic filter regex", "^your-topic$", "Every topic that matches this regular expression will be consumed.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new NumberField(KafkaTransport.CK_FETCH_MIN_BYTES, "Fetch minimum bytes", 5, "Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new NumberField(KafkaTransport.CK_FETCH_WAIT_MAX, "Fetch maximum wait time (ms)", 100, "Wait for this time or the configured minimum size of a message batch before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new NumberField(KafkaTransport.CK_THREADS, "Processor threads", 2, "Number of processor threads to spawn. Use one thread per Kafka topic partition.", ConfigurationField.Optional.NOT_OPTIONAL));
            cr.addField(new DropdownField(KafkaTransport.CK_OFFSET_RESET, "Auto offset reset", KafkaTransport.DEFAULT_OFFSET_RESET, (Map<String, String>)OFFSET_RESET_VALUES, "What to do when there is no initial offset in Kafka or if an offset is out of range", ConfigurationField.Optional.OPTIONAL));
            cr.addField(new TextField(KafkaTransport.CK_GROUP_ID, "Consumer group id", KafkaTransport.DEFAULT_GROUP_ID, "Name of the consumer group the Kafka input belongs to", ConfigurationField.Optional.OPTIONAL));
            cr.addField(new TextField(KafkaTransport.CK_CUSTOM_PROPERTIES, "Custom Kafka properties", "", "A newline separated list of Kafka properties. (e.g.: \"ssl.keystore.location=/etc/graylog/server/kafka.keystore.jks\").", ConfigurationField.Optional.OPTIONAL, 200, TextField.Attribute.TEXTAREA, TextField.Attribute.IS_SENSITIVE));
            return cr;
        }
    }

    @FactoryClass
    public static interface Factory
    extends Transport.Factory<KafkaTransport> {
        @Override
        public KafkaTransport create(Configuration var1);

        @Override
        public Config getConfig();
    }
}

