/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.kop.AdminManager;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManagerSharedState;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.KopEventManager;
import io.streamnative.pulsar.handlers.kop.KopVersion;
import io.streamnative.pulsar.handlers.kop.LookupClient;
import io.streamnative.pulsar.handlers.kop.NamespaceBundleOwnershipListenerImpl;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.SchemaRegistryManager;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.TenantContextManager;
import io.streamnative.pulsar.handlers.kop.TopicOwnershipListener;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.http.HttpChannelInitializer;
import io.streamnative.pulsar.handlers.kop.migration.MigrationManager;
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer;
import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulsar {@link ProtocolHandler} implementation for the {@code kafka} protocol.
 *
 * <p>The handler also implements {@link TenantContextManager}: group and transaction
 * coordinators are kept per tenant and are created on first request (see
 * {@link #getGroupCoordinator(String)} and {@link #getTransactionCoordinator(String)}).
 *
 * <p>Fields are populated in stages by {@code initialize}, {@code start} and
 * {@code newChannelInitializers}; a field is {@code null} until the stage that assigns it
 * has run.
 */
public class KafkaProtocolHandler
implements ProtocolHandler,
TenantContextManager {
    private static final Logger log = LoggerFactory.getLogger(KafkaProtocolHandler.class);
    // Protocol name reported by protocolName() and matched (case-insensitively) by accept().
    public static final String PROTOCOL_NAME = "kafka";
    public static final String TLS_HANDLER = "tls";
    // Created in initialize(): request metrics scoped under "kop_server".
    private RequestStats requestStats;
    // Created in initialize(), started and registered with the broker in start().
    private PrometheusMetricsProvider statsProvider;
    // Created in start().
    private KopBrokerLookupManager kopBrokerLookupManager;
    // Created in start() from the broker's admin client.
    @VisibleForTesting
    private AdminManager adminManager = null;
    // Created in start(); handed to every transaction coordinator.
    private SystemTopicClient txnTopicClient;
    // Both purgatories are created in newChannelInitializers() and shut down in close().
    private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
    // Created in start().
    private LookupClient lookupClient;
    // Result of the most recent successful newChannelInitializers() call.
    @VisibleForTesting
    private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
    // Created in start(); handed to every group coordinator.
    @VisibleForTesting
    protected SystemTopicClient offsetTopicClient;
    // Set in initialize().
    private KafkaServiceConfiguration kafkaConfig;
    // Set in start().
    private BrokerService brokerService;
    // Created in start().
    private KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
    // Created and started in start().
    private KopEventManager kopEventManager;
    // Created in initialize() with the "send-response" thread name.
    private OrderedScheduler sendResponseScheduler;
    // Created in start(); coordinators add their topic ownership listeners to it.
    private NamespaceBundleOwnershipListenerImpl bundleListener;
    // Created in start().
    private SchemaRegistryManager schemaRegistryManager;
    // Created in start().
    private MigrationManager migrationManager;
    // Created in newChannelInitializers().
    private ReplicaManager replicaManager;
    // Per-tenant coordinators, filled lazily through computeIfAbsent.
    private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<String, GroupCoordinator>();
    private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<String, TransactionCoordinator>();

    /**
     * Returns the group coordinator of a tenant, creating and booting it on first access.
     *
     * @param tenant the tenant whose coordinator is requested
     * @return the cached coordinator, or a freshly booted one if none existed yet
     */
    @Override
    public GroupCoordinator getGroupCoordinator(String tenant) {
        return groupCoordinatorsByTenant.computeIfAbsent(
                tenant, tenantName -> createAndBootGroupCoordinator(tenantName));
    }

    /**
     * Exposes the live tenant-to-coordinator map (not a copy).
     *
     * @return the map backing {@link #getGroupCoordinator(String)}
     */
    @VisibleForTesting
    public Map<String, GroupCoordinator> getGroupCoordinators() {
        return groupCoordinatorsByTenant;
    }

    /**
     * Returns the transaction coordinator of a tenant, creating and booting it on first access.
     *
     * @param tenant the tenant whose coordinator is requested
     * @return the cached coordinator, or a freshly booted one if none existed yet
     */
    @Override
    public TransactionCoordinator getTransactionCoordinator(String tenant) {
        return transactionCoordinatorByTenant.computeIfAbsent(
                tenant, tenantName -> createAndBootTransactionCoordinator(tenantName));
    }

    /**
     * @return the replica manager, or {@code null} if {@code newChannelInitializers} has not run yet
     */
    public ReplicaManager getReplicaManager() {
        return replicaManager;
    }

    /**
     * @return the protocol name, {@code "kafka"}
     */
    public String protocolName() {
        return KafkaProtocolHandler.PROTOCOL_NAME;
    }

    /**
     * Tells whether this handler serves the given protocol.
     *
     * @param protocol the protocol name to check; {@code null} yields {@code false}
     * @return {@code true} if the name equals {@code "kafka"}, ignoring case
     */
    public boolean accept(String protocol) {
        final boolean isKafka = KafkaProtocolHandler.PROTOCOL_NAME.equalsIgnoreCase(protocol);
        return isKafka;
    }

    /**
     * Stores the Kafka configuration, validates {@code kopAllowedNamespaces} and creates the
     * metrics provider, request stats and the "send-response" scheduler.
     *
     * @param conf the broker configuration; used directly when it already is a
     *             {@link KafkaServiceConfiguration}, otherwise converted from its properties
     * @throws IllegalArgumentException if an allowed namespace is not of the form
     *                                  {@code tenant/namespace}
     * @throws Exception if the configuration cannot be converted or validated
     */
    public void initialize(ServiceConfiguration conf) throws Exception {
        if (!(conf instanceof KafkaServiceConfiguration)) {
            // Plain broker configuration: rebuild a Kafka configuration from its properties and
            // carry over the two addresses explicitly.
            this.kafkaConfig = (KafkaServiceConfiguration)((Object)ConfigurationUtils.create(conf.getProperties(), KafkaServiceConfiguration.class));
            this.kafkaConfig.setAdvertisedAddress(conf.getAdvertisedAddress());
            this.kafkaConfig.setBindAddress(conf.getBindAddress());
        } else {
            this.kafkaConfig = (KafkaServiceConfiguration) conf;
        }

        for (String allowedNamespace : kafkaConfig.getKopAllowedNamespaces()) {
            final String[] parts = allowedNamespace.split("/");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Invalid namespace '" + allowedNamespace + "' in kopAllowedNamespaces config");
            }
            // Substitute the placeholders with the configured defaults before validating.
            final String tenantPart = parts[0].replace("${tenant}", kafkaConfig.getKafkaTenant());
            final String namespacePart = parts[1].replace("*", kafkaConfig.getKafkaNamespace());
            NamespaceName.validateNamespaceName(tenantPart, namespacePart);
        }

        this.statsProvider = new PrometheusMetricsProvider();
        final StatsLogger rootLogger = statsProvider.getStatsLogger("");
        this.requestStats = new RequestStats(rootLogger.scope("kop_server"));
        this.sendResponseScheduler = (OrderedScheduler) OrderedScheduler.newSchedulerBuilder()
                .name("send-response")
                .numThreads(kafkaConfig.getNumSendKafkaResponseThreads())
                .build();
    }

    /**
     * Returns (and logs) the configured Kafka advertised listeners.
     *
     * @return the value of {@code kafkaAdvertisedListeners} from the Kafka configuration
     */
    public String getProtocolDataToAdvertise() {
        final String advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners();
        log.info("Advertised addresses for the 'kafka' endpoint: {}", advertisedListeners);
        return advertisedListeners;
    }

    /**
     * Wires the handler to the running broker: creates the admin manager, lookup and system
     * topic clients, the broker lookup manager, the bundle ownership listener, the event
     * manager, the metrics provider registration and the schema-registry / migration managers.
     *
     * <p>When {@code kafkaManageSystemNamespaces} is enabled, the group coordinator (and, if
     * transactions are enabled, the transaction coordinator) of the metadata tenant is created
     * eagerly here.
     *
     * @param service the broker service this handler runs in
     * @throws IllegalStateException if the admin client or the broker lookup manager cannot be
     *                               obtained
     */
    public void start(BrokerService service) {
        log.info("Starting KafkaProtocolHandler, kop version is: '{}'", (Object)KopVersion.getVersion());
        log.info("Git Revision {}", (Object)KopVersion.getGitSha());
        log.info("Built by {} on {} at {}", new Object[]{KopVersion.getBuildUser(), KopVersion.getBuildHost(), KopVersion.getBuildTime()});
        this.brokerService = service;
        this.kafkaTopicManagerSharedState = new KafkaTopicManagerSharedState(this.brokerService);
        try {
            PulsarAdmin pulsarAdmin = this.brokerService.getPulsar().getAdminClient();
            this.adminManager = new AdminManager(pulsarAdmin, this.kafkaConfig);
        }
        catch (PulsarServerException e) {
            log.error("Failed to get pulsarAdmin", (Throwable)e);
            throw new IllegalStateException(e);
        }
        this.lookupClient = new LookupClient(this.brokerService.pulsar(), this.kafkaConfig);
        // Two separate system topic clients: one for offset topics, one for transaction topics.
        this.offsetTopicClient = new SystemTopicClient(this.brokerService.pulsar(), this.kafkaConfig);
        this.txnTopicClient = new SystemTopicClient(this.brokerService.pulsar(), this.kafkaConfig);
        try {
            this.kopBrokerLookupManager = new KopBrokerLookupManager(this.kafkaConfig, this.brokerService.getPulsar(), this.lookupClient);
        }
        catch (Exception ex) {
            log.error("Failed to get kopBrokerLookupManager", (Throwable)ex);
            throw new IllegalStateException(ex);
        }
        NamespaceService namespaceService = this.brokerService.pulsar().getNamespaceService();
        this.bundleListener = new NamespaceBundleOwnershipListenerImpl(namespaceService, this.brokerService.pulsar().getBrokerServiceUrl());
        // First listener: drops cached per-topic state whenever a topic is loaded or unloaded.
        this.bundleListener.addTopicOwnershipListener(new TopicOwnershipListener(){

            @Override
            public void whenLoad(TopicName topicName) {
                this.invalidateBundleCache(topicName);
            }

            @Override
            public void whenUnload(TopicName topicName) {
                this.invalidateBundleCache(topicName);
                this.invalidatePartitionLog(topicName);
            }

            @Override
            public String name() {
                return "CacheInvalidator";
            }

            // De-references the topic in the shared state; for a name without a partition
            // suffix the partition-0 form of the name is de-referenced as well.
            private void invalidateBundleCache(TopicName topicName) {
                KafkaProtocolHandler.this.kafkaTopicManagerSharedState.deReference(topicName.toString());
                if (!topicName.isPartitioned()) {
                    String nonPartitionedTopicName = topicName.getPartition(0).toString();
                    KafkaProtocolHandler.this.kafkaTopicManagerSharedState.deReference(nonPartitionedTopicName);
                }
            }

            // Removes the partition log for the topic, again covering the partition-0 form
            // of a name that carries no partition suffix.
            private void invalidatePartitionLog(TopicName topicName) {
                KafkaProtocolHandler.this.getReplicaManager().removePartitionLog(topicName.toString());
                if (!topicName.isPartitioned()) {
                    KafkaProtocolHandler.this.getReplicaManager().removePartitionLog(topicName.getPartition(0).toString());
                }
            }
        });
        namespaceService.addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{this.bundleListener});
        // Eagerly boot the metadata tenant's group coordinator when KoP manages system namespaces.
        if (this.kafkaConfig.isKafkaManageSystemNamespaces()) {
            this.getGroupCoordinator(this.kafkaConfig.getKafkaMetadataTenant());
        }
        // The event manager receives the live coordinator map, not a snapshot.
        this.kopEventManager = new KopEventManager(this.adminManager, (MetadataStore)this.brokerService.getPulsar().getLocalMetadataStore(), this.requestStats.getStatsLogger(), this.kafkaConfig, this.groupCoordinatorsByTenant);
        this.kopEventManager.start();
        // Same eager boot for the transaction coordinator, only when transactions are enabled.
        if (this.kafkaConfig.isKafkaTransactionCoordinatorEnabled() && this.kafkaConfig.isKafkaManageSystemNamespaces()) {
            this.getTransactionCoordinator(this.kafkaConfig.getKafkaMetadataTenant());
        }
        // Start the metrics provider and expose it through the broker's Prometheus endpoint.
        PropertiesConfiguration conf = new PropertiesConfiguration();
        conf.addProperty("prometheusStatsLatencyRolloverSeconds", (Object)this.kafkaConfig.getKopPrometheusStatsLatencyRolloverSeconds());
        conf.addProperty("cluster", (Object)this.kafkaConfig.getClusterName());
        this.statsProvider.start((Configuration)conf);
        this.brokerService.pulsar().addPrometheusRawMetricsProvider((PrometheusRawMetricsProvider)this.statsProvider);
        this.schemaRegistryManager = new SchemaRegistryManager(this.kafkaConfig, this.brokerService.getPulsar(), this.brokerService.getAuthenticationService());
        this.migrationManager = new MigrationManager(this.kafkaConfig, this.brokerService.getPulsar());
    }

    /**
     * Creates and starts the transaction coordinator of a tenant and registers a topic
     * ownership listener that forwards load/unload events of transaction metadata topics in
     * the tenant's metadata namespace to that coordinator.
     *
     * @param tenant the tenant to create the coordinator for
     * @return the started coordinator
     * @throws IllegalStateException if initialization or listener registration fails
     */
    private TransactionCoordinator createAndBootTransactionCoordinator(String tenant) {
        log.info("createAndBootTransactionCoordinator {}", tenant);
        ClusterData clusterData = ClusterData.builder()
                .serviceUrl(brokerService.getPulsar().getWebServiceAddress())
                .serviceUrlTls(brokerService.getPulsar().getWebServiceAddressTls())
                .brokerServiceUrl(brokerService.getPulsar().getBrokerServiceUrl())
                .brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls())
                .build();
        try {
            final TransactionCoordinator coordinator =
                    initTransactionCoordinator(tenant, brokerService.getPulsar().getAdminClient(), clusterData);
            final String metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
            final NamespaceName tenantMetadataNs = NamespaceName.get(tenant, metadataNamespace);

            TopicOwnershipListener recoveryListener = new TopicOwnershipListener() {

                @Override
                public void whenLoad(TopicName topicName) {
                    if (!KopTopic.isTransactionMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        return;
                    }
                    coordinator.handleTxnImmigration(topicName.getPartitionIndex());
                }

                @Override
                public void whenUnload(TopicName topicName) {
                    if (!KopTopic.isTransactionMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        return;
                    }
                    coordinator.handleTxnEmigration(topicName.getPartitionIndex());
                }

                @Override
                public String name() {
                    return "TransactionStateRecover-" + coordinator.getTopicPartitionName();
                }

                // Only namespaces equal to this tenant's metadata namespace are of interest.
                @Override
                public boolean test(NamespaceName namespaceName) {
                    return namespaceName.equals(tenantMetadataNs);
                }
            };
            bundleListener.addTopicOwnershipListener(recoveryListener);
            return coordinator;
        } catch (Exception e) {
            log.error("Initialized transaction coordinator failed.", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Creates and starts the group coordinator of a tenant.
     *
     * <p>Steps: ensure the offset metadata exists, start the coordinator, register a topic
     * ownership listener that forwards load/unload events of group metadata topics in the
     * tenant's metadata namespace, and finally try to create the Kafka namespace (a failure
     * of that last step is only logged).
     *
     * @param tenant the tenant to create the coordinator for
     * @return the started coordinator
     * @throws IllegalStateException if metadata creation, coordinator start-up or listener
     *                               registration fails
     */
    private GroupCoordinator createAndBootGroupCoordinator(String tenant) {
        log.info("createAndBootGroupCoordinator {}", tenant);
        ClusterData clusterData = ClusterData.builder()
                .serviceUrl(brokerService.getPulsar().getWebServiceAddress())
                .serviceUrlTls(brokerService.getPulsar().getWebServiceAddressTls())
                .brokerServiceUrl(brokerService.getPulsar().getBrokerServiceUrl())
                .brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls())
                .build();

        GroupCoordinator coordinator;
        try {
            MetadataUtils.createOffsetMetadataIfMissing(
                    tenant, brokerService.getPulsar().getAdminClient(), clusterData, kafkaConfig);
            coordinator = startGroupCoordinator(tenant, offsetTopicClient);
            final GroupCoordinator startedCoordinator = coordinator;
            final String metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
            final NamespaceName tenantMetadataNs = NamespaceName.get(tenant, metadataNamespace);

            TopicOwnershipListener offsetTopicListener = new TopicOwnershipListener() {

                @Override
                public void whenLoad(TopicName topicName) {
                    if (!KopTopic.isGroupMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        return;
                    }
                    startedCoordinator.handleGroupImmigration(topicName.getPartitionIndex());
                }

                @Override
                public void whenUnload(TopicName topicName) {
                    if (!KopTopic.isGroupMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        return;
                    }
                    startedCoordinator.handleGroupEmigration(topicName.getPartitionIndex());
                }

                @Override
                public String name() {
                    return "OffsetAndTopicListener-" + startedCoordinator.getGroupManager().getTopicPartitionName();
                }

                // Only namespaces equal to this tenant's metadata namespace are of interest.
                @Override
                public boolean test(NamespaceName namespaceName) {
                    return namespaceName.equals(tenantMetadataNs);
                }
            };
            bundleListener.addTopicOwnershipListener(offsetTopicListener);
        } catch (Exception e) {
            log.error("Failed to create offset metadata", e);
            throw new IllegalStateException(e);
        }

        // Best effort: the coordinator is returned even when the namespace cannot be created.
        try {
            MetadataUtils.createKafkaNamespaceIfMissing(
                    brokerService.getPulsar().getAdminClient(), clusterData, kafkaConfig);
        } catch (Exception e) {
            log.warn("init kafka failed, need to create it manually later", e);
        }
        return coordinator;
    }

    /**
     * Builds the channel initializer for one Kafka listener endpoint, passing along the
     * handler's shared components.
     *
     * @param endPoint the listener endpoint; its TLS flag is forwarded to the initializer
     * @return a new initializer bound to {@code endPoint}
     */
    private KafkaChannelInitializer newKafkaChannelInitializer(EndPoint endPoint) {
        return new KafkaChannelInitializer(
                brokerService.getPulsar(),
                kafkaConfig,
                this,
                replicaManager,
                kopBrokerLookupManager,
                adminManager,
                producePurgatory,
                fetchPurgatory,
                endPoint.isTlsEnabled(),
                endPoint,
                kafkaConfig.isSkipMessagesWithoutIndex(),
                requestStats,
                sendResponseScheduler,
                kafkaTopicManagerSharedState,
                lookupClient);
    }

    /**
     * Creates the produce/fetch purgatories and the replica manager, then builds one channel
     * initializer per configured Kafka listener plus, when their managers provide one, the
     * migration and schema-registry HTTP initializers.
     *
     * <p>Requires {@code initialize} and {@code start} to have run (both the configuration and
     * the broker service must be set).
     *
     * @return an immutable map from bind address to channel initializer, or {@code null} if
     *         building the map failed (the failure is logged); the {@code null} return is kept
     *         for backward compatibility with existing callers
     * @throws IllegalStateException if the configuration or the broker service is not set
     */
    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
        Preconditions.checkState(this.kafkaConfig != null);
        Preconditions.checkState(this.brokerService != null);

        // The explicit type witness keeps the element type through the builder chain.
        this.producePurgatory = DelayedOperationPurgatory.<DelayedOperation>builder()
                .purgatoryName("produce")
                .timeoutTimer(SystemTimer.builder().executorName("produce").build())
                .build();
        this.fetchPurgatory = DelayedOperationPurgatory.<DelayedOperation>builder()
                .purgatoryName("fetch")
                .timeoutTimer(SystemTimer.builder().executorName("fetch").build())
                .build();
        this.replicaManager = new ReplicaManager(this.kafkaConfig, this.requestStats, Time.SYSTEM, (ImmutableMap<String, EntryFilterWithClassLoader>)this.brokerService.getEntryFilters(), this.producePurgatory, this.fetchPurgatory);

        try {
            // Typed builder instead of a raw one, so every put is checked by the compiler.
            ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
                    ImmutableMap.builder();

            EndPoint.parseListeners(this.kafkaConfig.getListeners(), this.kafkaConfig.getKafkaProtocolMap())
                    .forEach((listener, endPoint) ->
                            builder.put(endPoint.getInetAddress(), this.newKafkaChannelInitializer(endPoint)));

            Optional<HttpChannelInitializer> migrationChannelInitializer = this.migrationManager.build();
            migrationChannelInitializer.ifPresent(
                    initializer -> builder.put(this.migrationManager.getAddress(), initializer));

            Optional<SchemaRegistryChannelInitializer> schemaRegistryChannelInitializer =
                    this.schemaRegistryManager.build();
            schemaRegistryChannelInitializer.ifPresent(
                    initializer -> builder.put(this.schemaRegistryManager.getAddress(), initializer));

            this.channelInitializerMap = builder.build();
            return this.channelInitializerMap;
        }
        catch (Exception e) {
            log.error("KafkaProtocolHandler newChannelInitializers failed with ", e);
            return null;
        }
    }

    /**
     * Releases every component this handler created, in the same order as before.
     *
     * <p>Each component is checked for {@code null} first, because the fields are populated
     * in stages by {@code initialize}, {@code start} and {@code newChannelInitializers}.
     * If one of those stages never ran or failed part-way, the unguarded calls used to throw
     * a {@link NullPointerException} and skip the remaining cleanup.
     */
    public void close() {
        if (this.producePurgatory != null) {
            this.producePurgatory.shutdown();
        }
        if (this.fetchPurgatory != null) {
            this.fetchPurgatory.shutdown();
        }
        this.groupCoordinatorsByTenant.values().forEach(GroupCoordinator::shutdown);
        if (this.kopEventManager != null) {
            this.kopEventManager.close();
        }
        if (this.schemaRegistryManager != null) {
            this.schemaRegistryManager.close();
        }
        this.transactionCoordinatorByTenant.values().forEach(TransactionCoordinator::shutdown);
        // Static call: independent of whether an instance was ever created.
        KopBrokerLookupManager.clear();
        if (this.kafkaTopicManagerSharedState != null) {
            this.kafkaTopicManagerSharedState.close();
        }
        if (this.kopBrokerLookupManager != null) {
            this.kopBrokerLookupManager.close();
        }
        if (this.statsProvider != null) {
            this.statsProvider.stop();
        }
        if (this.sendResponseScheduler != null) {
            this.sendResponseScheduler.shutdown();
        }
        if (this.offsetTopicClient != null) {
            this.offsetTopicClient.close();
        }
        if (this.txnTopicClient != null) {
            this.txnTopicClient.close();
        }
        if (this.adminManager != null) {
            this.adminManager.shutdown();
        }
        if (this.lookupClient != null) {
            this.lookupClient.close();
        }
    }

    /**
     * Builds and starts a group coordinator for a tenant, using the partition count of the
     * tenant's {@code __consumer_offsets} topic as reported by the admin client.
     *
     * @param tenant the tenant the coordinator belongs to
     * @param client the system topic client handed to the coordinator
     * @return the started coordinator
     * @throws IllegalStateException if the partition metadata cannot be fetched or the offset
     *                               topic reports zero partitions
     */
    @VisibleForTesting
    protected GroupCoordinator startGroupCoordinator(String tenant, SystemTopicClient client) {
        GroupConfig groupConfig = new GroupConfig(
                kafkaConfig.getGroupMinSessionTimeoutMs(),
                kafkaConfig.getGroupMaxSessionTimeoutMs(),
                kafkaConfig.getGroupInitialRebalanceDelayMs());
        String offsetsTopic = tenant + "/" + kafkaConfig.getKafkaMetadataNamespace() + "/__consumer_offsets";

        int partitionCount;
        try {
            partitionCount = brokerService.getPulsar()
                    .getAdminClient()
                    .topics()
                    .getPartitionedTopicMetadata(offsetsTopic)
                    .partitions;
        } catch (PulsarServerException | PulsarAdminException e) {
            log.error("Failed to get offset topic partition metadata .", e);
            throw new IllegalStateException(e);
        }
        // A partition count of zero is rejected: the offset topic must be partitioned.
        if (partitionCount == 0) {
            log.error("Offset topic should not be a non-partitioned topic.");
            throw new IllegalStateException("Offset topic should not be a non-partitioned topic.");
        }

        String metadataNamespacePrefix = MetadataUtils.constructMetadataNamespace(tenant, kafkaConfig);
        OffsetConfig offsetConfig = OffsetConfig.builder()
                .offsetsTopicName(offsetsTopic)
                .offsetsTopicNumPartitions(partitionCount)
                .offsetsTopicCompressionType(CompressionType.valueOf(kafkaConfig.getOffsetsTopicCompressionCodec()))
                .maxMetadataSize(kafkaConfig.getOffsetMetadataMaxSize())
                .offsetsRetentionCheckIntervalMs(kafkaConfig.getOffsetsRetentionCheckIntervalMs())
                .offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
                .build();

        GroupCoordinator coordinator = GroupCoordinator.of(
                tenant,
                client,
                groupConfig,
                offsetConfig,
                metadataNamespacePrefix,
                SystemTimer.builder().executorName("group-coordinator-timer").build(),
                Time.SYSTEM);
        coordinator.startup(true);
        return coordinator;
    }

    /**
     * Builds the transaction configuration for a tenant, ensures the transaction metadata
     * exists, then creates the coordinator and waits for its start-up to complete.
     *
     * @param tenant      the tenant the coordinator belongs to
     * @param pulsarAdmin admin client used to create missing transaction metadata
     * @param clusterData cluster data passed to the metadata creation
     * @return the started coordinator
     * @throws Exception if metadata creation or coordinator start-up fails
     */
    public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
        TransactionConfig txnConfig = TransactionConfig.builder()
                .transactionLogNumPartitions(kafkaConfig.getKafkaTxnLogTopicNumPartitions())
                .transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(tenant, kafkaConfig))
                .transactionProducerIdTopicName(MetadataUtils.constructTxnProducerIdTopicBaseName(tenant, kafkaConfig))
                .abortTimedOutTransactionsIntervalMs(kafkaConfig.getKafkaTxnAbortTimedOutTransactionCleanupIntervalMs())
                .transactionalIdExpirationMs(kafkaConfig.getKafkaTransactionalIdExpirationMs())
                .removeExpiredTransactionalIdsIntervalMs(kafkaConfig.getKafkaTransactionsRemoveExpiredTransactionalIdCleanupIntervalMs())
                .brokerId(kafkaConfig.getKafkaBrokerId())
                .build();

        MetadataUtils.createTxnMetadataIfMissing(tenant, pulsarAdmin, clusterData, kafkaConfig);

        TransactionCoordinator coordinator = TransactionCoordinator.of(
                tenant,
                kafkaConfig,
                txnConfig,
                txnTopicClient,
                brokerService.getPulsar().getLocalMetadataStore(),
                kopBrokerLookupManager,
                // Single-threaded scheduler, named per tenant.
                (ScheduledExecutorService) OrderedScheduler.newSchedulerBuilder()
                        .name("transaction-log-manager-" + tenant)
                        .numThreads(1)
                        .build(),
                Time.SYSTEM);
        // Block until start-up has finished.
        coordinator.startup(kafkaConfig.isKafkaTransactionalIdExpirationEnable()).get();
        return coordinator;
    }

    /**
     * @return the request statistics created in {@code initialize}, or {@code null} before that
     */
    public RequestStats getRequestStats() {
        return requestStats;
    }

    /**
     * @return the broker lookup manager created in {@code start}, or {@code null} before that
     */
    public KopBrokerLookupManager getKopBrokerLookupManager() {
        return kopBrokerLookupManager;
    }

    /**
     * @return the admin manager created in {@code start}, or {@code null} before that
     */
    public AdminManager getAdminManager() {
        return adminManager;
    }

    /**
     * @return the map produced by the last successful {@code newChannelInitializers} call, or
     *         {@code null} if there has been none
     */
    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> getChannelInitializerMap() {
        return channelInitializerMap;
    }

    /**
     * @return the offset system topic client created in {@code start}, or {@code null} before that
     */
    public SystemTopicClient getOffsetTopicClient() {
        return offsetTopicClient;
    }

    /**
     * @return the Kafka configuration set in {@code initialize}, or {@code null} before that
     */
    public KafkaServiceConfiguration getKafkaConfig() {
        return kafkaConfig;
    }

    /**
     * @return the event manager created in {@code start}, or {@code null} before that
     */
    public KopEventManager getKopEventManager() {
        return kopEventManager;
    }
}

