/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.generic.source.tcp;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import kieker.analysis.generic.source.rewriter.ITraceMetadataRewriter;
import kieker.analysis.generic.source.tcp.Connection;
import kieker.analysis.generic.source.tcp.ReaderThread;
import kieker.common.record.IMonitoringRecord;
import teetime.framework.AbstractProducerStage;
import teetime.framework.OutputPort;

/**
 * Producer stage that listens on a TCP port and accepts an arbitrary number of client
 * connections.
 *
 * <p>Every accepted channel is switched to non-blocking mode and registered for read events with
 * a selector that is shared with a {@link ReaderThread}. The reader is constructed with this
 * stage's logger, the selector, the configured {@link ITraceMetadataRewriter} and the stage's
 * output port.
 *
 * <p>{@link #rejectNewConnection()} and {@link #onTerminating()} may be invoked from a thread
 * other than the one running {@link #execute()}, hence the shared state is {@code volatile}.
 */
public class MultipleConnectionTcpSourceStage
extends AbstractProducerStage<IMonitoringRecord> {
    private final int inputPort;
    private final int bufferSize;
    private final ITraceMetadataRewriter recordRewriter;
    /** Reader created by {@link #execute()}; stays {@code null} until the listening port is bound. */
    private volatile ReaderThread reader;
    /** Loop condition of the accept loop; cleared by {@link #rejectNewConnection()}. */
    private volatile boolean allowNewConnection;

    /**
     * Creates the stage; no socket is opened until {@link #execute()} runs.
     *
     * @param inputPort      TCP port the server socket is bound to
     * @param bufferSize     buffer size handed to every {@link Connection} that is created
     * @param recordRewriter rewriter handed to the {@link ReaderThread}
     */
    public MultipleConnectionTcpSourceStage(int inputPort, int bufferSize, ITraceMetadataRewriter recordRewriter) {
        this.inputPort = inputPort;
        this.bufferSize = bufferSize;
        this.recordRewriter = recordRewriter;
    }

    /**
     * Binds the server socket, starts the reader thread and accepts connections until
     * {@link #rejectNewConnection()} was called or the stage should be terminated. Afterwards the
     * reader is terminated and joined. {@code workCompleted()} is invoked in every case.
     *
     * <p>The server socket is closed on every exit path so that the port is released. The selector
     * is closed only after the reader has been joined; on failure paths the reader may still be
     * running and is stopped by {@link #onTerminating()}, so the selector is left untouched there.
     */
    protected void execute() {
        try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
            serverSocket.bind(new InetSocketAddress(this.inputPort));
            serverSocket.configureBlocking(true);
            Selector readSelector = Selector.open();
            ReaderThread readerThread = new ReaderThread(this.logger, readSelector, this.recordRewriter, (OutputPort<IMonitoringRecord>)this.outputPort);
            this.reader = readerThread;
            readerThread.start();
            this.allowNewConnection = true;
            while (this.allowNewConnection && !this.shouldBeTerminated()) {
                SocketChannel socketChannel = serverSocket.accept();
                if (socketChannel == null) {
                    continue;
                }
                this.logger.debug("Connection from {}.", (Object)socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false);
                Connection connection = new Connection(socketChannel, this.bufferSize);
                // Wake the selector up before registering, as the original code does.
                readSelector.wakeup();
                // Attach the connection as part of the registration so that the reader can never
                // observe a selection key without its attachment.
                socketChannel.register(readSelector, SelectionKey.OP_READ, connection);
            }
            readerThread.terminate();
            readerThread.join();
            // The reader has finished, nobody uses the selector any more.
            readSelector.close();
        }
        catch (ClosedByInterruptException e) {
            this.logger.info("External shutdown called");
        }
        catch (BindException e) {
            this.logger.error("Cannot estabilsh listening port: Address {} is already in use.", (Object)this.inputPort);
        }
        catch (IOException e) {
            this.logger.error("Cannot establish listening port", (Throwable)e);
        }
        catch (InterruptedException e) {
            this.logger.error("Reader termination was interrupted.", (Throwable)e);
            // Keep the interrupt visible to the code running this stage.
            Thread.currentThread().interrupt();
        }
        finally {
            this.workCompleted();
        }
    }

    /**
     * Makes the accept loop in {@link #execute()} stop before its next iteration. A call to
     * {@code accept()} that is already blocking is not aborted by this method.
     */
    public void rejectNewConnection() {
        this.allowNewConnection = false;
    }

    /**
     * Stops accepting connections and, if a reader exists and is still alive, terminates it and
     * waits for it to finish before delegating to the superclass.
     */
    protected void onTerminating() {
        this.rejectNewConnection();
        // The reader is null if execute() failed before creating it (e.g. port already in use)
        // or has not run yet.
        ReaderThread readerThread = this.reader;
        if (readerThread != null && readerThread.isAlive()) {
            readerThread.terminate();
            try {
                readerThread.join();
            }
            catch (InterruptedException e) {
                this.logger.warn("Reader termination was interrupted.");
                Thread.currentThread().interrupt();
            }
        }
        super.onTerminating();
    }
}

