/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.broker;

import io.moquette.broker.AutoFlushHandler;
import io.moquette.broker.BugSnagErrorsHandler;
import io.moquette.broker.ISslContextCreator;
import io.moquette.broker.MoquetteIdleTimeoutHandler;
import io.moquette.broker.NewNettyMQTTHandler;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.metrics.BytesMetrics;
import io.moquette.broker.metrics.BytesMetricsCollector;
import io.moquette.broker.metrics.BytesMetricsHandler;
import io.moquette.broker.metrics.DropWizardMetricsHandler;
import io.moquette.broker.metrics.MQTTMessageLogger;
import io.moquette.broker.metrics.MessageMetrics;
import io.moquette.broker.metrics.MessageMetricsCollector;
import io.moquette.broker.metrics.MessageMetricsHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NewNettyAcceptor {
    private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
    public static final String PLAIN_MQTT_PROTO = "TCP MQTT";
    public static final String SSL_MQTT_PROTO = "SSL MQTT";
    private static final Logger LOG = LoggerFactory.getLogger(NewNettyAcceptor.class);
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private final Map<String, Integer> ports = new HashMap<String, Integer>();
    private BytesMetricsCollector bytesMetricsCollector = new BytesMetricsCollector();
    private MessageMetricsCollector metricsCollector = new MessageMetricsCollector();
    private Optional<? extends ChannelInboundHandler> metrics;
    private Optional<? extends ChannelInboundHandler> errorsCather;
    private int nettySoBacklog;
    private boolean nettySoReuseaddr;
    private boolean nettyTcpNodelay;
    private boolean nettySoKeepalive;
    private int nettyChannelTimeoutSeconds;
    private int maxBytesInMessage;
    private Class<? extends ServerSocketChannel> channelClass;

    NewNettyAcceptor() {
    }

    public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslContextCreator sslCtxCreator) {
        LOG.debug("Initializing Netty acceptor");
        this.nettySoBacklog = props.intProp("netty.so_backlog", 128);
        this.nettySoReuseaddr = props.boolProp("netty.so_reuseaddr", true);
        this.nettyTcpNodelay = props.boolProp("netty.tcp_nodelay", true);
        this.nettySoKeepalive = props.boolProp("netty.so_keepalive", true);
        this.nettyChannelTimeoutSeconds = props.intProp("netty.channel_timeout.seconds", 10);
        this.maxBytesInMessage = props.intProp("netty.mqtt.message_size", 8092);
        boolean epoll = props.boolProp("netty.epoll", false);
        if (epoll) {
            LOG.info("Netty is using Epoll");
            this.bossGroup = new EpollEventLoopGroup();
            this.workerGroup = new EpollEventLoopGroup();
            this.channelClass = EpollServerSocketChannel.class;
        } else {
            LOG.info("Netty is using NIO");
            this.bossGroup = new NioEventLoopGroup();
            this.workerGroup = new NioEventLoopGroup();
            this.channelClass = NioServerSocketChannel.class;
        }
        boolean useFineMetrics = props.boolProp("use_metrics", false);
        if (useFineMetrics) {
            DropWizardMetricsHandler metricsHandler = new DropWizardMetricsHandler();
            metricsHandler.init(props);
            this.metrics = Optional.of(metricsHandler);
        } else {
            this.metrics = Optional.empty();
        }
        boolean useBugSnag = props.boolProp("use_bugsnag", false);
        if (useBugSnag) {
            BugSnagErrorsHandler bugSnagHandler = new BugSnagErrorsHandler();
            bugSnagHandler.init(props);
            this.errorsCather = Optional.of(bugSnagHandler);
        } else {
            this.errorsCather = Optional.empty();
        }
        this.initializePlainTCPTransport(mqttHandler, props);
        this.initializeWebSocketTransport(mqttHandler, props);
        if (this.securityPortsConfigured(props)) {
            SslContext sslContext = sslCtxCreator.initSSLContext();
            if (sslContext == null) {
                LOG.error("Can't initialize SSLHandler layer! Exiting, check your configuration of jks");
                return;
            }
            this.initializeSSLTCPTransport(mqttHandler, props, sslContext);
            this.initializeWSSTransport(mqttHandler, props, sslContext);
        }
    }

    private boolean securityPortsConfigured(IConfig props) {
        String sslTcpPortProp = props.getProperty("ssl_port");
        String wssPortProp = props.getProperty("secure_websocket_port");
        return sslTcpPortProp != null || wssPortProp != null;
    }

    private void initFactory(String host, int port, String protocol, final PipelineInitializer pipelieInitializer) {
        LOG.debug("Initializing integration. Protocol={}", (Object)protocol);
        ServerBootstrap b = new ServerBootstrap();
        ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)b.group(this.bossGroup, this.workerGroup).channel(this.channelClass)).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) throws Exception {
                pipelieInitializer.init(ch);
            }
        }).option(ChannelOption.SO_BACKLOG, (Object)this.nettySoBacklog)).option(ChannelOption.SO_REUSEADDR, (Object)this.nettySoReuseaddr)).childOption(ChannelOption.TCP_NODELAY, (Object)this.nettyTcpNodelay).childOption(ChannelOption.SO_KEEPALIVE, (Object)this.nettySoKeepalive);
        try {
            LOG.debug("Binding integration. host={}, port={}", (Object)host, (Object)port);
            ChannelFuture f = b.bind(host, port);
            LOG.info("Server bound to host={}, port={}, protocol={}", new Object[]{host, port, protocol});
            f.sync().addListener((GenericFutureListener)new LocalPortReaderFutureListener(protocol)).addListener((GenericFutureListener)ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        }
        catch (Exception ex) {
            if (ex instanceof BindException) {
                LOG.error("Cannot bind to port: " + port, (Throwable)ex);
                throw new RuntimeException("Cannot bind to port: " + port, ex);
            }
            LOG.error("An interruptedException was caught while initializing integration. Protocol={}", (Object)protocol, (Object)ex);
            throw new RuntimeException(ex);
        }
    }

    public int getPort() {
        return this.ports.computeIfAbsent(PLAIN_MQTT_PROTO, i -> 0);
    }

    public int getSslPort() {
        return this.ports.computeIfAbsent(SSL_MQTT_PROTO, i -> 0);
    }

    private void initializePlainTCPTransport(final NewNettyMQTTHandler handler, IConfig props) {
        LOG.debug("Configuring TCP MQTT transport");
        final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
        String host = props.getProperty("host");
        String tcpPortProp = props.getProperty("port", "disabled");
        if ("disabled".equals(tcpPortProp)) {
            LOG.info("Property {} has been set to {}. TCP MQTT will be disabled", (Object)"port", (Object)"disabled");
            return;
        }
        int port = Integer.parseInt(tcpPortProp);
        this.initFactory(host, port, PLAIN_MQTT_PROTO, new PipelineInitializer(){

            @Override
            void init(SocketChannel channel) {
                ChannelPipeline pipeline = channel.pipeline();
                NewNettyAcceptor.this.configureMQTTPipeline(pipeline, timeoutHandler, handler);
            }
        });
    }

    private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeoutHandler timeoutHandler, NewNettyMQTTHandler handler) {
        pipeline.addFirst("idleStateHandler", (ChannelHandler)new IdleStateHandler(this.nettyChannelTimeoutSeconds, 0, 0));
        pipeline.addAfter("idleStateHandler", "idleEventHandler", (ChannelHandler)timeoutHandler);
        if (this.errorsCather.isPresent()) {
            pipeline.addLast("bugsnagCatcher", (ChannelHandler)this.errorsCather.get());
        }
        pipeline.addFirst("bytemetrics", (ChannelHandler)new BytesMetricsHandler(this.bytesMetricsCollector));
        pipeline.addLast("autoflush", (ChannelHandler)new AutoFlushHandler(1L, TimeUnit.SECONDS));
        pipeline.addLast("decoder", (ChannelHandler)new MqttDecoder(this.maxBytesInMessage));
        pipeline.addLast("encoder", (ChannelHandler)MqttEncoder.INSTANCE);
        pipeline.addLast("metrics", (ChannelHandler)new MessageMetricsHandler(this.metricsCollector));
        pipeline.addLast("messageLogger", (ChannelHandler)new MQTTMessageLogger());
        if (this.metrics.isPresent()) {
            pipeline.addLast("wizardMetrics", (ChannelHandler)this.metrics.get());
        }
        pipeline.addLast("handler", (ChannelHandler)handler);
    }

    private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, IConfig props) {
        LOG.debug("Configuring Websocket MQTT transport");
        String webSocketPortProp = props.getProperty("websocket_port", "disabled");
        if ("disabled".equals(webSocketPortProp)) {
            LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled", (Object)"websocket_port", (Object)"disabled");
            return;
        }
        int port = Integer.parseInt(webSocketPortProp);
        final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
        String host = props.getProperty("host");
        final String path = props.getProperty("websocket_path", "/mqtt");
        final int maxFrameSize = props.intProp("websocket_max_frame_size", 65536);
        this.initFactory(host, port, "Websocket MQTT", new PipelineInitializer(){

            @Override
            void init(SocketChannel channel) {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast(new ChannelHandler[]{new HttpServerCodec()});
                pipeline.addLast("aggregator", (ChannelHandler)new HttpObjectAggregator(65536));
                pipeline.addLast("webSocketHandler", (ChannelHandler)new WebSocketServerProtocolHandler(path, NewNettyAcceptor.MQTT_SUBPROTOCOL_CSV_LIST, false, maxFrameSize));
                pipeline.addLast("ws2bytebufDecoder", (ChannelHandler)new WebSocketFrameToByteBufDecoder());
                pipeline.addLast("bytebuf2wsEncoder", (ChannelHandler)new ByteBufToWebSocketFrameEncoder());
                NewNettyAcceptor.this.configureMQTTPipeline(pipeline, timeoutHandler, handler);
            }
        });
    }

    private void initializeSSLTCPTransport(final NewNettyMQTTHandler handler, IConfig props, final SslContext sslContext) {
        LOG.debug("Configuring SSL MQTT transport");
        String sslPortProp = props.getProperty("ssl_port", "disabled");
        if ("disabled".equals(sslPortProp)) {
            LOG.info("Property {} has been set to {}. SSL MQTT will be disabled", (Object)"ssl_port", (Object)"disabled");
            return;
        }
        int sslPort = Integer.parseInt(sslPortProp);
        LOG.debug("Starting SSL on port {}", (Object)sslPort);
        final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
        String host = props.getProperty("host");
        String sNeedsClientAuth = props.getProperty("need_client_auth", "false");
        final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth);
        this.initFactory(host, sslPort, SSL_MQTT_PROTO, new PipelineInitializer(){

            @Override
            void init(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("ssl", NewNettyAcceptor.this.createSslHandler(channel, sslContext, needsClientAuth));
                NewNettyAcceptor.this.configureMQTTPipeline(pipeline, timeoutHandler, handler);
            }
        });
    }

    private void initializeWSSTransport(final NewNettyMQTTHandler handler, IConfig props, final SslContext sslContext) {
        LOG.debug("Configuring secure websocket MQTT transport");
        String sslPortProp = props.getProperty("secure_websocket_port", "disabled");
        if ("disabled".equals(sslPortProp)) {
            LOG.info("Property {} has been set to {}. Secure websocket MQTT will be disabled", (Object)"secure_websocket_port", (Object)"disabled");
            return;
        }
        int sslPort = Integer.parseInt(sslPortProp);
        final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
        String host = props.getProperty("host");
        final String path = props.getProperty("websocket_path", "/mqtt");
        final int maxFrameSize = props.intProp("websocket_max_frame_size", 65536);
        String sNeedsClientAuth = props.getProperty("need_client_auth", "false");
        final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth);
        this.initFactory(host, sslPort, "Secure websocket", new PipelineInitializer(){

            @Override
            void init(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("ssl", NewNettyAcceptor.this.createSslHandler(channel, sslContext, needsClientAuth));
                pipeline.addLast("httpEncoder", (ChannelHandler)new HttpResponseEncoder());
                pipeline.addLast("httpDecoder", (ChannelHandler)new HttpRequestDecoder());
                pipeline.addLast("aggregator", (ChannelHandler)new HttpObjectAggregator(65536));
                pipeline.addLast("webSocketHandler", (ChannelHandler)new WebSocketServerProtocolHandler(path, NewNettyAcceptor.MQTT_SUBPROTOCOL_CSV_LIST, false, maxFrameSize));
                pipeline.addLast("ws2bytebufDecoder", (ChannelHandler)new WebSocketFrameToByteBufDecoder());
                pipeline.addLast("bytebuf2wsEncoder", (ChannelHandler)new ByteBufToWebSocketFrameEncoder());
                NewNettyAcceptor.this.configureMQTTPipeline(pipeline, timeoutHandler, handler);
            }
        });
    }

    public void close() {
        LOG.debug("Closing Netty acceptor...");
        if (this.workerGroup == null || this.bossGroup == null) {
            LOG.error("Netty acceptor is not initialized");
            throw new IllegalStateException("Invoked close on an Acceptor that wasn't initialized");
        }
        Future workerWaiter = this.workerGroup.shutdownGracefully();
        Future bossWaiter = this.bossGroup.shutdownGracefully();
        LOG.info("Waiting for worker and boss event loop groups to terminate...");
        try {
            workerWaiter.await(10L, TimeUnit.SECONDS);
            bossWaiter.await(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException iex) {
            LOG.warn("An InterruptedException was caught while waiting for event loops to terminate...");
        }
        if (!this.workerGroup.isTerminated()) {
            LOG.warn("Forcing shutdown of worker event loop...");
            this.workerGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
        }
        if (!this.bossGroup.isTerminated()) {
            LOG.warn("Forcing shutdown of boss event loop...");
            this.bossGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
        }
        MessageMetrics metrics = this.metricsCollector.computeMetrics();
        BytesMetrics bytesMetrics = this.bytesMetricsCollector.computeMetrics();
        LOG.info("Metrics messages[read={}, write={}] bytes[read={}, write={}]", new Object[]{metrics.messagesRead(), metrics.messagesWrote(), bytesMetrics.readBytes(), bytesMetrics.wroteBytes()});
    }

    private ChannelHandler createSslHandler(SocketChannel channel, SslContext sslContext, boolean needsClientAuth) {
        SSLEngine sslEngine = sslContext.newEngine(channel.alloc(), channel.remoteAddress().getHostString(), channel.remoteAddress().getPort());
        sslEngine.setUseClientMode(false);
        if (needsClientAuth) {
            sslEngine.setNeedClientAuth(true);
        }
        return new SslHandler(sslEngine);
    }

    private class LocalPortReaderFutureListener
    implements ChannelFutureListener {
        private String transportName;

        LocalPortReaderFutureListener(String transportName) {
            this.transportName = transportName;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            SocketAddress localAddress;
            if (future.isSuccess() && (localAddress = future.channel().localAddress()) instanceof InetSocketAddress) {
                InetSocketAddress inetAddress = (InetSocketAddress)localAddress;
                LOG.debug("bound {} port: {}", (Object)this.transportName, (Object)inetAddress.getPort());
                int port = inetAddress.getPort();
                NewNettyAcceptor.this.ports.put(this.transportName, port);
            }
        }
    }

    private static abstract class PipelineInitializer {
        private PipelineInitializer() {
        }

        abstract void init(SocketChannel var1) throws Exception;
    }

    static class ByteBufToWebSocketFrameEncoder
    extends MessageToMessageEncoder<ByteBuf> {
        ByteBufToWebSocketFrameEncoder() {
        }

        protected void encode(ChannelHandlerContext chc, ByteBuf bb, List<Object> out) throws Exception {
            BinaryWebSocketFrame result = new BinaryWebSocketFrame();
            result.content().writeBytes(bb);
            out.add(result);
        }
    }

    static class WebSocketFrameToByteBufDecoder
    extends MessageToMessageDecoder<BinaryWebSocketFrame> {
        WebSocketFrameToByteBufDecoder() {
        }

        protected void decode(ChannelHandlerContext chc, BinaryWebSocketFrame frame, List<Object> out) throws Exception {
            ByteBuf bb = frame.content();
            bb.retain();
            out.add(bb);
        }
    }
}

