/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.tt.reader.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import kieker.analysis.tt.reader.amqp.RegistryRecordHandler;
import kieker.analysis.tt.reader.amqp.RegularRecordHandler;
import kieker.common.record.IMonitoringRecord;
import kieker.common.registry.reader.ReaderRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
public final class AMQPReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPReader.class);
    private static final byte REGISTRY_RECORD_ID = -1;
    private static final byte REGULAR_RECORD_ID = 1;
    private final String uri;
    private final String queueName;
    private final int heartbeat;
    private volatile Connection connection;
    private volatile Channel channel;
    private volatile QueueingConsumer consumer;
    private final ReaderRegistry<String> stringRegistry = new ReaderRegistry();
    private volatile Thread registryRecordHandlerThread;
    private volatile RegistryRecordHandler registryRecordHandler;
    private volatile RegularRecordHandler regularRecordHandler;
    private volatile Thread regularRecordHandlerThread;
    private volatile boolean terminated;
    private volatile boolean threadsStarted;
    private final Consumer<IMonitoringRecord> elementReceivedCallback;

    public AMQPReader(String uri, String queueName, int heartbeat, Consumer<IMonitoringRecord> elementReceivedCallback) {
        this.uri = uri;
        this.queueName = queueName;
        this.heartbeat = heartbeat;
        this.elementReceivedCallback = elementReceivedCallback;
        this.init();
    }

    public void init() {
        try {
            this.connection = this.createConnection();
            this.channel = this.connection.createChannel();
            this.consumer = new QueueingConsumer(this.channel);
            this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry);
            this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry);
            this.registryRecordHandlerThread = new Thread(this.registryRecordHandler);
            this.registryRecordHandlerThread.setDaemon(true);
            this.regularRecordHandlerThread = new Thread(this.regularRecordHandler);
            this.regularRecordHandlerThread.setDaemon(true);
        }
        catch (KeyManagementException e) {
            this.handleInitializationError(e);
        }
        catch (NoSuchAlgorithmException e) {
            this.handleInitializationError(e);
        }
        catch (IOException e) {
            this.handleInitializationError(e);
        }
        catch (TimeoutException e) {
            this.handleInitializationError(e);
        }
        catch (URISyntaxException e) {
            this.handleInitializationError(e);
        }
    }

    private void handleInitializationError(Throwable e) {
        LOGGER.error("An error occurred initializing the AMQP reader: {}", e);
    }

    private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        return connectionFactory.newConnection();
    }

    public boolean read() {
        if (!this.threadsStarted) {
            this.registryRecordHandlerThread.start();
            this.regularRecordHandlerThread.start();
            this.threadsStarted = true;
        }
        try {
            this.channel.basicConsume(this.queueName, true, (com.rabbitmq.client.Consumer)this.consumer);
            block8: while (!this.terminated) {
                QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
                byte[] body = delivery.getBody();
                ByteBuffer buffer = ByteBuffer.wrap(body);
                byte recordType = buffer.get();
                switch (recordType) {
                    case -1: {
                        this.registryRecordHandler.enqueueRegistryRecord(buffer);
                        continue block8;
                    }
                    case 1: {
                        this.regularRecordHandler.enqueueRegularRecord(buffer);
                        continue block8;
                    }
                }
                if (!LOGGER.isErrorEnabled()) continue;
                LOGGER.error(String.format("Unknown record type: %02x", recordType));
            }
        }
        catch (IOException e) {
            LOGGER.error("Error while reading from queue {}", (Object)this.queueName, (Object)e);
            return false;
        }
        catch (InterruptedException e) {
            LOGGER.error("Consumer was interrupted on queue {}", (Object)this.queueName, (Object)e);
            return false;
        }
        catch (ShutdownSignalException e) {
            LOGGER.info("Consumer was shut down while waiting on queue {}", (Object)this.queueName, (Object)e);
            return true;
        }
        return true;
    }

    public void terminate() {
        try {
            this.terminated = true;
            this.connection.close();
        }
        catch (IOException e) {
            LOGGER.error("IO error while trying to close the connection.", (Throwable)e);
        }
    }

    public void deliverRecord(IMonitoringRecord monitoringRecord) {
        this.elementReceivedCallback.accept(monitoringRecord);
    }
}

