/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

/**
 * Non-blocking echo server built on the Kafka network {@link Selector}.
 *
 * <p>Two threads cooperate: an inner acceptor thread accepts incoming connections on the
 * server socket and queues them in {@code newChannels}; the server thread (this object)
 * registers queued connections with the Kafka selector and, for every completed receive,
 * either sends the payload back to its source or, when an output channel has been set via
 * {@link #outputChannel(WritableByteChannel)}, writes the payload to that channel instead.
 *
 * <p>The server binds to an ephemeral port (see {@link #port()}) and runs as a daemon thread.
 * NOTE(review): the use of {@code MockTime} suggests this is a test utility - confirm.
 */
public class NioEchoServer
extends Thread {
    private final int port;
    private final ServerSocketChannel serverSocketChannel;
    /** Connections accepted by the acceptor thread but not yet registered with the selector. */
    private final List<SocketChannel> newChannels;
    /** Connections that have been registered with the selector. */
    private final List<SocketChannel> socketChannels;
    private final AcceptorThread acceptorThread;
    private final Selector selector;
    /** When non-null, received payloads are written here instead of being echoed back. */
    private volatile WritableByteChannel outputChannel;
    private final CredentialCache credentialCache;

    /**
     * Creates the server and binds it to an ephemeral port on {@code serverHost}. The server
     * does not accept or process connections until {@link #start()} is called.
     *
     * @param listenerName     listener name passed to the default channel builder
     * @param securityProtocol security protocol; for the SASL protocols a SCRAM credential
     *                         cache is created for all SCRAM mechanisms
     * @param config           configuration passed to the default channel builder
     * @param serverHost       host name or address to bind to
     * @param channelBuilder   channel builder to use, or {@code null} to build a default
     *                         server channel builder from the other arguments
     * @throws Exception if the server socket cannot be opened or bound, or the selector
     *                   cannot be created
     */
    public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, String serverHost, ChannelBuilder channelBuilder) throws Exception {
        super("echoserver");
        this.setDaemon(true);
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(false);
        // Port 0 lets the OS pick a free port; the chosen port is read back below.
        this.serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
        this.port = this.serverSocketChannel.socket().getLocalPort();
        this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
        this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
        this.credentialCache = new CredentialCache();
        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
            ScramCredentialUtils.createCache(this.credentialCache, ScramMechanism.mechanismNames());
        }
        ChannelBuilder builder = channelBuilder;
        if (builder == null) {
            builder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, this.credentialCache);
        }
        this.selector = new Selector(5000L, new Metrics(), new MockTime(), "MetricGroup", builder);
        this.acceptorThread = new AcceptorThread();
    }

    /**
     * @return the local port the server socket is bound to
     */
    public int port() {
        return this.port;
    }

    /**
     * @return the credential cache handed to the default channel builder
     */
    public CredentialCache credentialCache() {
        return this.credentialCache;
    }

    /**
     * Server loop: starts the acceptor thread, then polls the selector until the server
     * socket is closed, registering newly accepted connections and echoing completed receives.
     */
    @Override
    public void run() {
        try {
            this.acceptorThread.start();
            while (this.serverSocketChannel.isOpen()) {
                this.selector.poll(1000L);
                this.registerNewChannels();
                this.processCompletedReceives();
                // A channel is muted while its echo is in flight; unmute once the send is done.
                for (Send completed : this.selector.completedSends()) {
                    this.selector.unmute(completed.destination());
                }
            }
        }
        catch (IOException ignored) {
            // Best effort: an I/O failure (typically during shutdown) simply ends the server loop.
        }
    }

    /**
     * Registers every connection queued by the acceptor thread with the selector and moves it
     * to {@code socketChannels}.
     */
    private void registerNewChannels() throws IOException {
        // The synchronized list only guards individual calls. Iteration followed by clear()
        // must hold the list's lock, otherwise a concurrent add from the acceptor thread can
        // fail the iteration or be dropped by clear() before it is ever registered.
        synchronized (this.newChannels) {
            for (SocketChannel socketChannel : this.newChannels) {
                this.selector.register(this.id(socketChannel), socketChannel);
                this.socketChannels.add(socketChannel);
            }
            this.newChannels.clear();
        }
    }

    /**
     * Handles each completed receive: echoes the payload back to its source, or writes it to
     * the configured output channel when one is set.
     */
    private void processCompletedReceives() throws IOException {
        List<NetworkReceive> completedReceives = this.selector.completedReceives();
        for (NetworkReceive rcv : completedReceives) {
            KafkaChannel channel = this.channel(rcv.source());
            channel.mute();
            NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
            // Read the volatile field once so a concurrent reset to null cannot cause an NPE
            // between the null check and the write.
            WritableByteChannel output = this.outputChannel;
            if (output == null) {
                // The channel stays muted until the send completes (see run()).
                this.selector.send(send);
                continue;
            }
            for (ByteBuffer buffer : send.buffers) {
                output.write(buffer);
            }
            channel.unmute();
        }
    }

    /**
     * Builds the selector id for a connection as
     * {@code localAddress:localPort-remoteAddress:remotePort}.
     */
    private String id(SocketChannel channel) {
        return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" + channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
    }

    /**
     * Looks up the channel with the given id, falling back to the selector's closing channels
     * when it is no longer among the active ones.
     */
    private KafkaChannel channel(String id) {
        KafkaChannel channel = this.selector.channel(id);
        return channel == null ? this.selector.closingChannel(id) : channel;
    }

    /**
     * Redirects received payloads to the given channel instead of echoing them back.
     *
     * @param channel destination for received payloads, or {@code null} to echo to the sender
     */
    public void outputChannel(WritableByteChannel channel) {
        this.outputChannel = channel;
    }

    /**
     * @return the Kafka selector used by the server loop
     */
    public Selector selector() {
        return this.selector;
    }

    /**
     * Closes every connection that has been registered with the selector and forgets them.
     * All channels are attempted even if one fails to close.
     *
     * @throws IOException the first failure encountered while closing; later failures are
     *                     attached as suppressed exceptions
     */
    public void closeConnections() throws IOException {
        IOException failure = null;
        // Hold the list's lock: the server thread may be adding channels concurrently.
        synchronized (this.socketChannels) {
            for (SocketChannel channel : this.socketChannels) {
                try {
                    channel.close();
                }
                catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            this.socketChannels.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Shuts the server down: closes the server socket and all registered connections, then
     * interrupts and joins the acceptor thread and the server thread.
     *
     * @throws IOException          if closing the server socket or a connection fails
     * @throws InterruptedException if interrupted while waiting for either thread to finish
     */
    public void close() throws IOException, InterruptedException {
        // Closing the server socket first makes both loops' isOpen() checks fail.
        this.serverSocketChannel.close();
        this.closeConnections();
        this.acceptorThread.interrupt();
        this.acceptorThread.join();
        this.interrupt();
        this.join();
    }

    /**
     * Accepts incoming connections on the server socket and queues them for the server
     * thread, waking up the Kafka selector after each one.
     */
    private class AcceptorThread
    extends Thread {
        public AcceptorThread() throws IOException {
            this.setName("acceptor");
        }

        @Override
        public void run() {
            // try-with-resources ensures the accept selector is closed when the loop ends.
            try (java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open()) {
                NioEchoServer.this.serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
                while (NioEchoServer.this.serverSocketChannel.isOpen()) {
                    if (acceptSelector.select(1000L) <= 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isValid() && key.isAcceptable()) {
                            SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
                            // accept() returns null on a non-blocking channel when no
                            // connection is pending.
                            if (socketChannel != null) {
                                socketChannel.configureBlocking(false);
                                NioEchoServer.this.newChannels.add(socketChannel);
                                NioEchoServer.this.selector.wakeup();
                            }
                        }
                        it.remove();
                    }
                }
            }
            catch (IOException ignored) {
                // Best effort: an I/O failure (typically the server socket being closed during
                // shutdown) simply ends the accept loop.
            }
        }
    }
}

