/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.plugin.reader.tcp;

import java.nio.ByteBuffer;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.analysis.plugin.reader.tcp.AbstractRecordTcpReader;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.factory.CachedRecordFactoryCatalog;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.registry.reader.ReaderRegistry;
import kieker.monitoring.core.controller.tcp.AbstractTcpReader;
import org.slf4j.Logger;

@Plugin(description="A reader which reads records from a TCP port", outputPorts={@OutputPort(name="monitoringRecords", eventTypes={IMonitoringRecord.class}, description="Output Port of the DualSocketTcpReader")}, configuration={@Property(name="port1", defaultValue="10133", description="The first port of the server used for the TCP connection."), @Property(name="port2", defaultValue="10134", description="The second port of the server used for the TCP connection.")})
public class DualSocketTcpReader
extends AbstractReaderPlugin {
    public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
    public static final String CONFIG_PROPERTY_NAME_PORT1 = "port1";
    public static final String CONFIG_PROPERTY_NAME_PORT2 = "port2";
    private static final int MESSAGE_BUFFER_SIZE = 65535;
    private final int port1;
    private final int port2;
    private final ReaderRegistry<String> stringRegistry = new ReaderRegistry();
    private final AbstractRecordTcpReader tcpMonitoringRecordReader;
    private final AbstractTcpReader tcpStringRecordReader;
    private Thread tcpStringRecordReaderThread;

    public DualSocketTcpReader(Configuration configuration, IProjectContext projectContext) {
        super(configuration, projectContext);
        this.port1 = this.configuration.getIntProperty(CONFIG_PROPERTY_NAME_PORT1);
        this.port2 = this.configuration.getIntProperty(CONFIG_PROPERTY_NAME_PORT2);
        this.tcpMonitoringRecordReader = this.createTcpMonitoringRecordReader(this.port1, 65535, this.logger, this.stringRegistry);
        this.tcpStringRecordReader = new AbstractTcpReader(this.port2, 65535, this.logger){

            @Override
            protected boolean onBufferReceived(ByteBuffer buffer) {
                RegistryRecord.registerRecordInRegistry(buffer, DualSocketTcpReader.this.stringRegistry);
                return true;
            }
        };
    }

    protected AbstractRecordTcpReader createTcpMonitoringRecordReader(int port, int bufferCapacity, Logger logger, ReaderRegistry<String> registry) {
        return new AbstractRecordTcpReader(port, bufferCapacity, logger, registry, new CachedRecordFactoryCatalog()){

            @Override
            protected void onRecordReceived(IMonitoringRecord record) {
                boolean success = DualSocketTcpReader.this.deliver(DualSocketTcpReader.OUTPUT_PORT_NAME_RECORDS, record);
                if (!success) {
                    this.logger.warn("Failed to deliver record: {}", (Object)record);
                }
            }
        };
    }

    @Override
    public boolean init() {
        this.tcpStringRecordReaderThread = new Thread(this.tcpStringRecordReader);
        this.tcpStringRecordReaderThread.start();
        return super.init();
    }

    @Override
    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty(CONFIG_PROPERTY_NAME_PORT1, Integer.toString(this.port1));
        configuration.setProperty(CONFIG_PROPERTY_NAME_PORT2, Integer.toString(this.port2));
        return configuration;
    }

    @Override
    public boolean read() {
        this.tcpMonitoringRecordReader.run();
        return true;
    }

    @Override
    public void terminate(boolean error) {
        this.logger.info("Shutdown requested.");
        this.tcpMonitoringRecordReader.terminate();
        this.tcpStringRecordReader.terminate();
        this.tcpStringRecordReaderThread.interrupt();
    }

    public int getPort1() {
        return this.port1;
    }

    public int getPort2() {
        return this.port2;
    }
}

