/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.plugin.reader.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.analysis.plugin.reader.tcp.TCPStringReader;
import kieker.common.configuration.Configuration;
import kieker.common.exception.RecordInstantiationException;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.factory.CachedRecordFactoryCatalog;
import kieker.common.record.factory.IRecordFactory;
import kieker.common.record.io.BinaryValueDeserializer;
import kieker.common.registry.reader.ReaderRegistry;

@Deprecated
@Plugin(description="A reader which reads records from a TCP port", outputPorts={@OutputPort(name="monitoringRecords", eventTypes={IMonitoringRecord.class}, description="Output Port of the TCPReader")}, configuration={@Property(name="port1", defaultValue="10133", description="The first port of the server used for the TCP connection."), @Property(name="port2", defaultValue="10134", description="The second port of the server used for the TCP connection.")})
public final class TCPReader
extends AbstractReaderPlugin {
    public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
    public static final String CONFIG_PROPERTY_NAME_PORT1 = "port1";
    public static final String CONFIG_PROPERTY_NAME_PORT2 = "port2";
    private static final int MESSAGE_BUFFER_SIZE = 65535;
    private volatile Thread readerThread;
    private volatile TCPStringReader tcpStringReader;
    private volatile boolean terminated;
    private final int port1;
    private final int port2;
    private final ReaderRegistry<String> stringRegistry = new ReaderRegistry();
    private final CachedRecordFactoryCatalog cachedRecordFactoryCatalog = CachedRecordFactoryCatalog.getInstance();

    public TCPReader(Configuration configuration, IProjectContext projectContext) {
        super(configuration, projectContext);
        this.port1 = this.configuration.getIntProperty(CONFIG_PROPERTY_NAME_PORT1);
        this.port2 = this.configuration.getIntProperty(CONFIG_PROPERTY_NAME_PORT2);
    }

    @Override
    public boolean init() {
        this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
        this.tcpStringReader.start();
        return super.init();
    }

    @Override
    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty(CONFIG_PROPERTY_NAME_PORT1, Integer.toString(this.port1));
        configuration.setProperty(CONFIG_PROPERTY_NAME_PORT2, Integer.toString(this.port2));
        return configuration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean read() {
        this.readerThread = Thread.currentThread();
        ServerSocketChannel serversocket = null;
        try {
            serversocket = ServerSocketChannel.open();
            serversocket.socket().bind(new InetSocketAddress(this.port1));
            this.logger.debug("Listening on port {}", (Object)this.port1);
            SocketChannel socketChannel = serversocket.accept();
            ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
            while (socketChannel.read(buffer) != -1 && !this.terminated) {
                buffer.flip();
                try {
                    while (buffer.hasRemaining()) {
                        buffer.mark();
                        this.read(buffer);
                    }
                    buffer.clear();
                }
                catch (BufferUnderflowException ex) {
                    buffer.reset();
                    buffer.compact();
                }
            }
            socketChannel.close();
        }
        catch (ClosedByInterruptException ex) {
            this.logger.warn("Reader interrupted", (Throwable)ex);
            boolean bl = this.terminated;
            return bl;
        }
        catch (IOException ex) {
            this.logger.error("Error while reading", (Throwable)ex);
            boolean bl = false;
            return bl;
        }
        finally {
            if (null != serversocket) {
                this.close(serversocket);
            }
        }
        return true;
    }

    private void read(ByteBuffer buffer) {
        int clazzId = buffer.getInt();
        long loggingTimestamp = buffer.getLong();
        try {
            String recordClassName = this.stringRegistry.get(clazzId);
            IRecordFactory<? extends IMonitoringRecord> recordFactory = this.cachedRecordFactoryCatalog.get(recordClassName);
            IMonitoringRecord record = recordFactory.create(BinaryValueDeserializer.create(buffer, this.stringRegistry));
            record.setLoggingTimestamp(loggingTimestamp);
            super.deliver(OUTPUT_PORT_NAME_RECORDS, record);
        }
        catch (RecordInstantiationException ex) {
            this.logger.error("Failed to create record", (Throwable)ex);
        }
    }

    private void close(ServerSocketChannel serversocket) {
        try {
            serversocket.close();
        }
        catch (IOException e) {
            this.logger.debug("Failed to close TCP connection!", (Throwable)e);
        }
    }

    @Override
    public void terminate(boolean error) {
        this.logger.info("Shutdown of TCPReader requested.");
        this.terminated = true;
        this.readerThread.interrupt();
        this.tcpStringReader.terminate();
    }
}

