/*
 * Decompiled with CFR 0.152.
 */
package kieker.monitoring.writer.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.io.BinaryValueSerializer;
import kieker.common.registry.IRegistryListener;
import kieker.common.registry.writer.WriterRegistry;
import kieker.common.util.thread.DaemonThreadFactory;
import kieker.monitoring.writer.AbstractMonitoringWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Monitoring writer that publishes monitoring records and string registry
 * entries as binary messages through a RabbitMQ {@link Channel}.
 *
 * <p>Two message layouts are produced:
 * <ul>
 * <li>regular record: type byte {@link #REGULAR_RECORD_ID}, registry id
 * (long), id of the record class name (int), logging timestamp (long),
 * followed by the serialized record;</li>
 * <li>registry entry: type byte {@link #REGISTRY_RECORD_ID}, registry id
 * (long), entry id (int), byte length (int), followed by the UTF-8 bytes of
 * the string.</li>
 * </ul>
 *
 * <p>The message buffers are reused across calls and access to them is not
 * synchronized inside this class.
 */
public class AmqpWriter
extends AbstractMonitoringWriter
implements IRegistryListener<String> {
    /** Type byte that prefixes a string registry message. */
    public static final byte REGISTRY_RECORD_ID = -1;
    /** Type byte that prefixes a regular monitoring record message. */
    public static final byte REGULAR_RECORD_ID = 1;
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpWriter.class);
    /** Capacity, in bytes, of each message buffer. */
    private static final int DEFAULT_BUFFER_SIZE = 16384;
    /** Bytes shared by both layouts: type byte (1) + registry id (8). */
    private static final int SIZE_OF_ENVELOPE = 9;
    /** Envelope + record class name id (int) + logging timestamp (long). */
    private static final int RECORD_HEADER_SIZE = SIZE_OF_ENVELOPE + 4 + 8;
    /** Envelope + entry id (int) + string byte length (int). */
    private static final int REGISTRY_HEADER_SIZE = SIZE_OF_ENVELOPE + 4 + 4;
    private static final String PREFIX = AmqpWriter.class.getName() + ".";
    /** Configuration key of the URI handed to the connection factory. */
    public static final String CONFIG_URI = PREFIX + "uri";
    /** Configuration key of the exchange that messages are published to. */
    public static final String CONFIG_EXCHANGENAME = PREFIX + "exchangename";
    /** Configuration key of the queue name, used as routing key when publishing. */
    public static final String CONFIG_QUEUENAME = PREFIX + "queuename";
    /** Configuration key of the requested heartbeat; a value of 0 selects the default. */
    public static final String CONFIG_HEARTBEAT = PREFIX + "heartbeat";
    /** Heartbeat requested when the configured value is 0. */
    private static final int DEFAULT_HEARTBEAT = 60;
    private final String uri;
    private final String exchangeName;
    private final String queueName;
    private final int heartbeat;
    /** Reusable buffer for regular record messages. */
    private final ByteBuffer buffer;
    /**
     * Reusable buffer for registry messages. It is deliberately separate from
     * {@link #buffer}: the registry is handed to the record serializer, so a
     * registry callback can arrive while a record is only partially written.
     */
    private final ByteBuffer registryBuffer;
    private final Connection connection;
    private final Channel channel;
    private final WriterRegistry writerRegistry;

    /**
     * Reads the writer settings from the configuration, opens the connection
     * and a channel on it, and creates the string registry with this writer
     * as its listener.
     *
     * @param configuration source of the {@code CONFIG_*} properties
     * @throws KeyManagementException   propagated from connection setup
     * @throws NoSuchAlgorithmException propagated from connection setup
     * @throws URISyntaxException       propagated from connection setup
     * @throws IOException              if the connection or the channel cannot be opened
     * @throws TimeoutException         propagated from connection setup
     */
    public AmqpWriter(Configuration configuration) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        super(configuration);
        this.uri = configuration.getStringProperty(CONFIG_URI);
        this.exchangeName = configuration.getStringProperty(CONFIG_EXCHANGENAME);
        this.queueName = configuration.getStringProperty(CONFIG_QUEUENAME);
        int configuredHeartbeat = configuration.getIntProperty(CONFIG_HEARTBEAT);
        this.heartbeat = configuredHeartbeat == 0 ? DEFAULT_HEARTBEAT : configuredHeartbeat;
        this.buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
        this.registryBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);

        Connection openedConnection = this.createConnection();
        Channel openedChannel;
        try {
            openedChannel = openedConnection.createChannel();
            if (openedChannel == null) {
                // The RabbitMQ client documents a null return when no channel is available.
                throw new IOException("No AMQP channel available on the newly created connection");
            }
        }
        catch (IOException | RuntimeException e) {
            // Do not leak the already opened connection when channel setup fails.
            AmqpWriter.closeAfterFailedSetup(openedConnection, e);
            throw e;
        }
        this.connection = openedConnection;
        this.channel = openedChannel;
        this.writerRegistry = new WriterRegistry(this);
    }

    /**
     * Closes a connection whose setup could not be completed; a failure to
     * close is attached to the original failure as a suppressed exception.
     */
    private static void closeAfterFailedSetup(Connection openedConnection, Exception setupFailure) {
        try {
            openedConnection.close();
        }
        catch (IOException | RuntimeException closeFailure) {
            setupFailure.addSuppressed(closeFailure);
        }
    }

    /**
     * Creates a connection from the configured URI and heartbeat, using a
     * {@link DaemonThreadFactory} for the client's threads.
     */
    private Connection createConnection() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        connectionFactory.setThreadFactory(new DaemonThreadFactory());
        return connectionFactory.newConnection();
    }

    /** No-op: connection and channel are already opened by the constructor. */
    @Override
    public void onStarting() {
    }

    /**
     * Serializes the given record into a regular record message and publishes it.
     *
     * @param monitoringRecord the record to publish
     * @throws IllegalStateException if the header plus the size reported by the
     *         record exceeds the buffer capacity
     */
    @Override
    public void writeMonitoringRecord(IMonitoringRecord monitoringRecord) {
        ByteBuffer recordBuffer = this.buffer;
        int requiredBufferSize = RECORD_HEADER_SIZE + monitoringRecord.getSize();
        if (recordBuffer.capacity() < requiredBufferSize) {
            throw new IllegalStateException("Insufficient buffer capacity for monitoring record data: required "
                    + requiredBufferSize + " bytes, available " + recordBuffer.capacity());
        }
        // Registering the class name first lets its registry message go out
        // before the record that refers to it.
        String recordClassName = monitoringRecord.getClass().getName();
        this.writerRegistry.register(recordClassName);
        // Discard leftovers of an earlier record whose serialization failed midway.
        recordBuffer.clear();
        recordBuffer.put(REGULAR_RECORD_ID);
        recordBuffer.putLong(this.writerRegistry.getId());
        recordBuffer.putInt(this.writerRegistry.getId(recordClassName));
        recordBuffer.putLong(monitoringRecord.getLoggingTimestamp());
        monitoringRecord.serialize(BinaryValueSerializer.create(recordBuffer, this.writerRegistry));
        this.publishBuffer(recordBuffer);
    }

    /**
     * Publishes a registry message announcing the id assigned to a string.
     *
     * @param value the registered string
     * @param id    the id assigned to {@code value}
     * @throws IllegalStateException if the encoded entry exceeds the buffer capacity
     */
    @Override
    public void onNewRegistryEntry(String value, int id) {
        // Uses its own buffer so that a callback arriving in the middle of a
        // record serialization cannot overwrite or publish partial record data.
        ByteBuffer entryBuffer = this.registryBuffer;
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        int requiredBufferSize = REGISTRY_HEADER_SIZE + bytes.length;
        if (entryBuffer.capacity() < requiredBufferSize) {
            throw new IllegalStateException("Insufficient buffer capacity for string registry data");
        }
        entryBuffer.clear();
        entryBuffer.put(REGISTRY_RECORD_ID);
        entryBuffer.putLong(this.writerRegistry.getId());
        entryBuffer.putInt(id);
        entryBuffer.putInt(bytes.length);
        entryBuffer.put(bytes);
        this.publishBuffer(entryBuffer);
    }

    /**
     * Copies the bytes written so far (index 0 up to the current position)
     * out of the buffer, resets the buffer for reuse and publishes the copy.
     * An {@link IOException} during publishing is logged and not rethrown.
     */
    private void publishBuffer(ByteBuffer localBuffer) {
        localBuffer.flip();
        byte[] data = new byte[localBuffer.remaining()];
        localBuffer.get(data);
        // Reset before publishing so the buffer is reusable even if publishing fails.
        localBuffer.clear();
        try {
            this.channel.basicPublish(this.exchangeName, this.queueName, null, data);
        }
        catch (IOException e) {
            LOGGER.error("An exception occurred", e);
        }
    }

    /** Closes the connection; an {@link IOException} while closing is logged. */
    @Override
    public void onTerminating() {
        try {
            this.connection.close();
        }
        catch (IOException e) {
            LOGGER.error("Error closing connection", e);
        }
    }
}

