/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.plugin.reader.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import kieker.analysis.plugin.reader.newio.IRawDataProcessor;
import kieker.analysis.plugin.reader.newio.IRawDataReader;
import kieker.analysis.plugin.reader.newio.Outcome;
import kieker.common.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Raw data reader that consumes messages from an AMQP queue and passes the body of
 * each delivery to an {@link IRawDataProcessor}.
 *
 * <p>Life cycle as implemented here: {@link #onInitialization()} opens the connection
 * and channel, {@link #read()} blocks while consuming deliveries, and
 * {@link #onTermination()} sets the termination flag and closes the connection. The
 * connection-related fields and the flag are {@code volatile} so that a termination
 * request issued from another thread is visible to the reading loop.
 */
public class ChunkingAmqpReader
implements IRawDataReader {
    /** Configuration key for the URI handed to the AMQP connection factory. */
    public static final String CONFIG_PROPERTY_URI = "uri";
    /** Configuration key for the name of the queue to consume from. */
    public static final String CONFIG_PROPERTY_QUEUENAME = "queueName";
    /** Configuration key for the heartbeat value requested from the connection factory. */
    public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat";
    private static final Logger LOGGER = LoggerFactory.getLogger(ChunkingAmqpReader.class);
    private final String uri;
    private final String queueName;
    private final int heartbeat;
    private final IRawDataProcessor processor;
    private volatile Connection connection;
    private volatile Channel channel;
    private volatile QueueingConsumer consumer;
    private volatile boolean terminated;

    /**
     * Creates a reader from the given configuration. No connection is opened here.
     *
     * @param configuration source of the {@code uri}, {@code queueName} and {@code heartbeat} properties
     * @param processor receiver of the raw message bodies
     */
    public ChunkingAmqpReader(Configuration configuration, IRawDataProcessor processor) {
        this.uri = configuration.getStringProperty(CONFIG_PROPERTY_URI);
        this.queueName = configuration.getStringProperty(CONFIG_PROPERTY_QUEUENAME);
        this.heartbeat = configuration.getIntProperty(CONFIG_PROPERTY_HEARTBEAT);
        this.processor = processor;
    }

    /**
     * Opens the connection and channel and prepares the queueing consumer.
     *
     * <p>If the channel cannot be created, the already opened connection is closed
     * again so that it is not leaked, and the reader's fields are left unset.
     *
     * @return {@link Outcome#SUCCESS} if connection, channel and consumer are ready,
     *         {@link Outcome#FAILURE} otherwise
     */
    @Override
    public Outcome onInitialization() {
        Connection newConnection = null;
        try {
            newConnection = this.createConnection();
            final Channel newChannel = newConnection.createChannel();
            if (newChannel == null) {
                // createChannel() may return null when no channel is available.
                LOGGER.error("An error occurred initializing the AMQP reader: no channel available.");
                this.closeQuietly(newConnection);
                return Outcome.FAILURE;
            }
            // Publish the fields only once every step has succeeded.
            this.connection = newConnection;
            this.channel = newChannel;
            this.consumer = new QueueingConsumer(newChannel);
            return Outcome.SUCCESS;
        }
        catch (IOException | URISyntaxException | KeyManagementException | NoSuchAlgorithmException | TimeoutException e) {
            this.handleInitializationError(e);
            this.closeQuietly(newConnection);
            return Outcome.FAILURE;
        }
    }

    /** Logs an error that occurred during initialization. */
    private void handleInitializationError(Throwable e) {
        LOGGER.error("An error occurred initializing the AMQP reader.", e);
    }

    /** Closes the given connection if it is non-null and open; close errors are only logged. */
    private void closeQuietly(Connection openConnection) {
        if (openConnection == null || !openConnection.isOpen()) {
            return;
        }
        try {
            openConnection.close();
        }
        catch (IOException closeError) {
            LOGGER.warn("IO error while closing the connection after a failed initialization.", closeError);
        }
    }

    /** Builds a connection from the configured URI and requested heartbeat. */
    private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        return connectionFactory.newConnection();
    }

    /**
     * Consumes deliveries from the configured queue until termination is requested,
     * passing each message body to the processor.
     *
     * @return {@link Outcome#SUCCESS} if the loop ended because of a termination request,
     *         {@link Outcome#FAILURE} if the reader was not initialized, an I/O error
     *         occurred, the thread was interrupted, or the connection shut down unexpectedly
     */
    @Override
    public Outcome read() {
        final IRawDataProcessor rawDataProcessor = this.processor;
        final Channel activeChannel = this.channel;
        final QueueingConsumer activeConsumer = this.consumer;
        if (activeChannel == null || activeConsumer == null) {
            LOGGER.error("Cannot read from queue {}: the reader has not been initialized.", this.queueName);
            return Outcome.FAILURE;
        }
        try {
            activeChannel.basicConsume(this.queueName, true, activeConsumer);
            while (!this.terminated) {
                final QueueingConsumer.Delivery delivery = activeConsumer.nextDelivery();
                rawDataProcessor.decodeAndDeliverRecords(delivery.getBody());
            }
        }
        catch (IOException e) {
            LOGGER.error("Error while reading from queue {}", this.queueName, e);
            return Outcome.FAILURE;
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so that callers further up can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("Consumer was interrupted on queue {}", this.queueName, e);
            return Outcome.FAILURE;
        }
        catch (ShutdownSignalException e) {
            // Closing the connection in onTermination() wakes up a blocked nextDelivery()
            // with this exception; that is the regular way out of the loop.
            if (this.terminated) {
                LOGGER.debug("Consumer on queue {} was shut down after termination was requested.", this.queueName);
                return Outcome.SUCCESS;
            }
            LOGGER.error("Connection was shut down while reading from queue {}", this.queueName, e);
            return Outcome.FAILURE;
        }
        return Outcome.SUCCESS;
    }

    /**
     * Requests termination of the reading loop and closes the connection.
     *
     * <p>If no connection was ever established, or it is no longer open, there is
     * nothing to close and the method reports success.
     *
     * @return {@link Outcome#SUCCESS} if the connection is closed afterwards,
     *         {@link Outcome#FAILURE} if closing failed with an I/O error
     */
    @Override
    public Outcome onTermination() {
        // Set the flag first so that read() can tell a requested shutdown from a failure.
        this.terminated = true;
        final Connection activeConnection = this.connection;
        if (activeConnection == null || !activeConnection.isOpen()) {
            return Outcome.SUCCESS;
        }
        try {
            activeConnection.close();
            return Outcome.SUCCESS;
        }
        catch (IOException e) {
            LOGGER.error("IO error while trying to close the connection.", e);
            return Outcome.FAILURE;
        }
    }
}

