/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Cache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.shade.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.pulsar.shade.io.netty.handler.flow.FlowControlHandler;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslContext;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslHandler;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.NettyServerSslContextBuilder;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty {@link ChannelInitializer} that assembles the handler pipeline for every socket
 * channel accepted by the broker.
 *
 * <p>For each channel the pipeline is built in this order: optional TLS handler, a
 * {@code ByteBufPair} encoder, optional proxy-protocol decoder, a length-field frame decoder,
 * a flow-control handler and finally a fresh {@link ServerCnx}.
 *
 * <p>Every created {@link ServerCnx} is remembered in a weak-keyed, weak-valued cache so that a
 * task scheduled on the broker executor can periodically ask each connection to refresh its
 * authentication credentials.
 */
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
    private static final Logger log = LoggerFactory.getLogger(PulsarChannelInitializer.class);

    /** Pipeline name under which the TLS handler is registered. */
    public static final String TLS_HANDLER = "tls";

    /** Default factory; delegates to the public constructor. */
    public static final Factory DEFAULT_FACTORY = PulsarChannelInitializer::new;

    /**
     * Bytes added to the configured max message size to obtain the frame decoder's maximum frame
     * length. NOTE(review): presumably headroom for command/metadata framing - confirm.
     */
    private static final int FRAME_SIZE_PADDING_BYTES = 10240;

    private final PulsarService pulsar;
    private final String listenerName;
    private final boolean enableTls;
    private final boolean tlsEnabledWithKeyStore;
    private final ServiceConfiguration brokerConf;

    /** Set only when TLS is enabled without a key store; {@code null} otherwise. */
    private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;

    /** Set only when TLS is enabled with a key store; {@code null} otherwise. */
    private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;

    /** Connections created by this initializer, keyed by remote address; keys and values are weakly held. */
    private final Cache<SocketAddress, ServerCnx> connections =
            Caffeine.newBuilder().weakKeys().weakValues().build();

    /**
     * Creates the initializer, prepares the TLS context builder when TLS is requested and
     * schedules the periodic authentication-credential refresh on the broker executor.
     *
     * @param pulsar the broker service supplying configuration and the scheduling executor
     * @param opts   per-listener options (TLS on/off and listener name)
     * @throws Exception if constructing one of the TLS context builders fails
     */
    public PulsarChannelInitializer(PulsarService pulsar, PulsarChannelOptions opts) throws Exception {
        this.pulsar = pulsar;
        this.listenerName = opts.getListenerName();
        this.enableTls = opts.isEnableTLS();
        ServiceConfiguration serviceConfig = pulsar.getConfiguration();
        this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
        if (this.enableTls) {
            if (this.tlsEnabledWithKeyStore) {
                this.nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
                        serviceConfig.getTlsProvider(),
                        serviceConfig.getTlsKeyStoreType(),
                        serviceConfig.getTlsKeyStore(),
                        serviceConfig.getTlsKeyStorePassword(),
                        serviceConfig.isTlsAllowInsecureConnection(),
                        serviceConfig.getTlsTrustStoreType(),
                        serviceConfig.getTlsTrustStore(),
                        serviceConfig.getTlsTrustStorePassword(),
                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
                        serviceConfig.getTlsCiphers(),
                        serviceConfig.getTlsProtocols(),
                        serviceConfig.getTlsCertRefreshCheckDurationSec());
            } else {
                this.sslCtxRefresher = new NettyServerSslContextBuilder(
                        serviceConfig.isTlsAllowInsecureConnection(),
                        serviceConfig.getTlsTrustCertsFilePath(),
                        serviceConfig.getTlsCertificateFilePath(),
                        serviceConfig.getTlsKeyFilePath(),
                        serviceConfig.getTlsCiphers(),
                        serviceConfig.getTlsProtocols(),
                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
                        serviceConfig.getTlsCertRefreshCheckDurationSec());
            }
        }
        // Without TLS both context builders keep their default null value.
        this.brokerConf = pulsar.getConfiguration();

        // The same configured interval is used as both the initial delay and the period.
        long refreshIntervalSeconds = pulsar.getConfig().getAuthenticationRefreshCheckSeconds();
        pulsar.getExecutor().scheduleAtFixedRate(
                SafeRunnable.safeRun(this::refreshAuthenticationCredentials),
                refreshIntervalSeconds,
                refreshIntervalSeconds,
                TimeUnit.SECONDS);
    }

    /**
     * Populates the pipeline of a newly accepted channel and records its {@link ServerCnx}
     * in the connection cache under the channel's remote address.
     *
     * @param ch the channel being initialized
     * @throws Exception if a handler cannot be created (for example the TLS context is unavailable)
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        if (this.enableTls) {
            if (this.tlsEnabledWithKeyStore) {
                ch.pipeline().addLast(TLS_HANDLER,
                        new SslHandler(((KeyStoreSSLContext) this.nettySSLContextAutoRefreshBuilder.get())
                                .createSSLEngine()));
            } else {
                ch.pipeline().addLast(TLS_HANDLER, this.sslCtxRefresher.get().newHandler(ch.alloc()));
            }
            // TLS channels use the copying variant of the encoder.
            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
        } else {
            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
        }
        if (this.pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
            ch.pipeline().addLast("optional-proxy-protocol-decoder", new OptionalProxyProtocolDecoder());
        }
        // Frames carry a 4-byte length prefix at offset 0, which is stripped before delivery.
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                this.brokerConf.getMaxMessageSize() + FRAME_SIZE_PADDING_BYTES, 0, 4, 0, 4));
        ch.pipeline().addLast("flowController", new FlowControlHandler());
        ServerCnx cnx = this.newServerCnx(this.pulsar, this.listenerName);
        ch.pipeline().addLast("handler", cnx);
        this.connections.put(ch.remoteAddress(), cnx);
    }

    /**
     * Asks every cached connection to refresh its authentication credentials.
     *
     * <p>A failure on one connection is logged (with its cause) and does not prevent the
     * remaining connections from being processed.
     */
    private void refreshAuthenticationCredentials() {
        this.connections.asMap().values().forEach(cnx -> {
            try {
                cnx.refreshAuthenticationCredentials();
            } catch (Throwable t) {
                // Pass the throwable as the trailing argument so the logger records the stack trace.
                log.warn("[{}] Failed to refresh auth credentials", cnx.clientAddress(), t);
            }
        });
    }

    /**
     * Creates the connection handler installed at the tail of the pipeline.
     * Overridable so tests can substitute their own {@link ServerCnx}.
     *
     * @param pulsar       the broker service
     * @param listenerName the listener name passed to the new connection
     * @return a new {@link ServerCnx}
     * @throws Exception if the connection handler cannot be created
     */
    @VisibleForTesting
    protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName) throws Exception {
        return new ServerCnx(pulsar, listenerName);
    }

    /**
     * Mutable options describing the listener a {@link PulsarChannelInitializer} serves:
     * whether TLS is enabled and the listener name.
     */
    public static class PulsarChannelOptions {
        private boolean enableTLS;
        private String listenerName;

        PulsarChannelOptions(boolean enableTLS, String listenerName) {
            this.enableTLS = enableTLS;
            this.listenerName = listenerName;
        }

        /**
         * @return a new builder with TLS disabled and no listener name
         */
        public static PulsarChannelOptionsBuilder builder() {
            return new PulsarChannelOptionsBuilder();
        }

        /**
         * @return {@code true} if TLS is enabled for the listener
         */
        public boolean isEnableTLS() {
            return this.enableTLS;
        }

        /**
         * @return the listener name, possibly {@code null}
         */
        public String getListenerName() {
            return this.listenerName;
        }

        /**
         * @param enableTLS whether TLS is enabled for the listener
         */
        public void setEnableTLS(boolean enableTLS) {
            this.enableTLS = enableTLS;
        }

        /**
         * @param listenerName the listener name, may be {@code null}
         */
        public void setListenerName(String listenerName) {
            this.listenerName = listenerName;
        }

        /**
         * Two option objects are equal when their TLS flag and listener name match and the
         * other object accepts the comparison through {@link #canEqual(Object)}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PulsarChannelOptions)) {
                return false;
            }
            PulsarChannelOptions other = (PulsarChannelOptions) o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.isEnableTLS() != other.isEnableTLS()) {
                return false;
            }
            String mine = this.getListenerName();
            String theirs = other.getListenerName();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /**
         * Hook that lets subclasses refuse equality with instances of unrelated types.
         *
         * @param other the object on the other side of the comparison
         * @return {@code true} if {@code other} is a {@code PulsarChannelOptions}
         */
        protected boolean canEqual(Object other) {
            return other instanceof PulsarChannelOptions;
        }

        /** Hash derived from the TLS flag and the listener name, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + (this.isEnableTLS() ? 79 : 97);
            String name = this.getListenerName();
            result = result * prime + (name == null ? 43 : name.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "PulsarChannelInitializer.PulsarChannelOptions(enableTLS=" + this.isEnableTLS() + ", listenerName=" + this.getListenerName() + ")";
        }

        /** Fluent builder for {@link PulsarChannelOptions}. */
        public static class PulsarChannelOptionsBuilder {
            private boolean enableTLS;
            private String listenerName;

            PulsarChannelOptionsBuilder() {
            }

            /**
             * @param enableTLS whether TLS is enabled for the listener
             * @return this builder
             */
            public PulsarChannelOptionsBuilder enableTLS(boolean enableTLS) {
                this.enableTLS = enableTLS;
                return this;
            }

            /**
             * @param listenerName the listener name, may be {@code null}
             * @return this builder
             */
            public PulsarChannelOptionsBuilder listenerName(String listenerName) {
                this.listenerName = listenerName;
                return this;
            }

            /**
             * @return a new {@link PulsarChannelOptions} carrying the values set so far
             */
            public PulsarChannelOptions build() {
                return new PulsarChannelOptions(this.enableTLS, this.listenerName);
            }

            @Override
            public String toString() {
                return "PulsarChannelInitializer.PulsarChannelOptions.PulsarChannelOptionsBuilder(enableTLS=" + this.enableTLS + ", listenerName=" + this.listenerName + ")";
            }
        }
    }

    /** Factory abstraction so callers can substitute a custom {@link PulsarChannelInitializer}. */
    @FunctionalInterface
    public static interface Factory {
        /**
         * @param pulsar the broker service
         * @param opts   per-listener options
         * @return a channel initializer for the given listener
         * @throws Exception if the initializer cannot be created
         */
        public PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, PulsarChannelOptions opts) throws Exception;
    }
}

