/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.kop.AdminManager;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManagerSharedState;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.LookupClient;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.TenantContextManager;
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.util.ssl.SslContextFactory;

/**
 * Netty {@link ChannelInitializer} that assembles the pipeline for every accepted socket channel
 * and creates the {@link KafkaRequestHandler} that terminates it.
 *
 * <p>The pipeline built by {@link #initChannel(SocketChannel)} is, in order:
 * <ol>
 *   <li>{@code idleStateHandler} - an {@link IdleStateHandler} configured from
 *       {@link KafkaServiceConfiguration#getConnectionMaxIdleMs()};</li>
 *   <li>{@code tls} - an {@link SslHandler}, only when TLS is enabled;</li>
 *   <li>a shared {@link LengthFieldPrepender} for outbound length prefixes;</li>
 *   <li>{@code frameDecoder} - a {@link LengthFieldBasedFrameDecoder} for inbound frames;</li>
 *   <li>{@code handler} - a new {@link KafkaRequestHandler}.</li>
 * </ol>
 */
public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
    /** Maximum inbound frame length accepted by the frame decoder, in bytes (100 MiB). */
    public static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024;

    /** Size in bytes of the length prefix used on both inbound and outbound frames. */
    private static final int LENGTH_FIELD_LENGTH = 4;

    private final PulsarService pulsarService;
    private final KafkaServiceConfiguration kafkaConfig;
    private final TenantContextManager tenantContextManager;
    private final ReplicaManager replicaManager;
    private final KopBrokerLookupManager kopBrokerLookupManager;
    private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
    private final LookupClient lookupClient;
    private final AdminManager adminManager;
    private final DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private final DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
    private final boolean enableTls;
    private final EndPoint advertisedEndPoint;
    private final boolean skipMessagesWithoutIndex;
    /** SSL context factory; {@code null} when TLS is disabled. */
    private final SslContextFactory.Server sslContextFactory;
    private final RequestStats requestStats;
    private final OrderedScheduler sendResponseScheduler;
    /** Created once and added to every channel's pipeline. */
    private final LengthFieldPrepender lengthFieldPrepender;

    /**
     * Stores the collaborators that are later handed to each {@link KafkaRequestHandler}.
     *
     * <p>When {@code enableTLS} is {@code true} an SSL context factory is created from
     * {@code kafkaConfig} via {@link SSLUtils#createSslContextFactory}; otherwise the factory
     * is left {@code null}.
     *
     * @param pulsarService                passed through to each request handler
     * @param kafkaConfig                  configuration used for idle timeouts and TLS setup,
     *                                     and passed through to each request handler
     * @param tenantContextManager         default tenant context manager used by {@link #newCnx()}
     * @param replicaManager               passed through to each request handler
     * @param kopBrokerLookupManager       passed through to each request handler
     * @param adminManager                 passed through to each request handler
     * @param producePurgatory             passed through to each request handler
     * @param fetchPurgatory               passed through to each request handler
     * @param enableTLS                    whether a TLS handler is installed on each channel
     * @param advertisedEndPoint           passed through to each request handler
     * @param skipMessagesWithoutIndex     passed through to each request handler
     * @param requestStats                 passed through to each request handler
     * @param sendResponseScheduler        passed through to each request handler
     * @param kafkaTopicManagerSharedState passed through to each request handler
     * @param lookupClient                 passed through to each request handler
     */
    public KafkaChannelInitializer(PulsarService pulsarService,
                                   KafkaServiceConfiguration kafkaConfig,
                                   TenantContextManager tenantContextManager,
                                   ReplicaManager replicaManager,
                                   KopBrokerLookupManager kopBrokerLookupManager,
                                   AdminManager adminManager,
                                   DelayedOperationPurgatory<DelayedOperation> producePurgatory,
                                   DelayedOperationPurgatory<DelayedOperation> fetchPurgatory,
                                   boolean enableTLS,
                                   EndPoint advertisedEndPoint,
                                   boolean skipMessagesWithoutIndex,
                                   RequestStats requestStats,
                                   OrderedScheduler sendResponseScheduler,
                                   KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
                                   LookupClient lookupClient) {
        this.pulsarService = pulsarService;
        this.kafkaConfig = kafkaConfig;
        this.tenantContextManager = tenantContextManager;
        this.replicaManager = replicaManager;
        this.kopBrokerLookupManager = kopBrokerLookupManager;
        this.lookupClient = lookupClient;
        this.adminManager = adminManager;
        this.producePurgatory = producePurgatory;
        this.fetchPurgatory = fetchPurgatory;
        this.enableTls = enableTLS;
        this.advertisedEndPoint = advertisedEndPoint;
        this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
        this.requestStats = requestStats;
        // The SSL context factory is only built when TLS is actually enabled.
        this.sslContextFactory = this.enableTls ? SSLUtils.createSslContextFactory(kafkaConfig) : null;
        this.sendResponseScheduler = sendResponseScheduler;
        this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
        this.lengthFieldPrepender = new LengthFieldPrepender(LENGTH_FIELD_LENGTH);
    }

    /**
     * Builds the handler pipeline for a newly registered channel.
     *
     * @param ch the channel being initialized
     * @throws Exception if creating the request handler fails
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // Reader and writer idle timeouts share the same configured value; the all-idle timer is off.
        ch.pipeline().addLast("idleStateHandler",
                new IdleStateHandler(
                        this.kafkaConfig.getConnectionMaxIdleMs(),
                        this.kafkaConfig.getConnectionMaxIdleMs(),
                        0L,
                        TimeUnit.MILLISECONDS));
        if (this.enableTls) {
            // Added ahead of the framing handlers, so framing operates on the decrypted stream.
            ch.pipeline().addLast("tls", new SslHandler(SSLUtils.createSslEngine(this.sslContextFactory)));
        }
        ch.pipeline().addLast(this.lengthFieldPrepender);
        // Length field at offset 0, no length adjustment, and the prefix itself is stripped
        // from the frame passed downstream.
        ch.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, LENGTH_FIELD_LENGTH, 0, LENGTH_FIELD_LENGTH));
        ch.pipeline().addLast("handler", this.newCnx());
    }

    /**
     * Creates a request handler that uses this initializer's own tenant context manager.
     *
     * @return a new {@link KafkaRequestHandler}
     * @throws Exception if the handler cannot be constructed
     */
    @VisibleForTesting
    public KafkaRequestHandler newCnx() throws Exception {
        return this.createRequestHandler(this.tenantContextManager);
    }

    /**
     * Creates a request handler that uses the supplied tenant context manager instead of the
     * one this initializer was constructed with; all other collaborators are unchanged.
     *
     * @param tenantContextManager tenant context manager to hand to the new handler
     * @return a new {@link KafkaRequestHandler}
     * @throws Exception if the handler cannot be constructed
     */
    @VisibleForTesting
    public KafkaRequestHandler newCnx(TenantContextManager tenantContextManager) throws Exception {
        return this.createRequestHandler(tenantContextManager);
    }

    /** Single construction point for request handlers, shared by both {@code newCnx} overloads. */
    private KafkaRequestHandler createRequestHandler(TenantContextManager tenantContextManager)
            throws Exception {
        return new KafkaRequestHandler(
                this.pulsarService,
                this.kafkaConfig,
                tenantContextManager,
                this.replicaManager,
                this.kopBrokerLookupManager,
                this.adminManager,
                this.producePurgatory,
                this.fetchPurgatory,
                this.enableTls,
                this.advertisedEndPoint,
                this.skipMessagesWithoutIndex,
                this.requestStats,
                this.sendResponseScheduler,
                this.kafkaTopicManagerSharedState,
                this.lookupClient);
    }

    /** @return the Pulsar service supplied at construction */
    public PulsarService getPulsarService() {
        return this.pulsarService;
    }

    /** @return the Kafka service configuration supplied at construction */
    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }

    /** @return the default tenant context manager supplied at construction */
    public TenantContextManager getTenantContextManager() {
        return this.tenantContextManager;
    }

    /** @return the broker lookup manager supplied at construction */
    public KopBrokerLookupManager getKopBrokerLookupManager() {
        return this.kopBrokerLookupManager;
    }

    /** @return the topic-manager shared state supplied at construction */
    public KafkaTopicManagerSharedState getKafkaTopicManagerSharedState() {
        return this.kafkaTopicManagerSharedState;
    }

    /** @return {@code true} if a TLS handler is installed on each channel */
    public boolean isEnableTls() {
        return this.enableTls;
    }

    /** @return the advertised end point supplied at construction */
    public EndPoint getAdvertisedEndPoint() {
        return this.advertisedEndPoint;
    }

    /** @return the SSL context factory, or {@code null} when TLS is disabled */
    public SslContextFactory.Server getSslContextFactory() {
        return this.sslContextFactory;
    }

    /** @return the request statistics collector supplied at construction */
    public RequestStats getRequestStats() {
        return this.requestStats;
    }
}

