/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

/**
 * Non-blocking echo server: every payload received on an accepted connection
 * is sent back to the connection it came from.
 *
 * <p>Two threads cooperate. The inner {@link AcceptorThread} accepts incoming
 * connections and hands them over through {@code newChannels}; this thread
 * registers them with the Kafka {@link Selector}, polls it, and echoes each
 * completed receive. Both lists are {@link Collections#synchronizedList}
 * wrappers, so single calls on them are thread-safe, but iteration must be
 * guarded by synchronizing on the list itself.
 */
public class NioEchoServer
extends Thread {
    private final int port;
    private final ServerSocketChannel serverSocketChannel;
    /** Accepted connections not yet registered with the selector (filled by the acceptor thread). */
    private final List<SocketChannel> newChannels;
    /** Connections that have been registered with the selector. */
    private final List<SocketChannel> socketChannels;
    private final AcceptorThread acceptorThread;
    private final Selector selector;
    /** Echo responses waiting because their channel already had a send in progress. */
    private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();

    /**
     * Binds a non-blocking server socket to an ephemeral port on {@code serverHost}
     * and prepares (but does not start) the acceptor thread.
     *
     * @param securityProtocol protocol passed to {@link ChannelBuilders} to build server-side channels
     * @param configs          configuration passed through to the channel builder
     * @param serverHost       host name or address to bind to
     * @throws Exception if the socket cannot be opened/bound or the channel builder cannot be created
     */
    public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(false);
        // Port 0 lets the OS pick a free port; the chosen port is exposed through port().
        this.serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
        this.port = this.serverSocketChannel.socket().getLocalPort();
        this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
        this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
        ChannelBuilder channelBuilder = ChannelBuilders.create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true);
        // NOTE(review): 5000L is presumably the selector's connection idle timeout in ms - confirm against Selector.
        this.selector = new Selector(5000L, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
        this.setName("echoserver");
        this.setDaemon(true);
        this.acceptorThread = new AcceptorThread();
    }

    /**
     * @return the local port the server socket was bound to
     */
    public int port() {
        return this.port;
    }

    /**
     * Server loop: starts the acceptor thread, then repeatedly polls the selector,
     * registers newly accepted connections, flushes queued echo responses and
     * echoes every completed receive, until the server socket is closed.
     */
    @Override
    public void run() {
        try {
            this.acceptorThread.start();
            while (this.serverSocketChannel.isOpen()) {
                this.selector.poll(1000L);
                this.registerNewChannels();
                this.flushQueuedSends();
                this.echoCompletedReceives();
            }
        }
        catch (IOException ignored) {
            // Deliberate best-effort: an I/O failure (e.g. during shutdown) simply ends the server loop.
        }
    }

    /**
     * Registers every connection handed over by the acceptor thread with the selector.
     *
     * <p>The iteration and the subsequent {@code clear()} run under the list's own
     * monitor so that a concurrent {@code add} from the acceptor thread can neither
     * break the iteration nor be lost between the loop and the clear.
     */
    private void registerNewChannels() throws IOException {
        synchronized (this.newChannels) {
            for (SocketChannel socketChannel : this.newChannels) {
                this.selector.register(this.id(socketChannel), socketChannel);
                this.socketChannels.add(socketChannel);
            }
            this.newChannels.clear();
        }
    }

    /**
     * Sends queued echo responses in FIFO order, stopping at the first one whose
     * channel still has a send in progress. Responses whose channel is no longer
     * known to the selector are dropped.
     */
    private void flushQueuedSends() {
        NetworkSend queued;
        // Only this thread removes from the queue, so the element returned by peek() is the one poll() removes.
        while ((queued = this.inflightSends.peek()) != null) {
            String destination = queued.destination();
            if (this.selector.channel(destination) == null) {
                // Connection is gone; there is nobody left to echo to.
                this.inflightSends.poll();
                continue;
            }
            if (this.selector.channel(destination).hasSend()) {
                break;
            }
            this.selector.send(this.inflightSends.poll());
        }
    }

    /**
     * Echoes each completed receive back to its source. If the source channel is
     * busy with another send the response is queued; if the channel is no longer
     * known to the selector the response is dropped.
     */
    private void echoCompletedReceives() {
        for (NetworkReceive rcv : this.selector.completedReceives()) {
            NetworkSend echo = new NetworkSend(rcv.source(), new ByteBuffer[]{rcv.payload()});
            String destination = echo.destination();
            if (this.selector.channel(destination) == null) {
                continue;
            }
            if (this.selector.channel(destination).hasSend()) {
                this.inflightSends.add(echo);
            } else {
                this.selector.send(echo);
            }
        }
    }

    /**
     * Builds the selector id for a connection as
     * {@code localAddress:localPort-remoteAddress:remotePort}.
     */
    private String id(SocketChannel channel) {
        return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" + channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
    }

    /**
     * Closes every registered client connection and forgets it.
     *
     * @throws IOException if closing a connection fails
     */
    public void closeConnections() throws IOException {
        // Iterating a synchronizedList requires holding its monitor manually.
        synchronized (this.socketChannels) {
            for (SocketChannel channel : this.socketChannels) {
                channel.close();
            }
            this.socketChannels.clear();
        }
    }

    /**
     * Shuts the server down: closes the server socket (which ends both loops),
     * closes all client connections, waits for the acceptor and server threads
     * to finish, and finally closes the selector.
     *
     * @throws IOException          if closing a socket fails
     * @throws InterruptedException if interrupted while waiting for a thread to finish
     */
    public void close() throws IOException, InterruptedException {
        this.serverSocketChannel.close();
        this.closeConnections();
        this.acceptorThread.interrupt();
        this.acceptorThread.join();
        this.interrupt();
        this.join();
        // Both threads have finished, so nothing else is using the selector any more.
        this.selector.close();
    }

    /**
     * Accepts incoming connections on the server socket and hands them to the
     * enclosing server through {@code newChannels}, waking up its selector.
     */
    private class AcceptorThread
    extends Thread {
        public AcceptorThread() throws IOException {
            this.setName("acceptor");
        }

        /**
         * Accept loop: runs until the server socket is closed. The NIO selector
         * used for accepting is always closed on exit.
         */
        @Override
        public void run() {
            java.nio.channels.Selector acceptSelector = null;
            try {
                acceptSelector = java.nio.channels.Selector.open();
                NioEchoServer.this.serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
                while (NioEchoServer.this.serverSocketChannel.isOpen()) {
                    if (acceptSelector.select(1000L) <= 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
                            // A non-blocking accept() returns null when no connection is pending.
                            if (socketChannel != null) {
                                socketChannel.configureBlocking(false);
                                NioEchoServer.this.newChannels.add(socketChannel);
                                NioEchoServer.this.selector.wakeup();
                            }
                        }
                        it.remove();
                    }
                }
            }
            catch (IOException ignored) {
                // Deliberate best-effort: an I/O failure (e.g. the server socket being closed) ends the accept loop.
            }
            finally {
                if (acceptSelector != null) {
                    try {
                        acceptSelector.close();
                    }
                    catch (IOException ignored) {
                        // Nothing useful can be done if releasing the selector fails.
                    }
                }
            }
        }
    }
}

