/*
 * Decompiled with CFR 0.152.
 */
package kieker.monitoring.writer.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import kieker.common.configuration.Configuration;
import kieker.common.util.thread.DaemonThreadFactory;
import kieker.monitoring.writer.raw.IRawDataWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChunkingAmqpWriter
implements IRawDataWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChunkingAmqpWriter.class);
    private static final String PREFIX = ChunkingAmqpWriter.class.getName() + ".";
    public static final String CONFIG_URI = PREFIX + "uri";
    public static final String CONFIG_EXCHANGENAME = PREFIX + "exchangename";
    public static final String CONFIG_QUEUENAME = PREFIX + "queuename";
    public static final String CONFIG_HEARTBEAT = PREFIX + "heartbeat";
    private static final int DEFAULT_HEARTBEAT = 60;
    private final String uri;
    private final String exchangeName;
    private final String queueName;
    private final int heartbeat;
    private final Connection connection;
    private final Channel channel;

    public ChunkingAmqpWriter(Configuration configuration) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        this.uri = configuration.getStringProperty(CONFIG_URI);
        this.exchangeName = configuration.getStringProperty(CONFIG_EXCHANGENAME);
        this.queueName = configuration.getStringProperty(CONFIG_QUEUENAME);
        int configuredHeartbeat = configuration.getIntProperty(CONFIG_HEARTBEAT);
        this.heartbeat = configuredHeartbeat == 0 ? 60 : configuredHeartbeat;
        this.connection = this.createConnection();
        this.channel = this.connection.createChannel();
    }

    private Connection createConnection() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        connectionFactory.setThreadFactory((ThreadFactory)new DaemonThreadFactory());
        return connectionFactory.newConnection();
    }

    @Override
    public void onInitialization() {
    }

    @Override
    public void onTermination() {
        try {
            this.connection.close();
        }
        catch (IOException e) {
            LOGGER.error("Error closing connection", e);
        }
    }

    @Override
    public void writeData(ByteBuffer buffer, int offset, int length) {
        buffer.position(offset);
        byte[] rawData = new byte[length];
        buffer.get(rawData);
        try {
            this.channel.basicPublish(this.exchangeName, this.queueName, null, rawData);
        }
        catch (IOException e) {
            LOGGER.error("An exception occurred publishing the data.", e);
        }
    }
}

