/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.plugin.reader.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.AbstractStringRegistryReaderPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(description="A plugin that reads monitoring records from an AMQP queue", outputPorts={@OutputPort(name="monitoringRecords", eventTypes={IMonitoringRecord.class}, description="Output port of the AMQP reader")}, configuration={@Property(name="uri", defaultValue="amqp://localhost", description="Server URI of the AMQP server"), @Property(name="queueName", defaultValue="kieker", description="AMQP queue name"), @Property(name="heartbeat", defaultValue="60", description="Heartbeat interval (in seconds)"), @Property(name="cacheDuration", defaultValue="60", description="Cache duration (in seconds) for string registries")})
public final class AmqpReader
extends AbstractStringRegistryReaderPlugin {
    public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
    public static final String CONFIG_PROPERTY_URI = "uri";
    public static final String CONFIG_PROPERTY_QUEUENAME = "queueName";
    public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat";
    public static final String CONFIG_PROPERTY_CACHE_DURATION = "cacheDuration";
    private static final Logger LOGGER = LoggerFactory.getLogger((String)AmqpReader.class.getCanonicalName());
    private static final byte REGISTRY_RECORD_ID = -1;
    private static final byte REGULAR_RECORD_ID = 1;
    private final String uri;
    private final String queueName;
    private final int heartbeat;
    private final long cacheDuration;
    private volatile Connection connection;
    private volatile Channel channel;
    private volatile QueueingConsumer consumer;
    private volatile boolean terminated;

    public AmqpReader(Configuration configuration, IProjectContext projectContext) {
        super(configuration, projectContext, CONFIG_PROPERTY_CACHE_DURATION, TimeUnit.SECONDS);
        this.uri = this.configuration.getStringProperty(CONFIG_PROPERTY_URI);
        this.queueName = this.configuration.getStringProperty(CONFIG_PROPERTY_QUEUENAME);
        this.heartbeat = this.configuration.getIntProperty(CONFIG_PROPERTY_HEARTBEAT);
        this.cacheDuration = this.configuration.getLongProperty(CONFIG_PROPERTY_CACHE_DURATION);
    }

    @Override
    public boolean init() {
        boolean superInitSucceeded = super.init();
        if (!superInitSucceeded) {
            return false;
        }
        try {
            this.connection = this.createConnection();
            this.channel = this.connection.createChannel();
            this.consumer = new QueueingConsumer(this.channel);
        }
        catch (KeyManagementException e) {
            this.handleInitializationError(e);
            return false;
        }
        catch (NoSuchAlgorithmException e) {
            this.handleInitializationError(e);
            return false;
        }
        catch (IOException e) {
            this.handleInitializationError(e);
            return false;
        }
        catch (TimeoutException e) {
            this.handleInitializationError(e);
            return false;
        }
        catch (URISyntaxException e) {
            this.handleInitializationError(e);
            return false;
        }
        return true;
    }

    private void handleInitializationError(Throwable e) {
        LOGGER.error("An error occurred initializing the AMQP reader:", e);
    }

    private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        return connectionFactory.newConnection();
    }

    @Override
    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty(CONFIG_PROPERTY_URI, this.uri);
        configuration.setProperty(CONFIG_PROPERTY_QUEUENAME, this.queueName);
        configuration.setProperty(CONFIG_PROPERTY_HEARTBEAT, Integer.toString(this.heartbeat));
        configuration.setProperty(CONFIG_PROPERTY_CACHE_DURATION, Long.toString(this.cacheDuration));
        return configuration;
    }

    @Override
    public boolean read() {
        this.ensureThreadsStarted();
        try {
            this.channel.basicConsume(this.queueName, true, (Consumer)this.consumer);
            block7: while (!this.terminated) {
                QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
                byte[] body = delivery.getBody();
                ByteBuffer buffer = ByteBuffer.wrap(body);
                byte recordType = buffer.get();
                switch (recordType) {
                    case -1: {
                        this.handleRegistryRecord(buffer);
                        continue block7;
                    }
                    case 1: {
                        this.handleRegularRecord(buffer);
                        continue block7;
                    }
                }
                this.logger.error(String.format("Unknown record type: %02x", recordType));
            }
        }
        catch (IOException e) {
            this.logger.error("Error while reading from queue {}", (Object)this.queueName, (Object)e);
            return false;
        }
        catch (InterruptedException e) {
            this.logger.error("Consumer was interrupted on queue {}", (Object)this.queueName, (Object)e);
            return false;
        }
        return true;
    }

    @Override
    public void terminate(boolean error) {
        try {
            this.terminated = true;
            this.connection.close();
        }
        catch (IOException e) {
            this.logger.error("IO error while trying to close the connection.", (Throwable)e);
        }
    }

    @Override
    protected void deliverRecord(IMonitoringRecord monitoringRecord) {
        this.deliver(OUTPUT_PORT_NAME_RECORDS, monitoringRecord);
    }
}

