/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.transports;

import com.google.common.base.Strings;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.TrafficListener;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.graylog2.inputs.transports.AmqpTransport;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.security.encryption.EncryptedValue;
import org.graylog2.security.encryption.EncryptedValueService;
import org.graylog2.shared.utilities.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String username;
    private final EncryptedValue password;
    private final int prefetchCount;
    private final String queue;
    private final String exchange;
    private final boolean exchangeBind;
    private final String routingKey;
    private final boolean requeueInvalid;
    private final int heartbeatTimeout;
    private final MessageInput sourceInput;
    private final int parallelQueues;
    private final boolean tls;
    private final ScheduledExecutorService scheduler;
    private final InputFailureRecorder inputFailureRecorder;
    private final AmqpTransport amqpTransport;
    private final EncryptedValueService encryptedValueService;
    private final Duration connectionRecoveryInterval;
    private final AtomicLong totalBytesRead = new AtomicLong(0L);
    private final AtomicLong lastSecBytesRead = new AtomicLong(0L);
    private final AtomicLong lastSecBytesReadTmp = new AtomicLong(0L);
    private Connection connection;
    private Channel channel;
    private ScheduledFuture<?> scheduledFuture;

    public AmqpConsumer(int heartbeatTimeout, MessageInput sourceInput, Configuration configuration, ScheduledExecutorService scheduler, InputFailureRecorder inputFailureRecorder, AmqpTransport amqpTransport, EncryptedValueService encryptedValueService, Duration connectionRecoveryInterval) {
        this.hostname = configuration.getString("broker_hostname");
        this.port = configuration.getInt("broker_port");
        this.virtualHost = configuration.getString("broker_vhost");
        this.username = configuration.getString("broker_username");
        this.password = configuration.getEncryptedValue("broker_password");
        this.prefetchCount = configuration.getInt("prefetch");
        this.queue = configuration.getString("queue");
        this.exchange = configuration.getString("exchange");
        this.exchangeBind = configuration.getBoolean("exchange_bind");
        this.routingKey = configuration.getString("routing_key");
        this.parallelQueues = configuration.getInt("parallel_queues");
        this.requeueInvalid = configuration.getBoolean("requeue_invalid_messages");
        this.tls = configuration.getBoolean("tls");
        this.heartbeatTimeout = heartbeatTimeout;
        this.sourceInput = sourceInput;
        this.scheduler = scheduler;
        this.inputFailureRecorder = inputFailureRecorder;
        this.amqpTransport = amqpTransport;
        this.encryptedValueService = encryptedValueService;
        this.connectionRecoveryInterval = connectionRecoveryInterval;
    }

    public void run() throws IOException, TimeoutException {
        if (!this.isConnected()) {
            this.connect();
        }
        this.scheduledFuture = this.scheduler.scheduleAtFixedRate(() -> this.lastSecBytesRead.set(this.lastSecBytesReadTmp.getAndSet(0L)), 1L, 1L, TimeUnit.SECONDS);
        for (int i = 0; i < this.parallelQueues; ++i) {
            String queueName = String.format(Locale.ENGLISH, this.queue, i);
            this.channel.queueDeclare(queueName, true, false, false, null);
            if (this.exchangeBind) {
                this.channel.queueBind(queueName, this.exchange, this.routingKey);
            }
            this.channel.basicConsume(queueName, false, (Consumer)new DefaultConsumer(this.channel){

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    block4: {
                        long deliveryTag = envelope.getDeliveryTag();
                        try {
                            AmqpConsumer.this.totalBytesRead.addAndGet(body.length);
                            AmqpConsumer.this.lastSecBytesReadTmp.addAndGet(body.length);
                            RawMessage rawMessage = new RawMessage(body);
                            if (AmqpConsumer.this.amqpTransport.isThrottled()) {
                                AmqpConsumer.this.amqpTransport.blockUntilUnthrottled();
                            }
                            AmqpConsumer.this.sourceInput.processRawMessage(rawMessage);
                            AmqpConsumer.this.channel.basicAck(deliveryTag, false);
                        }
                        catch (Exception e) {
                            LOG.error("Error while trying to process AMQP message", (Throwable)e);
                            if (!AmqpConsumer.this.channel.isOpen()) break block4;
                            AmqpConsumer.this.channel.basicNack(deliveryTag, false, AmqpConsumer.this.requeueInvalid);
                            if (!LOG.isDebugEnabled()) break block4;
                            if (AmqpConsumer.this.requeueInvalid) {
                                LOG.debug("Re-queue message with delivery tag {}", (Object)deliveryTag);
                            }
                            LOG.debug("Message with delivery tag {} not re-queued", (Object)deliveryTag);
                        }
                    }
                }
            });
        }
    }

    public void connect() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setExceptionHandler((ExceptionHandler)new DefaultExceptionHandler(){

            public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
                super.handleConnectionRecoveryException(conn, exception);
                AmqpConsumer.this.inputFailureRecorder.setFailing(((Object)((Object)this)).getClass(), "Connection recovery error!", exception);
            }
        });
        factory.setTrafficListener(new TrafficListener(){

            public void write(Command outboundCommand) {
            }

            public void read(Command inboundCommand) {
                AmqpConsumer.this.inputFailureRecorder.setRunning();
            }
        });
        factory.setHost(this.hostname);
        factory.setPort(this.port);
        factory.setVirtualHost(this.virtualHost);
        factory.setRequestedHeartbeat(this.heartbeatTimeout);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(this.connectionRecoveryInterval.toMillis());
        if (this.tls) {
            try {
                LOG.info("Enabling TLS for AMQP input {}.", (Object)this.sourceInput.toIdentifier());
                factory.useSslProtocol();
            }
            catch (KeyManagementException | NoSuchAlgorithmException e) {
                throw new IOException("Couldn't enable TLS for AMQP input.", e);
            }
        }
        if (!Strings.isNullOrEmpty((String)this.username) && this.password.isSet()) {
            factory.setUsername(this.username);
            factory.setPassword(this.encryptedValueService.decrypt(this.password));
        }
        this.connection = factory.newConnection();
        this.channel = this.connection.createChannel();
        if (null == this.channel) {
            LOG.error("No channel descriptor available!");
        }
        if (null != this.channel && this.prefetchCount > 0) {
            this.channel.basicQos(this.prefetchCount);
            LOG.debug("AMQP prefetch count overriden to <{}>.", (Object)this.prefetchCount);
        }
        this.connection.addShutdownListener(cause -> {
            if (cause.isInitiatedByApplication()) {
                LOG.info("Shutting down AMPQ consumer.");
                return;
            }
            this.inputFailureRecorder.setFailing(this.getClass(), StringUtils.f("AMQP connection lost (reason: %s)! Reconnecting ...", cause.getReason().protocolMethodName()));
        });
    }

    public void stop() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            }
            catch (TimeoutException e) {
                LOG.error("Timeout when closing AMQP channel", (Throwable)e);
                this.channel.abort();
            }
        }
        if (this.connection != null && this.connection.isOpen()) {
            this.connection.close();
        } else if (this.connection != null) {
            this.connection.abort();
        }
        if (null != this.scheduledFuture) {
            this.scheduledFuture.cancel(true);
        }
    }

    public boolean isConnected() {
        return this.connection != null && this.connection.isOpen() && this.channel != null && this.channel.isOpen();
    }

    public AtomicLong getLastSecBytesRead() {
        return this.lastSecBytesRead;
    }

    public AtomicLong getTotalBytesRead() {
        return this.totalBytesRead;
    }
}

