/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.transports;

import com.google.common.base.Strings;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.graylog2.inputs.transports.AmqpTransport;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String username;
    private final String password;
    private final int prefetchCount;
    private final String queue;
    private final String exchange;
    private final boolean exchangeBind;
    private final String routingKey;
    private final boolean requeueInvalid;
    private Connection connection;
    private Channel channel;
    private final int heartbeatTimeout;
    private final MessageInput sourceInput;
    private final int parallelQueues;
    private final boolean tls;
    private AmqpTransport amqpTransport;
    private AtomicLong totalBytesRead = new AtomicLong(0L);
    private AtomicLong lastSecBytesRead = new AtomicLong(0L);
    private AtomicLong lastSecBytesReadTmp = new AtomicLong(0L);

    public AmqpConsumer(String hostname, int port, String virtualHost, String username, String password, int prefetchCount, String queue, String exchange, boolean exchangeBind, String routingKey, int parallelQueues, boolean tls, boolean requeueInvalid, int heartbeatTimeout, MessageInput sourceInput, ScheduledExecutorService scheduler, AmqpTransport amqpTransport) {
        this.hostname = hostname;
        this.port = port;
        this.virtualHost = virtualHost;
        this.username = username;
        this.password = password;
        this.prefetchCount = prefetchCount;
        this.queue = queue;
        this.exchange = exchange;
        this.exchangeBind = exchangeBind;
        this.routingKey = routingKey;
        this.heartbeatTimeout = heartbeatTimeout;
        this.sourceInput = sourceInput;
        this.parallelQueues = parallelQueues;
        this.tls = tls;
        this.requeueInvalid = requeueInvalid;
        this.amqpTransport = amqpTransport;
        scheduler.scheduleAtFixedRate(() -> this.lastSecBytesRead.set(this.lastSecBytesReadTmp.getAndSet(0L)), 1L, 1L, TimeUnit.SECONDS);
    }

    public void run() throws IOException {
        if (!this.isConnected()) {
            this.connect();
        }
        for (int i = 0; i < this.parallelQueues; ++i) {
            String queueName = String.format(Locale.ENGLISH, this.queue, i);
            this.channel.queueDeclare(queueName, true, false, false, null);
            if (this.exchangeBind) {
                this.channel.queueBind(queueName, this.exchange, this.routingKey);
            }
            this.channel.basicConsume(queueName, false, (Consumer)new DefaultConsumer(this.channel){

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    block4: {
                        long deliveryTag = envelope.getDeliveryTag();
                        try {
                            AmqpConsumer.this.totalBytesRead.addAndGet(body.length);
                            AmqpConsumer.this.lastSecBytesReadTmp.addAndGet(body.length);
                            RawMessage rawMessage = new RawMessage(body);
                            if (AmqpConsumer.this.amqpTransport.isThrottled()) {
                                AmqpConsumer.this.amqpTransport.blockUntilUnthrottled();
                            }
                            AmqpConsumer.this.sourceInput.processRawMessage(rawMessage);
                            AmqpConsumer.this.channel.basicAck(deliveryTag, false);
                        }
                        catch (Exception e) {
                            LOG.error("Error while trying to process AMQP message", (Throwable)e);
                            if (!AmqpConsumer.this.channel.isOpen()) break block4;
                            AmqpConsumer.this.channel.basicNack(deliveryTag, false, AmqpConsumer.this.requeueInvalid);
                            if (!LOG.isDebugEnabled()) break block4;
                            if (AmqpConsumer.this.requeueInvalid) {
                                LOG.debug("Re-queue message with delivery tag {}", (Object)deliveryTag);
                            }
                            LOG.debug("Message with delivery tag {} not re-queued", (Object)deliveryTag);
                        }
                    }
                }
            });
        }
    }

    public void connect() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(this.hostname);
        factory.setPort(this.port);
        factory.setVirtualHost(this.virtualHost);
        factory.setRequestedHeartbeat(this.heartbeatTimeout);
        factory.setAutomaticRecoveryEnabled(true);
        if (this.tls) {
            try {
                LOG.info("Enabling TLS for AMQP input {}.", (Object)this.sourceInput.toIdentifier());
                factory.useSslProtocol();
            }
            catch (KeyManagementException | NoSuchAlgorithmException e) {
                throw new IOException("Couldn't enable TLS for AMQP input.", e);
            }
        }
        if (!Strings.isNullOrEmpty((String)this.username) && !Strings.isNullOrEmpty((String)this.password)) {
            factory.setUsername(this.username);
            factory.setPassword(this.password);
        }
        try {
            this.connection = factory.newConnection();
        }
        catch (TimeoutException e) {
            throw new IOException("Timeout while opening new AMQP connection", e);
        }
        this.channel = this.connection.createChannel();
        if (null == this.channel) {
            LOG.error("No channel descriptor available!");
        }
        if (null != this.channel && this.prefetchCount > 0) {
            this.channel.basicQos(this.prefetchCount);
            LOG.debug("AMQP prefetch count overriden to <{}>.", (Object)this.prefetchCount);
        }
        this.connection.addShutdownListener(cause -> {
            if (cause.isInitiatedByApplication()) {
                LOG.info("Shutting down AMPQ consumer.");
                return;
            }
            LOG.warn("AMQP connection lost! Reconnecting ...");
        });
    }

    public void stop() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            }
            catch (TimeoutException e) {
                LOG.error("Timeout when closing AMQP channel", (Throwable)e);
                this.channel.abort();
            }
        }
        if (this.connection != null && this.connection.isOpen()) {
            this.connection.close();
        }
    }

    public boolean isConnected() {
        return this.connection != null && this.connection.isOpen() && this.channel != null && this.channel.isOpen();
    }

    public AtomicLong getLastSecBytesRead() {
        return this.lastSecBytesRead;
    }

    public AtomicLong getTotalBytesRead() {
        return this.totalBytesRead;
    }
}

