/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.proxy;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KopVersion;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionFactory;
import io.streamnative.pulsar.handlers.kop.proxy.KafkaProxyChannelInitializer;
import io.streamnative.pulsar.handlers.kop.proxy.KafkaProxyConfiguration;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.pulsar.proxy.extensions.ProxyExtension;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProxyExtension
implements ProxyExtension {
    private static final Logger log = LoggerFactory.getLogger(KafkaProxyExtension.class);
    public static final String ACTIVE_CONNECTIONS_METRICS = "ksn_proxy_active_connections";
    public static final String NEW_CONNECTIONS_METRICS = "ksn_proxy_new_connections";
    public static final String OPS_COUNTER_METRICS = "ksn_proxy_binary_ops";
    public static final String BYTES_COUNTER_METRICS = "ksn_proxy_binary_bytes";
    static final Gauge ACTIVE_CONNECTIONS = (Gauge)Gauge.build((String)"ksn_proxy_active_connections", (String)"Number of connections currently active in the ksn proxy").create().register();
    static final Counter NEW_CONNECTIONS = (Counter)Counter.build((String)"ksn_proxy_new_connections", (String)"Counter of connections being opened in the ksn proxy").create().register();
    static final Counter OPS_COUNTER = (Counter)Counter.build((String)"ksn_proxy_binary_ops", (String)"Counter of ksn proxy operations").create().register();
    static final Counter BYTES_COUNTER = (Counter)Counter.build((String)"ksn_proxy_binary_bytes", (String)"Counter of ksn proxy bytes").create().register();
    private final Cache<TopicPartition, InetSocketAddress> leaderCache = Caffeine.newBuilder().maximumSize(10000L).expireAfterWrite(Duration.ofMinutes(5L)).build();
    private KafkaProxyConfiguration config;
    private ConnectionFactory connectionFactory;
    private KafkaServiceConfiguration sslConfig;
    private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializers;

    @VisibleForTesting
    Map<InetSocketAddress, ChannelInitializer<SocketChannel>> getChannelInitializers() {
        return this.channelInitializers;
    }

    public String extensionName() {
        return "kafka";
    }

    public boolean accept(String extension) {
        return this.extensionName().equals(extension);
    }

    public void initialize(ProxyConfiguration conf) {
        this.config = new KafkaProxyConfiguration(conf.getProperties());
        this.config.setBrokerProxyConnectTimeoutMs(conf.getBrokerProxyConnectTimeoutMs());
        this.sslConfig = (KafkaServiceConfiguration)ConfigurationUtils.create((Properties)conf.getProperties(), KafkaServiceConfiguration.class);
        log.info("KafkaProxyExtension loaded config: {}", (Object)this.config);
    }

    public void start(ProxyService service) {
        log.info("Starting KafkaProxyExtension, kop version is: '{}'", (Object)KopVersion.getVersion());
        log.info("Git Revision {}", (Object)KopVersion.getGitSha());
        log.info("Built by {} on {} at {}", new Object[]{KopVersion.getBuildUser(), KopVersion.getBuildHost(), KopVersion.getBuildTime()});
        this.connectionFactory = new ConnectionFactory(this.config, service.getWorkerGroup(), service.getDnsAddressResolverGroup());
    }

    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
        Map listeners = EndPoint.parseListeners((String)this.config.getKafkaListeners());
        Map advertisedListeners = EndPoint.parseListeners((String)this.config.getKafkaAdvertisedListeners());
        if (!listeners.keySet().equals(advertisedListeners.keySet())) {
            throw new RuntimeException("Listener names do not match between kafkaListeners \"" + this.config.getKafkaListeners() + "\" and kafkaAdvertisedListeners \"" + this.config.getKafkaAdvertisedListeners() + "\"");
        }
        this.channelInitializers = listeners.entrySet().stream().collect(Collectors.toMap(e -> ((EndPoint)e.getValue()).getInetAddress(), e -> {
            EndPoint advertisedEndPoint = (EndPoint)advertisedListeners.get(e.getKey());
            SecurityProtocol securityProtocol = advertisedEndPoint.getSecurityProtocol();
            SslContextFactory.Server sslContextFactory = securityProtocol.equals((Object)SecurityProtocol.SSL) || securityProtocol.equals((Object)SecurityProtocol.SASL_SSL) ? SSLUtils.createSslContextFactory((KafkaServiceConfiguration)this.sslConfig) : null;
            return new KafkaProxyChannelInitializer(advertisedEndPoint, this.connectionFactory, sslContextFactory, this.leaderCache);
        }));
        return this.channelInitializers;
    }

    public void close() {
        log.info("KafkaProxyExtension closed.");
    }
}

