/*
 * Decompiled with CFR 0.152.
 */
package kieker.monitoring.writer.tcp;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.io.BinaryValueSerializer;
import kieker.common.record.io.IValueSerializer;
import kieker.common.registry.IRegistryListener;
import kieker.common.registry.writer.WriterRegistry;
import kieker.monitoring.writer.AbstractMonitoringWriter;
import kieker.monitoring.writer.WriterUtil;
import kieker.monitoring.writer.tcp.ConnectionTimeoutException;
import kieker.monitoring.writer.tcp.TimeoutCountdown;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleSocketTcpWriter
extends AbstractMonitoringWriter
implements IRegistryListener<String> {
    public static final String PREFIX = SingleSocketTcpWriter.class.getName() + ".";
    public static final String CONFIG_HOSTNAME = PREFIX + "hostname";
    public static final String CONFIG_PORT = PREFIX + "port";
    public static final String CONFIG_BUFFERSIZE = PREFIX + "bufferSize";
    public static final String CONFIG_FLUSH = PREFIX + "flush";
    public static final String CONFIG_CONN_TIMEOUT_IN_MS = PREFIX + "connectionTimeoutInMs";
    private static final Logger LOGGER = LoggerFactory.getLogger(SingleSocketTcpWriter.class);
    private final InetSocketAddress socketAddress;
    private final int connectionTimeoutInMs;
    private SocketChannel socketChannel;
    private final ByteBuffer buffer;
    private final ByteBuffer registryBuffer;
    private final boolean flush;
    private final IValueSerializer serializer;

    public SingleSocketTcpWriter(Configuration configuration) throws IOException {
        super(configuration);
        String hostname = configuration.getStringProperty(CONFIG_HOSTNAME);
        int port = configuration.getIntProperty(CONFIG_PORT);
        this.socketAddress = new InetSocketAddress(hostname, port);
        int configConnectionTimeoutInMs = configuration.getIntProperty(CONFIG_CONN_TIMEOUT_IN_MS, 1);
        this.connectionTimeoutInMs = configConnectionTimeoutInMs > 0 ? configConnectionTimeoutInMs : 1;
        int bufferSize = this.configuration.getIntProperty(CONFIG_BUFFERSIZE);
        this.buffer = ByteBuffer.allocateDirect(bufferSize);
        this.registryBuffer = ByteBuffer.allocateDirect(bufferSize);
        this.flush = configuration.getBooleanProperty(CONFIG_FLUSH);
        WriterRegistry writerRegistry = new WriterRegistry(this);
        this.serializer = BinaryValueSerializer.create(this.buffer, writerRegistry);
    }

    @Override
    public void onStarting() {
        long connectionTimeoutInNs = TimeUnit.MILLISECONDS.toNanos(this.connectionTimeoutInMs);
        TimeoutCountdown timeoutCountdown = new TimeoutCountdown(connectionTimeoutInNs);
        do {
            try {
                this.socketChannel = SocketChannel.open();
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            this.tryConnect(timeoutCountdown);
        } while (!this.socketChannel.isConnected());
        String message = String.format("Successfully connected to %s.", this.socketAddress);
        LOGGER.info(message);
    }

    private void tryConnect(TimeoutCountdown timeoutCountdown) throws ConnectionTimeoutException {
        Socket socket = this.socketChannel.socket();
        long startTimestampInNs = System.nanoTime();
        if (this.connectOrTimeout(socket, timeoutCountdown.getRemainingTimeoutInMs())) {
            return;
        }
        long currentTimestampInNs = System.nanoTime();
        long elapsedTimeInNs = currentTimestampInNs - startTimestampInNs;
        timeoutCountdown.countdownNs(elapsedTimeInNs);
        if (timeoutCountdown.getRemainingTimeoutInMs() <= 0) {
            String message = String.format("Connection timeout of %d ms exceeded.", this.connectionTimeoutInMs);
            throw new ConnectionTimeoutException(message);
        }
    }

    private boolean connectOrTimeout(Socket socket, int timeoutInMs) {
        try {
            socket.connect(this.socketAddress, timeoutInMs);
            return true;
        }
        catch (ConnectException | SocketTimeoutException e) {
            return false;
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void writeMonitoringRecord(IMonitoringRecord monitoringRecord) {
        ByteBuffer recordBuffer = this.buffer;
        if (12 + monitoringRecord.getSize() > recordBuffer.remaining()) {
            WriterUtil.flushBuffer(this.registryBuffer, this.socketChannel, LOGGER);
            WriterUtil.flushBuffer(recordBuffer, this.socketChannel, LOGGER);
        }
        String recordClassName = monitoringRecord.getClass().getName();
        this.serializer.putString(recordClassName);
        this.serializer.putLong(monitoringRecord.getLoggingTimestamp());
        monitoringRecord.serialize(this.serializer);
        if (this.flush) {
            WriterUtil.flushBuffer(this.registryBuffer, this.socketChannel, LOGGER);
            WriterUtil.flushBuffer(recordBuffer, this.socketChannel, LOGGER);
        }
    }

    @Override
    public void onNewRegistryEntry(String value, int id) {
        ByteBuffer localRegistryBuffer = this.registryBuffer;
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        int requiredBufferSize = 16 + bytes.length;
        if (localRegistryBuffer.remaining() < requiredBufferSize) {
            WriterUtil.flushBuffer(localRegistryBuffer, this.socketChannel, LOGGER);
        }
        localRegistryBuffer.putInt(-1);
        localRegistryBuffer.putInt(id);
        localRegistryBuffer.putInt(value.length());
        localRegistryBuffer.put(bytes);
    }

    @Override
    public void onTerminating() {
        WriterUtil.flushBuffer(this.registryBuffer, this.socketChannel, LOGGER);
        WriterUtil.flushBuffer(this.buffer, this.socketChannel, LOGGER);
        WriterUtil.close(this.socketChannel, LOGGER);
    }
}

