/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.listen.dispatcher;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.ClientAuth;

public class SocketChannelDispatcher<E extends Event<SocketChannel>>
implements AsyncChannelDispatcher {
    private final EventFactory<E> eventFactory;
    private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory;
    private final ByteBufferSource bufferSource;
    private final BlockingQueue<E> events;
    private final ComponentLog logger;
    private final int maxConnections;
    private final int maxThreadPoolSize;
    private final SSLContext sslContext;
    private final ClientAuth clientAuth;
    private final Charset charset;
    private ThreadPoolExecutor executor;
    private volatile boolean stopped = false;
    private Selector selector;
    private final BlockingQueue<SelectionKey> keyQueue;
    private final AtomicInteger currentConnections = new AtomicInteger(0);

    public SocketChannelDispatcher(EventFactory<E> eventFactory, ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, ByteBufferSource bufferSource, BlockingQueue<E> events, ComponentLog logger, int maxConnections, SSLContext sslContext, Charset charset) {
        this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset);
    }

    public SocketChannelDispatcher(EventFactory<E> eventFactory, ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, ByteBufferSource bufferSource, BlockingQueue<E> events, ComponentLog logger, int maxConnections, SSLContext sslContext, ClientAuth clientAuth, Charset charset) {
        this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset);
    }

    public SocketChannelDispatcher(EventFactory<E> eventFactory, ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, ByteBufferSource bufferSource, BlockingQueue<E> events, ComponentLog logger, int maxConnections, int maxThreadPoolSize, SSLContext sslContext, ClientAuth clientAuth, Charset charset) {
        this.eventFactory = eventFactory;
        this.handlerFactory = handlerFactory;
        this.bufferSource = bufferSource;
        this.events = events;
        this.logger = logger;
        this.maxConnections = maxConnections;
        this.maxThreadPoolSize = maxThreadPoolSize;
        this.keyQueue = new LinkedBlockingQueue<SelectionKey>(maxConnections);
        this.sslContext = sslContext;
        this.clientAuth = clientAuth;
        this.charset = charset;
    }

    @Override
    public void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port);
        this.stopped = false;
        this.executor = new ThreadPoolExecutor(this.maxThreadPoolSize, this.maxThreadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-worker-%d").build());
        this.executor.allowCoreThreadTimeOut(true);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        if (maxBufferSize > 0) {
            serverSocketChannel.setOption((SocketOption)StandardSocketOptions.SO_RCVBUF, (Object)maxBufferSize);
            int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < maxBufferSize) {
                this.logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's maximum receive buffer");
            }
        }
        serverSocketChannel.socket().bind(inetSocketAddress);
        this.selector = Selector.open();
        serverSocketChannel.register(this.selector, 16);
    }

    @Override
    public void run() {
        while (!this.stopped) {
            try {
                SelectionKey key;
                int selected = this.selector.select();
                if (selected > 0 && !this.stopped) {
                    Iterator<SelectionKey> selectorKeys = this.selector.selectedKeys().iterator();
                    while (selectorKeys.hasNext() && !this.stopped) {
                        SelectionKey key2 = selectorKeys.next();
                        selectorKeys.remove();
                        if (!key2.isValid()) continue;
                        if (key2.isAcceptable()) {
                            ServerSocketChannel channel = (ServerSocketChannel)key2.channel();
                            SocketChannel socketChannel = channel.accept();
                            socketChannel.setOption((SocketOption)StandardSocketOptions.SO_KEEPALIVE, (Object)true);
                            if (this.currentConnections.incrementAndGet() > this.maxConnections) {
                                this.currentConnections.decrementAndGet();
                                this.logger.warn("Rejecting connection from {} because max connections has been met", new Object[]{socketChannel.getRemoteAddress().toString()});
                                IOUtils.closeQuietly((Closeable)socketChannel);
                                continue;
                            }
                            this.logger.debug("Accepted incoming connection from {}", new Object[]{socketChannel.getRemoteAddress().toString()});
                            socketChannel.configureBlocking(false);
                            SelectionKey readKey = socketChannel.register(this.selector, 1);
                            ByteBuffer buffer = this.bufferSource.acquire();
                            SSLSocketChannel sslSocketChannel = null;
                            if (this.sslContext != null) {
                                SSLEngine sslEngine = this.sslContext.createSSLEngine();
                                sslEngine.setUseClientMode(false);
                                switch (this.clientAuth) {
                                    case REQUIRED: {
                                        sslEngine.setNeedClientAuth(true);
                                        break;
                                    }
                                    case WANT: {
                                        sslEngine.setWantClientAuth(true);
                                        break;
                                    }
                                    case NONE: {
                                        sslEngine.setNeedClientAuth(false);
                                        sslEngine.setWantClientAuth(false);
                                    }
                                }
                                sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
                            }
                            SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslSocketChannel);
                            readKey.attach(attachment);
                            continue;
                        }
                        if (!key2.isReadable()) continue;
                        key2.interestOps(0);
                        ChannelHandler<E, AsyncChannelDispatcher> handler = this.sslContext != null ? this.handlerFactory.createSSLHandler(key2, this, this.charset, this.eventFactory, this.events, this.logger) : this.handlerFactory.createHandler(key2, this, this.charset, this.eventFactory, this.events, this.logger);
                        this.executor.execute(handler);
                    }
                }
                while ((key = (SelectionKey)this.keyQueue.poll()) != null) {
                    key.interestOps(1);
                }
            }
            catch (IOException e) {
                this.logger.error("Error accepting connection from SocketChannel", (Throwable)e);
            }
        }
    }

    @Override
    public int getPort() {
        for (SelectionKey key : this.selector.keys()) {
            SelectableChannel channel;
            if (!key.isValid() || !((channel = key.channel()) instanceof ServerSocketChannel)) continue;
            return ((ServerSocketChannel)channel).socket().getLocalPort();
        }
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        this.stopped = true;
        if (this.selector != null) {
            this.selector.wakeup();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                }
            }
            catch (InterruptedException ie) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        if (this.selector != null) {
            Set<SelectionKey> set = this.selector.keys();
            synchronized (set) {
                for (SelectionKey key : this.selector.keys()) {
                    IOUtils.closeQuietly((Closeable)key.channel());
                }
            }
        }
        IOUtils.closeQuietly((Selector)this.selector);
    }

    @Override
    public void completeConnection(SelectionKey key) {
        SocketChannelAttachment attachment = (SocketChannelAttachment)key.attachment();
        this.bufferSource.release(attachment.getByteBuffer());
        this.currentConnections.decrementAndGet();
    }

    @Override
    public void addBackForSelection(SelectionKey key) {
        this.keyQueue.offer(key);
        this.selector.wakeup();
    }
}

