/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.generic.source.tcp;

import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import kieker.analysis.generic.source.rewriter.ITraceMetadataRewriter;
import kieker.analysis.generic.source.tcp.Connection;
import kieker.common.exception.RecordInstantiationException;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.factory.CachedRecordFactoryCatalog;
import kieker.common.record.factory.IRecordFactory;
import org.slf4j.Logger;
import teetime.framework.OutputPort;

final class ReaderThread
extends Thread {
    private static final int INT_BYTES = 4;
    private static final int LONG_BYTES = 8;
    private static final Charset ENCODING = StandardCharsets.UTF_8;
    private final CachedRecordFactoryCatalog recordFactories = CachedRecordFactoryCatalog.getInstance();
    private final Selector readSelector;
    private final Logger logger;
    private final ITraceMetadataRewriter recordRewriter;
    private final OutputPort<IMonitoringRecord> outputPort;
    private boolean active;

    public ReaderThread(Logger logger, Selector readSelector, ITraceMetadataRewriter recordRewriter, OutputPort<IMonitoringRecord> outputPort) {
        this.readSelector = readSelector;
        this.logger = logger;
        this.recordRewriter = recordRewriter;
        this.outputPort = outputPort;
    }

    @Override
    public void run() {
        this.active = true;
        while (this.isAlive() && this.active) {
            try {
                int readReady = this.readSelector.select();
                if (readReady > 0) {
                    Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        this.readFromSocket(key);
                        keyIterator.remove();
                    }
                    selectedKeys.clear();
                    continue;
                }
                Thread.sleep(100L);
            }
            catch (ClosedSelectorException e1) {
                this.logger.error("Selector has already been closed.", (Throwable)e1);
            }
            catch (IOException e2) {
                this.logger.info("IO error while reading from connection.");
            }
            catch (InterruptedException e) {
                this.logger.warn("Thread.sleep was interrupted.");
            }
        }
    }

    private void readFromSocket(SelectionKey key) throws IOException {
        boolean endOfStreamReached = false;
        Connection connection = (Connection)key.attachment();
        SocketChannel socketChannel = connection.getChannel();
        int bytesRead = socketChannel.read(connection.getBuffer());
        while (bytesRead > 0) {
            bytesRead = socketChannel.read(connection.getBuffer());
        }
        if (bytesRead == -1) {
            endOfStreamReached = true;
        }
        this.processBuffer(connection);
        if (endOfStreamReached || connection.isError()) {
            this.logger.debug("Socket closed: " + socketChannel.getRemoteAddress().toString());
            key.attach(null);
            key.cancel();
            key.channel().close();
        }
    }

    private void processBuffer(Connection connection) throws IOException {
        ByteBuffer buffer = connection.getBuffer();
        buffer.flip();
        try {
            while (buffer.position() + 4 < buffer.limit()) {
                buffer.mark();
                if (this.onBufferReceived(connection)) continue;
                return;
            }
            buffer.mark();
            buffer.compact();
        }
        catch (BufferUnderflowException ex) {
            this.logger.warn("Unexpected buffer underflow. Resetting and compacting buffer.", (Throwable)ex);
            buffer.reset();
            buffer.compact();
        }
    }

    private boolean onBufferReceived(Connection connection) throws IOException {
        if (connection.getBuffer().remaining() < 4) {
            return false;
        }
        int clazzId = connection.getBuffer().getInt();
        if (clazzId == -1) {
            return this.registerRegistryEntry(connection);
        }
        return this.deserializeRecord(connection, clazzId);
    }

    private boolean registerRegistryEntry(Connection connection) {
        if (connection.getBuffer().remaining() < 8) {
            connection.getBuffer().reset();
            connection.getBuffer().compact();
            return false;
        }
        int id = connection.getBuffer().getInt();
        int stringLength = connection.getBuffer().getInt();
        if (connection.getBuffer().remaining() < stringLength) {
            connection.getBuffer().reset();
            connection.getBuffer().compact();
            return false;
        }
        byte[] strBytes = new byte[stringLength];
        connection.getBuffer().get(strBytes);
        String string = new String(strBytes, ENCODING);
        connection.getRegistry().register(id, string);
        return true;
    }

    private boolean deserializeRecord(Connection connection, int clazzId) throws IOException {
        String recordClassName = connection.getRegistry().get(clazzId);
        if (connection.getBuffer().remaining() < 8) {
            connection.getBuffer().reset();
            connection.getBuffer().compact();
            return false;
        }
        long loggingTimestamp = connection.getBuffer().getLong();
        IRecordFactory<? extends IMonitoringRecord> recordFactory = this.recordFactories.get(recordClassName);
        if (recordFactory == null) {
            this.logger.debug("Unknown class {}: No factory present. Aborting...", (Object)recordClassName);
            connection.setError(true);
            return false;
        }
        if (connection.getBuffer().remaining() < recordFactory.getRecordSizeInBytes()) {
            connection.getBuffer().reset();
            connection.getBuffer().compact();
            return false;
        }
        try {
            IMonitoringRecord record = recordFactory.create(connection.getValueDeserializer());
            this.recordRewriter.rewrite(connection, record, loggingTimestamp, this.outputPort);
            return true;
        }
        catch (RecordInstantiationException ex) {
            this.logger.error("Failed to create: " + recordClassName, (Throwable)ex);
            connection.getBuffer().reset();
            connection.getBuffer().compact();
            return false;
        }
    }

    public void terminate() {
        this.active = false;
        this.readSelector.wakeup();
    }
}

