/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.kop.AdminManager;
import io.streamnative.pulsar.handlers.kop.ApiVersion;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManagerSharedState;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.KopEventManager;
import io.streamnative.pulsar.handlers.kop.ListPair;
import io.streamnative.pulsar.handlers.kop.LookupClient;
import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.TenantContextManager;
import io.streamnative.pulsar.handlers.kop.TopicAndMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import io.streamnative.pulsar.handlers.kop.security.Session;
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
import io.streamnative.pulsar.handlers.kop.security.auth.Resource;
import io.streamnative.pulsar.handlers.kop.security.auth.ResourceType;
import io.streamnative.pulsar.handlers.kop.security.auth.SimpleAclAuthorizer;
import io.streamnative.pulsar.handlers.kop.storage.AppendRecordsContext;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.GroupIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.KafkaRequestUtils;
import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListOffsetRequestV0;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRequestHandler
extends KafkaCommandDecoder {
    private static final Logger log = LoggerFactory.getLogger(KafkaRequestHandler.class);
    private static final int THROTTLE_TIME_MS = 10;
    private static final String POLICY_ROOT = "/admin/policies/";
    private final PulsarService pulsarService;
    private final KafkaTopicManager topicManager;
    private final TenantContextManager tenantContextManager;
    private final ReplicaManager replicaManager;
    private final KopBrokerLookupManager kopBrokerLookupManager;
    private final LookupClient lookupClient;
    private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
    private final String clusterName;
    private final ScheduledExecutorService executor;
    private final PulsarAdmin admin;
    private final MetadataStoreExtended metadataStore;
    private final SaslAuthenticator authenticator;
    private final Authorizer authorizer;
    private final AdminManager adminManager;
    private final Boolean tlsEnabled;
    private final EndPoint advertisedEndPoint;
    private final boolean skipMessagesWithoutIndex;
    private final int defaultNumPartitions;
    public final int maxReadEntriesNum;
    private final int failedAuthenticationDelayMs;
    private final ConcurrentHashMap<String, CompletableFuture<String>> currentConnectedGroup;
    private final ConcurrentSkipListSet<String> currentConnectedClientId;
    private final String groupIdStoredPath;
    private final Set<String> groupIds = new HashSet<String>();
    private final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap = new ConcurrentHashMap<TopicPartition, PendingTopicFutures>();
    private final DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private final DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
    private final long maxPendingBytes;
    private final long resumeThresholdPendingBytes;
    private final AtomicLong pendingBytes = new AtomicLong(0L);
    private volatile boolean autoReadDisabledPublishBufferLimiting = false;

    private String getCurrentTenant() {
        return this.getCurrentTenant(this.kafkaConfig.getKafkaMetadataTenant());
    }

    private String getCurrentTenant(String defaultTenant) {
        if (this.kafkaConfig.isKafkaEnableMultiTenantMetadata() && this.authenticator != null && this.authenticator.session() != null && this.authenticator.session().getPrincipal() != null && this.authenticator.session().getPrincipal().getTenantSpec() != null) {
            String tenantSpec = this.authenticator.session().getPrincipal().getTenantSpec();
            return KafkaRequestHandler.extractTenantFromTenantSpec(tenantSpec);
        }
        if (log.isDebugEnabled()) {
            log.debug("using {} as tenant", (Object)defaultTenant);
        }
        return defaultTenant;
    }

    public String currentNamespacePrefix() {
        String currentTenant = this.getCurrentTenant(this.kafkaConfig.getKafkaTenant());
        return MetadataUtils.constructUserTopicsNamespace(currentTenant, this.kafkaConfig);
    }

    private static String extractTenantFromTenantSpec(String tenantSpec) {
        if (tenantSpec != null && !tenantSpec.isEmpty()) {
            String tenant = tenantSpec;
            if (tenantSpec.contains("/")) {
                tenant = tenantSpec.substring(0, tenantSpec.indexOf(47));
            }
            if (log.isDebugEnabled()) {
                log.debug("using {} as tenant", (Object)tenant);
            }
            return tenant;
        }
        return tenantSpec;
    }

    public GroupCoordinator getGroupCoordinator() {
        return this.tenantContextManager.getGroupCoordinator(this.getCurrentTenant());
    }

    public TransactionCoordinator getTransactionCoordinator() {
        this.throwIfTransactionCoordinatorDisabled();
        return this.tenantContextManager.getTransactionCoordinator(this.getCurrentTenant());
    }

    public KafkaRequestHandler(PulsarService pulsarService, KafkaServiceConfiguration kafkaConfig, TenantContextManager tenantContextManager, ReplicaManager replicaManager, KopBrokerLookupManager kopBrokerLookupManager, AdminManager adminManager, DelayedOperationPurgatory<DelayedOperation> producePurgatory, DelayedOperationPurgatory<DelayedOperation> fetchPurgatory, Boolean tlsEnabled, EndPoint advertisedEndPoint, boolean skipMessagesWithoutIndex, RequestStats requestStats, OrderedScheduler sendResponseScheduler, KafkaTopicManagerSharedState kafkaTopicManagerSharedState, LookupClient lookupClient) throws Exception {
        super(requestStats, kafkaConfig, sendResponseScheduler);
        this.pulsarService = pulsarService;
        this.tenantContextManager = tenantContextManager;
        this.replicaManager = replicaManager;
        this.kopBrokerLookupManager = kopBrokerLookupManager;
        this.lookupClient = lookupClient;
        this.clusterName = kafkaConfig.getClusterName();
        this.executor = pulsarService.getExecutor();
        this.admin = pulsarService.getAdminClient();
        this.metadataStore = pulsarService.getLocalMetadataStore();
        boolean authenticationEnabled = pulsarService.getBrokerService().isAuthenticationEnabled() && !kafkaConfig.getSaslAllowedMechanisms().isEmpty();
        this.authenticator = authenticationEnabled ? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms(), kafkaConfig) : null;
        boolean authorizationEnabled = pulsarService.getBrokerService().isAuthorizationEnabled();
        this.authorizer = authorizationEnabled && authenticationEnabled ? new SimpleAclAuthorizer(pulsarService) : null;
        this.adminManager = adminManager;
        this.producePurgatory = producePurgatory;
        this.fetchPurgatory = fetchPurgatory;
        this.tlsEnabled = tlsEnabled;
        this.advertisedEndPoint = advertisedEndPoint;
        this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
        this.topicManager = new KafkaTopicManager(this);
        this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
        this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
        this.currentConnectedGroup = new ConcurrentHashMap();
        this.currentConnectedClientId = new ConcurrentSkipListSet();
        this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
        this.maxPendingBytes = (long)kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
        this.resumeThresholdPendingBytes = this.maxPendingBytes / 2L;
        this.failedAuthenticationDelayMs = kafkaConfig.getFailedAuthenticationDelayMs();
        this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
        RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.topicManager.setRemoteAddress(ctx.channel().remoteAddress());
        if (this.authenticator != null) {
            this.authenticator.reset();
        }
        RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
        log.info("channel active: {}", (Object)ctx.channel());
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
        this.close();
    }

    @Override
    protected void close() {
        if (this.isActive.getAndSet(false)) {
            super.close();
            this.topicManager.close();
            String clientHost = this.ctx.channel().remoteAddress().toString();
            if (this.currentConnectedGroup.containsKey(clientHost)) {
                log.info("currentConnectedGroup remove {}", (Object)clientHost);
                this.currentConnectedGroup.remove(clientHost);
            }
            if (log.isDebugEnabled()) {
                log.debug("Try to remove all stored groupID on the metadata store. Current connected clientIds: {}", this.currentConnectedClientId);
            }
            if (this.kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) {
                this.currentConnectedClientId.forEach(clientId -> {
                    String path = this.groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);
                    this.metadataStore.delete(path, Optional.empty()).whenComplete((__, ex) -> {
                        if (ex != null) {
                            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                                if (log.isDebugEnabled()) {
                                    log.debug("The groupId store path doesn't exist. Path: [{}]", (Object)path);
                                }
                                return;
                            }
                            log.error("Delete groupId failed. Path: [{}]", (Object)path, ex);
                            return;
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Delete groupId success. Path: [{}]", (Object)path);
                        }
                    });
                });
            }
            RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
        }
    }

    @Override
    protected boolean hasAuthenticated() {
        return this.authenticator == null || this.authenticator.complete();
    }

    @Override
    protected void channelPrepare(ChannelHandlerContext ctx, ByteBuf requestBuf, BiConsumer<Long, Throwable> registerRequestParseLatency, BiConsumer<ApiKeys, Long> registerRequestLatency) throws AuthenticationException {
        if (this.authenticator != null) {
            this.authenticator.authenticate(ctx, requestBuf, registerRequestParseLatency, registerRequestLatency, this::validateTenantAccessForSession);
            if (this.authenticator.complete() && this.kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
                this.setRequestStats(this.requestStats.forTenant(this.getCurrentTenant()));
            }
        }
    }

    @Override
    protected void maybeDelayCloseOnAuthenticationFailure() {
        if (this.failedAuthenticationDelayMs > 0) {
            this.ctx.executor().schedule(this::completeCloseOnAuthenticationFailure, (long)this.failedAuthenticationDelayMs, TimeUnit.MILLISECONDS);
        } else {
            this.completeCloseOnAuthenticationFailure();
        }
    }

    @Override
    protected void completeCloseOnAuthenticationFailure() {
        if (this.isActive.get() && this.authenticator != null) {
            this.authenticator.sendAuthenticationFailureResponse(__ -> this.close());
        }
    }

    @Override
    protected void handleApiVersionsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest apiVersionRequest, CompletableFuture<AbstractResponse> resultFuture) {
        if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersionRequest.getHeader().apiVersion())) {
            ApiVersionsResponse apiResponse = this.overloadDefaultApiVersionsResponse(true);
            resultFuture.complete((AbstractResponse)apiResponse);
        } else {
            ApiVersionsResponse apiResponse = this.overloadDefaultApiVersionsResponse(false);
            resultFuture.complete((AbstractResponse)apiResponse);
        }
    }

    protected ApiVersionsResponse overloadDefaultApiVersionsResponse(boolean unsupportedApiVersion) {
        if (unsupportedApiVersion) {
            return KafkaResponseUtils.newApiVersions(Errors.UNSUPPORTED_VERSION);
        }
        ArrayList<ApiVersion> versionList = new ArrayList<ApiVersion>();
        block3: for (ApiKeys apiKey : ApiKeys.values()) {
            if (apiKey.minRequiredInterBrokerMagic > 2) continue;
            switch (apiKey) {
                case LIST_OFFSETS: {
                    versionList.add(new ApiVersion(2, 0, apiKey.latestVersion()));
                    continue block3;
                }
                default: {
                    versionList.add(new ApiVersion(apiKey));
                }
            }
        }
        return KafkaResponseUtils.newApiVersions(versionList);
    }

    @Override
    protected void handleError(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> resultFuture) {
        String err = String.format("Kafka API (%s) Not supported by kop server.", kafkaHeaderAndRequest.getHeader().apiKey());
        log.error(err);
        AbstractResponse apiResponse = kafkaHeaderAndRequest.getRequest().getErrorResponse((Throwable)new UnsupportedOperationException(err));
        resultFuture.complete(apiResponse);
    }

    @Override
    protected void handleInactive(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> resultFuture) {
        AbstractRequest request = kafkaHeaderAndRequest.getRequest();
        AbstractResponse apiResponse = request.getErrorResponse((Throwable)new LeaderNotAvailableException("Channel is closing!"));
        log.error("Kafka API {} is send to a closing channel", (Object)kafkaHeaderAndRequest.getHeader().apiKey());
        resultFuture.complete(apiResponse);
    }

    private CompletableFuture<Integer> getPartitionedTopicMetadataAsync(String topicName, boolean allowAutoTopicCreation) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        this.admin.topics().getPartitionedTopicMetadataAsync(topicName).whenComplete((metadata, e) -> {
            if (e == null) {
                if (metadata.partitions > 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("Topic {} has {} partitions", (Object)topicName, (Object)metadata.partitions);
                    }
                    future.complete(metadata.partitions);
                } else {
                    future.complete(0);
                }
            } else if (e instanceof PulsarAdminException.NotFoundException) {
                if (allowAutoTopicCreation) {
                    String namespace = TopicName.get((String)topicName).getNamespace();
                    this.admin.namespaces().getPoliciesAsync(namespace).whenComplete((policies, err) -> {
                        if (err != null || policies == null) {
                            log.error("[{}] Cannot get policies for namespace {}", new Object[]{this.ctx.channel(), namespace, err});
                            future.complete(-2);
                        } else {
                            boolean allowed = this.kafkaConfig.isAllowAutoTopicCreation();
                            if (policies.autoTopicCreationOverride != null) {
                                allowed = policies.autoTopicCreationOverride.isAllowAutoTopicCreation();
                            }
                            if (!allowed) {
                                log.error("[{}] Topic {} doesn't exist and it's not allowed to auto create partitioned topic", (Object)this.ctx.channel(), (Object)topicName);
                                future.complete(-2);
                            } else {
                                log.info("[{}] Topic {} doesn't exist, auto create it with {} partitions", new Object[]{this.ctx.channel(), topicName, this.defaultNumPartitions});
                                this.admin.topics().createPartitionedTopicAsync(topicName, this.defaultNumPartitions).whenComplete((__, createException) -> {
                                    if (createException == null) {
                                        future.complete(this.defaultNumPartitions);
                                    } else {
                                        log.warn("[{}] Failed to create partitioned topic {}: {}", new Object[]{this.ctx.channel(), topicName, createException.getMessage()});
                                        future.complete(-2);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    log.error("[{}] Topic {} doesn't exist and it's not allowed to auto create partitioned topic", new Object[]{this.ctx.channel(), topicName, e});
                    future.complete(-2);
                }
            } else {
                log.error("[{}] Failed to get partitioned topic {}", new Object[]{this.ctx.channel(), topicName, e});
                future.complete(-2);
            }
        });
        return future;
    }

    private CompletableFuture<Set<String>> expandAllowedNamespaces(Set<String> allowedNamespaces) {
        String currentTenant = this.getCurrentTenant(this.kafkaConfig.getKafkaTenant());
        return KafkaRequestHandler.expandAllowedNamespaces(allowedNamespaces, currentTenant, this.pulsarService);
    }

    @VisibleForTesting
    static CompletableFuture<Set<String>> expandAllowedNamespaces(Set<String> allowedNamespaces, String currentTenant, PulsarService pulsarService) {
        CopyOnWriteArraySet<String> result = new CopyOnWriteArraySet<String>();
        ArrayList<CompletionStage> results = new ArrayList<CompletionStage>();
        for (String namespaceTemplate : allowedNamespaces) {
            String namespace = namespaceTemplate.replace("${tenant}", currentTenant);
            if (!namespace.endsWith("/*")) {
                result.add(namespace);
                results.add(CompletableFuture.completedFuture(namespace));
                continue;
            }
            int slash = namespace.lastIndexOf(47);
            String tenant = namespace.substring(0, slash);
            results.add(pulsarService.getPulsarResources().getNamespaceResources().listNamespacesAsync(tenant).thenAccept(namespaces -> namespaces.forEach(ns -> result.add(tenant + "/" + ns))));
        }
        return CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).thenApply(f -> result);
    }

    private List<TopicAndMetadata> analyzeFullTopicNames(Stream<String> fullTopicNames) {
        HashMap<String, List> topicToPartitionIndexes = new HashMap<String, List>();
        fullTopicNames.forEach(fullTopicName -> {
            TopicName topicName = TopicName.get((String)fullTopicName);
            if (topicName.getLocalName().startsWith("__change_events") && topicName.getPartitionedTopicName().endsWith("__change_events")) {
                return;
            }
            topicToPartitionIndexes.computeIfAbsent(topicName.getPartitionedTopicName(), ignored -> new ArrayList()).add(topicName.getPartitionIndex());
        });
        if (topicToPartitionIndexes.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<TopicAndMetadata> topicAndMetadataList = new ArrayList<TopicAndMetadata>();
        topicToPartitionIndexes.forEach((topic, partitionIndexes) -> {
            Collections.sort(partitionIndexes);
            int lastIndex = (Integer)partitionIndexes.get(partitionIndexes.size() - 1);
            if (lastIndex < 0) {
                topicAndMetadataList.add(new TopicAndMetadata((String)topic, 0));
            } else if (lastIndex == partitionIndexes.size() - 1) {
                topicAndMetadataList.add(new TopicAndMetadata((String)topic, partitionIndexes.size()));
            } else {
                log.warn("The partitions of topic {} is wrong ({}), try to create missed partitions", topic, (Object)partitionIndexes.size());
                this.admin.topics().createMissedPartitionsAsync(topic);
            }
        });
        return topicAndMetadataList;
    }

    private CompletableFuture<List<String>> authorizeNamespacesAsync(Collection<String> namespaces, AclOperation aclOperation) {
        Map<String, CompletableFuture> futureMap = namespaces.stream().collect(Collectors.toMap(namespace -> namespace, namespace -> this.authorize(aclOperation, Resource.of(ResourceType.NAMESPACE, namespace))));
        return CoreUtils.waitForAll(futureMap.values()).thenApply(__ -> futureMap.entrySet().stream().filter(e -> {
            if (!((Boolean)((CompletableFuture)e.getValue()).join()).booleanValue()) {
                log.warn("Failed to authorize {} for ACL operation {}", e.getKey(), (Object)aclOperation);
                return false;
            }
            return true;
        }).map(Map.Entry::getKey).collect(Collectors.toList()));
    }

    private CompletableFuture<Stream<String>> listAllTopicsFromNamespacesAsync(List<String> namespaces) {
        return CoreUtils.waitForAll(namespaces.stream().map(namespace -> this.pulsarService.getNamespaceService().getListOfPersistentTopics(NamespaceName.get((String)namespace))).collect(Collectors.toList()), topics -> topics.stream().flatMap(Collection::stream));
    }

    private CompletableFuture<ListPair<String>> authorizeTopicsAsync(Collection<String> topics, AclOperation aclOperation) {
        Map<String, CompletableFuture> futureMap = topics.stream().collect(Collectors.toMap(topic -> topic, topic -> this.authorize(aclOperation, Resource.of(ResourceType.TOPIC, topic))));
        return CoreUtils.waitForAll(futureMap.values()).thenApply(__ -> ListPair.of(futureMap.entrySet().stream().collect(Collectors.groupingBy(e -> (Boolean)((CompletableFuture)e.getValue()).join()))).map(Map.Entry::getKey));
    }

    private CompletableFuture<List<TopicAndMetadata>> findTopicMetadata(ListPair<String> listPair, boolean allowTopicAutoCreation) {
        Map<String, CompletableFuture> futureMap = CoreUtils.listToMap(listPair.getSuccessfulList(), topic -> this.getPartitionedTopicMetadataAsync((String)topic, allowTopicAutoCreation));
        return ((CompletableFuture)CoreUtils.waitForAll(futureMap.values()).thenApply(__ -> CoreUtils.mapToList(futureMap, (key, value) -> new TopicAndMetadata((String)key, (Integer)value.join())))).thenApply(authorizedTopicAndMetadataList -> ListUtils.union((List)authorizedTopicAndMetadataList, CoreUtils.listToList(listPair.getFailedList(), topic -> new TopicAndMetadata((String)topic, -1))));
    }

    private CompletableFuture<List<TopicAndMetadata>> getTopicsAsync(MetadataRequest request, Set<String> fullTopicNames) {
        if (request.topics() == null || request.topics().isEmpty() && request.version() == 0) {
            KopBrokerLookupManager.clear();
            return ((CompletableFuture)((CompletableFuture)this.expandAllowedNamespaces(this.kafkaConfig.getKopAllowedNamespaces()).thenCompose(namespaces -> this.authorizeNamespacesAsync((Collection<String>)namespaces, AclOperation.DESCRIBE))).thenCompose(this::listAllTopicsFromNamespacesAsync)).thenApply(this::analyzeFullTopicNames);
        }
        return this.authorizeTopicsAsync(fullTopicNames, AclOperation.DESCRIBE).thenCompose(authorizedTopicsPair -> this.findTopicMetadata((ListPair<String>)authorizedTopicsPair, request.allowAutoTopicCreation()));
    }

    @Override
    protected void handleTopicMetadataRequest(KafkaCommandDecoder.KafkaHeaderAndRequest metadataHar, CompletableFuture<AbstractResponse> resultFuture) {
        List<? extends Node> allNodes = Collections.synchronizedList(new ArrayList<Node>(this.adminManager.getBrokers(this.advertisedEndPoint.getListenerName())));
        int controllerId = this.adminManager.getControllerId(this.advertisedEndPoint.getListenerName());
        String namespacePrefix = this.currentNamespacePrefix();
        MetadataRequest request = (MetadataRequest)metadataHar.getRequest();
        Map fullTopicNameToOriginal = request.topics() == null ? Collections.emptyMap() : request.topics().stream().distinct().collect(Collectors.toMap(topic -> new KopTopic((String)topic, namespacePrefix).getFullName(), topic -> topic));
        Function<String, String> getOriginalTopic = fullTopicName -> fullTopicNameToOriginal.isEmpty() ? KopTopic.removeDefaultNamespacePrefix(fullTopicName, namespacePrefix) : fullTopicNameToOriginal.getOrDefault(fullTopicName, fullTopicName);
        String metadataNamespace = this.kafkaConfig.getKafkaMetadataNamespace();
        this.getTopicsAsync(request, fullTopicNameToOriginal.keySet()).whenComplete((topicAndMetadataList, e) -> {
            if (e != null) {
                log.error("[{}] Request {}: Exception fetching metadata", new Object[]{this.ctx.channel(), metadataHar.getHeader(), e});
                resultFuture.completeExceptionally((Throwable)e);
                return;
            }
            ListPair<TopicAndMetadata> listPair = ListPair.split(topicAndMetadataList.stream(), TopicAndMetadata::hasNoError);
            CoreUtils.waitForAll(listPair.getSuccessfulList().stream().map(topicAndMetadata -> topicAndMetadata.lookupAsync(this::lookup, getOriginalTopic, metadataNamespace)).collect(Collectors.toList()), successfulTopicMetadataList -> {
                List topicMetadataList = ListUtils.union((List)successfulTopicMetadataList, CoreUtils.listToList(listPair.getFailedList(), metadata -> metadata.toTopicMetadata(getOriginalTopic, metadataNamespace)));
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newMetadata(allNodes, this.clusterName, controllerId, topicMetadataList, request.version()));
                return null;
            }).exceptionally(lookupException -> {
                log.error("[{}] Unexpected exception during lookup", (Object)this.ctx.channel(), lookupException);
                resultFuture.completeExceptionally((Throwable)lookupException);
                return null;
            });
        });
    }

    private void disableCnxAutoRead() {
        if (this.ctx != null && this.ctx.channel().config().isAutoRead()) {
            this.ctx.channel().config().setAutoRead(false);
            if (log.isDebugEnabled()) {
                log.debug("[{}] disable auto read", (Object)this.ctx.channel());
            }
        }
    }

    private void enableCnxAutoRead() {
        if (this.ctx != null && !this.ctx.channel().config().isAutoRead() && !this.autoReadDisabledPublishBufferLimiting) {
            this.ctx.channel().config().setAutoRead(true);
            this.ctx.read();
            if (log.isDebugEnabled()) {
                log.debug("[{}] enable auto read", (Object)this.ctx.channel());
            }
        }
    }

    private void startSendOperationForThrottling(long msgSize) {
        long currentPendingBytes = this.pendingBytes.addAndGet(msgSize);
        if (currentPendingBytes >= this.maxPendingBytes && !this.autoReadDisabledPublishBufferLimiting && this.maxPendingBytes > 0L) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] disable auto read because currentPendingBytes({}) > maxPendingBytes({})", new Object[]{this.ctx.channel(), currentPendingBytes, this.maxPendingBytes});
            }
            this.disableCnxAutoRead();
            this.autoReadDisabledPublishBufferLimiting = true;
            this.pulsarService.getBrokerService().pausedConnections(1);
        }
    }

    private void completeSendOperationForThrottling(long msgSize) {
        long currentPendingBytes = this.pendingBytes.addAndGet(-msgSize);
        if (currentPendingBytes < this.resumeThresholdPendingBytes && this.autoReadDisabledPublishBufferLimiting) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] enable auto read because currentPendingBytes({}) < resumeThreshold({})", new Object[]{this.ctx.channel(), currentPendingBytes, this.resumeThresholdPendingBytes});
            }
            this.autoReadDisabledPublishBufferLimiting = false;
            this.enableCnxAutoRead();
            this.pulsarService.getBrokerService().resumedConnections(1);
        }
    }

    @Override
    protected void handleProduceRequest(KafkaCommandDecoder.KafkaHeaderAndRequest produceHar, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(produceHar.getRequest() instanceof ProduceRequest));
        ProduceRequest produceRequest = (ProduceRequest)produceHar.getRequest();
        int numPartitions = produceRequest.data().topicData().size();
        if (numPartitions == 0) {
            resultFuture.complete((AbstractResponse)new ProduceResponse(Collections.emptyMap()));
            return;
        }
        ConcurrentHashMap unauthorizedTopicResponsesMap = new ConcurrentHashMap();
        HashMap invalidRequestResponses = new HashMap();
        ConcurrentHashMap authorizedRequestInfo = new ConcurrentHashMap();
        int timeoutMs = produceRequest.timeout();
        String namespacePrefix = this.currentNamespacePrefix();
        AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(numPartitions);
        Runnable completeOne = () -> {
            if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
                if (authorizedRequestInfo.isEmpty()) {
                    resultFuture.complete((AbstractResponse)new ProduceResponse(unauthorizedTopicResponsesMap));
                    return;
                }
                AppendRecordsContext appendRecordsContext = AppendRecordsContext.get(this.topicManager, this::startSendOperationForThrottling, this::completeSendOperationForThrottling, this.pendingTopicFuturesMap, this.ctx);
                ReplicaManager replicaManager = this.getReplicaManager();
                replicaManager.appendRecords(timeoutMs, false, namespacePrefix, authorizedRequestInfo, PartitionLog.AppendOrigin.Client, appendRecordsContext).whenComplete((response, ex) -> {
                    appendRecordsContext.recycle();
                    if (ex != null) {
                        resultFuture.completeExceptionally(ex.getCause());
                        return;
                    }
                    HashMap mergedResponse = new HashMap();
                    mergedResponse.putAll(response);
                    mergedResponse.putAll(unauthorizedTopicResponsesMap);
                    mergedResponse.putAll(invalidRequestResponses);
                    resultFuture.complete((AbstractResponse)new ProduceResponse(mergedResponse));
                    response.keySet().forEach(tp -> replicaManager.tryCompleteDelayedFetch(new DelayedOperationKey.TopicPartitionOperationKey((TopicPartition)tp)));
                });
            }
        };
        produceRequest.data().topicData().forEach(topicProduceData -> topicProduceData.partitionData().forEach(partitionProduceData -> {
            MemoryRecords records = (MemoryRecords)partitionProduceData.records();
            int index = partitionProduceData.index();
            String name = topicProduceData.name();
            TopicPartition topicPartition = new TopicPartition(name, index);
            try {
                this.validateRecords(produceHar.getRequest().version(), records);
            }
            catch (ApiException ex2) {
                invalidRequestResponses.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.forException((Throwable)ex2)));
                completeOne.run();
                return;
            }
            String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix);
            this.authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName)).whenCompleteAsync((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("Write topic authorize failed, topic - {}. {}", (Object)fullPartitionName, (Object)ex.getMessage());
                    unauthorizedTopicResponsesMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED));
                    completeOne.run();
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    unauthorizedTopicResponsesMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED));
                    completeOne.run();
                    return;
                }
                authorizedRequestInfo.put(topicPartition, records);
                completeOne.run();
            }, (Executor)this.ctx.executor());
        }));
    }

    private void validateRecords(short version, MemoryRecords records) {
        if (version >= 3) {
            Iterator iterator = records.batches().iterator();
            if (!iterator.hasNext()) {
                throw new InvalidRecordException("Produce requests with version " + version + " must have at least one record batch");
            }
            MutableRecordBatch entry = (MutableRecordBatch)iterator.next();
            if (entry.magic() != 2) {
                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to contain record batches with magic version 2");
            }
            if (iterator.hasNext()) {
                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to contain exactly one record batch");
            }
        }
    }

    @Override
    protected void handleFindCoordinatorRequest(KafkaCommandDecoder.KafkaHeaderAndRequest findCoordinator, CompletableFuture<AbstractResponse> resultFuture) {
        CompletableFuture<Object> storeGroupIdFuture;
        String pulsarTopicName;
        int partition;
        Preconditions.checkArgument((boolean)(findCoordinator.getRequest() instanceof FindCoordinatorRequest));
        FindCoordinatorRequest request = (FindCoordinatorRequest)findCoordinator.getRequest();
        if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) {
            TransactionCoordinator transactionCoordinator = this.getTransactionCoordinator();
            partition = transactionCoordinator.partitionFor(request.data().key());
            pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition);
            storeGroupIdFuture = CompletableFuture.completedFuture(null);
        } else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) {
            partition = this.getGroupCoordinator().partitionFor(request.data().key());
            pulsarTopicName = this.getGroupCoordinator().getTopicPartitionName(partition);
            if (this.kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) {
                String groupId = request.data().key();
                String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(), findCoordinator.getHeader().clientId());
                this.currentConnectedClientId.add(findCoordinator.getHeader().clientId());
                storeGroupIdFuture = this.storeGroupId(groupId, groupIdPath);
            } else {
                storeGroupIdFuture = CompletableFuture.completedFuture(null);
            }
        } else {
            throw new NotImplementedException("FindCoordinatorRequest not support unknown type " + request.data().keyType());
        }
        storeGroupIdFuture.whenComplete((__, ex) -> {
            if (ex != null) {
                log.warn("Store groupId failed, the groupId might already stored.", ex);
            }
            this.findBroker(TopicName.get((String)pulsarTopicName)).whenComplete((result, throwable) -> {
                if (result.error != Errors.NONE || throwable != null) {
                    log.error("[{}] Request {}: Error while find coordinator.", new Object[]{this.ctx.channel(), findCoordinator.getHeader(), throwable});
                    resultFuture.complete((AbstractResponse)KafkaResponseUtils.newFindCoordinator(Errors.LEADER_NOT_AVAILABLE));
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found node {} as coordinator for key {} partition {}.", new Object[]{this.ctx.channel(), result.node, request.data().key(), partition});
                }
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newFindCoordinator(result.node));
            });
        });
    }

    @VisibleForTesting
    protected CompletableFuture<Void> storeGroupId(String groupId, String groupIdPath) {
        String path = this.groupIdStoredPath + groupIdPath;
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ((CompletableFuture)this.metadataStore.put(path, groupId.getBytes(StandardCharsets.UTF_8), Optional.empty()).thenAccept(__ -> future.complete(null))).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }

    @VisibleForTesting
    public <T> void replaceTopicPartition(Map<TopicPartition, T> replacedMap, Map<TopicPartition, TopicPartition> replacingIndex) {
        String namespacePrefix = this.currentNamespacePrefix();
        HashMap newMap = new HashMap();
        replacedMap.entrySet().removeIf(entry -> {
            if (replacingIndex.containsKey(entry.getKey())) {
                newMap.put((TopicPartition)replacingIndex.get(entry.getKey()), entry.getValue());
                return true;
            }
            if (KopTopic.isFullTopicName(((TopicPartition)entry.getKey()).topic())) {
                newMap.put(new TopicPartition(KopTopic.removeDefaultNamespacePrefix(((TopicPartition)entry.getKey()).topic(), namespacePrefix), ((TopicPartition)entry.getKey()).partition()), entry.getValue());
                return true;
            }
            return false;
        });
        replacedMap.putAll(newMap);
    }

    @Override
    protected void handleOffsetFetchRequest(KafkaCommandDecoder.KafkaHeaderAndRequest offsetFetch, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(offsetFetch.getRequest() instanceof OffsetFetchRequest));
        OffsetFetchRequest request = (OffsetFetchRequest)offsetFetch.getRequest();
        Preconditions.checkState((this.getGroupCoordinator() != null ? 1 : 0) != 0, (Object)"Group Coordinator not started");
        CompletableFuture authorizeFuture = new CompletableFuture();
        HashMap replacingIndex = new HashMap();
        ArrayList authorizedPartitions = new ArrayList();
        ConcurrentMap unauthorizedPartitionData = Maps.newConcurrentMap();
        ConcurrentMap unknownPartitionData = Maps.newConcurrentMap();
        if (request.partitions() == null || request.partitions().isEmpty()) {
            authorizeFuture.complete(null);
        } else {
            AtomicInteger partitionCount = new AtomicInteger(request.partitions().size());
            Runnable completeOneAuthorization = () -> {
                if (partitionCount.decrementAndGet() == 0) {
                    authorizeFuture.complete(authorizedPartitions);
                }
            };
            String namespacePrefix = this.currentNamespacePrefix();
            request.partitions().forEach(tp -> {
                try {
                    String fullName = new KopTopic(tp.topic(), namespacePrefix).getFullName();
                    this.authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)).whenComplete((isAuthorized, ex) -> {
                        if (ex != null) {
                            log.error("Describe topic authorize failed, topic - {}. {}", (Object)fullName, (Object)ex.getMessage());
                            unauthorizedPartitionData.put(tp, OffsetFetchResponse.UNAUTHORIZED_PARTITION);
                            completeOneAuthorization.run();
                            return;
                        }
                        if (!isAuthorized.booleanValue()) {
                            unauthorizedPartitionData.put(tp, OffsetFetchResponse.UNAUTHORIZED_PARTITION);
                            completeOneAuthorization.run();
                            return;
                        }
                        TopicPartition newTopicPartition = new TopicPartition(fullName, tp.partition());
                        replacingIndex.put(newTopicPartition, tp);
                        authorizedPartitions.add(newTopicPartition);
                        completeOneAuthorization.run();
                    });
                }
                catch (KoPTopicException e) {
                    log.warn("Invalid topic name: {}", (Object)tp.topic(), (Object)e);
                    unknownPartitionData.put(tp, OffsetFetchResponse.UNKNOWN_PARTITION);
                }
            });
        }
        authorizeFuture.whenComplete((partitionList, ex) -> {
            KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> keyValue = this.getGroupCoordinator().handleFetchOffsets(request.groupId(), Optional.ofNullable(partitionList));
            if (log.isDebugEnabled()) {
                log.debug("OFFSET_FETCH Unknown partitions: {}, Unauthorized partitions: {}.", (Object)unknownPartitionData, (Object)unauthorizedPartitionData);
            }
            if (log.isTraceEnabled()) {
                StringBuffer traceInfo = new StringBuffer();
                replacingIndex.forEach((inner, outer) -> traceInfo.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer)));
                log.trace("OFFSET_FETCH TopicPartition relations: \n{}", (Object)traceInfo);
            }
            this.replaceTopicPartition((Map)keyValue.getValue(), replacingIndex);
            ((Map)keyValue.getValue()).putAll(unauthorizedPartitionData);
            ((Map)keyValue.getValue()).putAll(unknownPartitionData);
            resultFuture.complete((AbstractResponse)new OffsetFetchResponse((Errors)keyValue.getKey(), (Map)keyValue.getValue()));
        });
    }

    private CompletableFuture<Pair<Errors, Long>> fetchOffset(String topicName, long timestamp) {
        CompletableFuture<Pair<Errors, Long>> partitionData = new CompletableFuture<Pair<Errors, Long>>();
        ((CompletableFuture)this.topicManager.getTopic(topicName).thenAccept(perTopicOpt -> {
            if (!perTopicOpt.isPresent()) {
                partitionData.complete(Pair.of((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION, null));
                return;
            }
            PersistentTopic perTopic = (PersistentTopic)perTopicOpt.get();
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)perTopic.getManagedLedger();
            PositionImpl lac = (PositionImpl)managedLedger.getLastConfirmedEntry();
            if (lac == null) {
                log.error("[{}] Unexpected LastConfirmedEntry for topic {}, managed ledger: {}", new Object[]{this.ctx, perTopic.getName(), managedLedger.getName()});
                partitionData.complete(Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, (Object)-1L));
                return;
            }
            if (timestamp == -1L) {
                PositionImpl position = (PositionImpl)managedLedger.getLastConfirmedEntry();
                if (log.isDebugEnabled()) {
                    log.debug("Get latest position for topic {} time {}. result: {}", new Object[]{perTopic.getName(), timestamp, position});
                }
                long offset2 = MessageMetadataUtils.getLogEndOffset((ManagedLedger)managedLedger);
                partitionData.complete(Pair.of((Object)Errors.NONE, (Object)offset2));
            } else if (timestamp == -2L) {
                PositionImpl position = OffsetFinder.getFirstValidPosition(managedLedger);
                if (position == null) {
                    log.error("[{}] Failed to find first valid position for topic {}", (Object)this.ctx, (Object)perTopic.getName());
                    partitionData.complete(Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, (Object)-1L));
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Get earliest position for topic {}: {}, lac: {}", new Object[]{this.ctx, perTopic.getName(), position, lac});
                }
                if (position.compareTo(lac) > 0) {
                    partitionData.complete(Pair.of((Object)Errors.NONE, (Object)0L));
                } else {
                    MessageMetadataUtils.getOffsetOfPosition(managedLedger, position, false, timestamp, this.skipMessagesWithoutIndex).whenComplete((offset, throwable) -> {
                        if (throwable != null) {
                            log.error("[{}] Failed to get offset for position {}", new Object[]{perTopic, position, throwable});
                            partitionData.complete(Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, null));
                            return;
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Get offset of position for topic {}: {}, lac: {}, offset: {}", new Object[]{this.ctx, perTopic.getName(), position, lac, offset});
                        }
                        partitionData.complete(Pair.of((Object)Errors.NONE, (Object)offset));
                    });
                }
            } else {
                this.fetchOffsetByTimestamp(partitionData, managedLedger, lac, timestamp, perTopic.getName());
            }
        })).exceptionally(e -> {
            Throwable throwable = FutureUtil.unwrapCompletionException((Throwable)e);
            log.error("Failed while get persistentTopic topic: {} ts: {}. ", new Object[]{topicName, timestamp, throwable});
            partitionData.complete(Pair.of((Object)Errors.forException((Throwable)throwable), null));
            return null;
        });
        return partitionData;
    }

    private void fetchOffsetByTimestamp(final CompletableFuture<Pair<Errors, Long>> partitionData, final ManagedLedgerImpl managedLedger, final PositionImpl lac, final long timestamp, final String topic) {
        OffsetFinder offsetFinder = new OffsetFinder(managedLedger);
        offsetFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback(){

            public void findEntryComplete(Position position, Object ctx) {
                PositionImpl finalPosition;
                if (position == null) {
                    finalPosition = OffsetFinder.getFirstValidPosition(managedLedger);
                    if (finalPosition == null) {
                        log.warn("Unable to find position for topic {} time {}. get NULL position", (Object)topic, (Object)timestamp);
                        partitionData.complete(Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, null));
                        return;
                    }
                } else {
                    finalPosition = (PositionImpl)position;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Find position for topic {} time {}. position: {}", new Object[]{topic, timestamp, finalPosition});
                }
                if (finalPosition.compareTo(lac) > 0 || MessageMetadataUtils.getCurrentOffset((ManagedLedger)managedLedger) < 0L) {
                    long offset2 = Math.max(0L, MessageMetadataUtils.getCurrentOffset((ManagedLedger)managedLedger));
                    partitionData.complete(Pair.of((Object)Errors.NONE, (Object)offset2));
                } else {
                    MessageMetadataUtils.getOffsetOfPosition(managedLedger, finalPosition, true, timestamp, KafkaRequestHandler.this.skipMessagesWithoutIndex).whenComplete((offset, throwable) -> {
                        if (throwable != null) {
                            log.error("[{}] Failed to get offset for position {}", new Object[]{topic, finalPosition, throwable});
                            partitionData.complete(Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, null));
                            return;
                        }
                        partitionData.complete(Pair.of((Object)Errors.NONE, (Object)offset));
                    });
                }
            }

            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> position, Object ctx) {
                log.warn("Unable to find position for topic {} time {}. Exception:", new Object[]{topic, timestamp, exception});
                partitionData.complete(Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, null));
            }
        });
    }

    private void waitResponseDataComplete(CompletableFuture<AbstractResponse> resultFuture, Map<TopicPartition, CompletableFuture<Pair<Errors, Long>>> responseData, boolean legacy) {
        CompletableFuture.allOf(responseData.values().toArray(new CompletableFuture[0])).whenComplete((ignore, ex) -> {
            ListOffsetsResponse response = KafkaResponseUtils.newListOffset(CoreUtils.mapValue(responseData, CompletableFuture::join), legacy);
            resultFuture.complete((AbstractResponse)response);
        });
    }

    private void handleListOffsetRequestV1AndAbove(KafkaCommandDecoder.KafkaHeaderAndRequest listOffset, CompletableFuture<AbstractResponse> resultFuture) {
        ListOffsetsRequest request = (ListOffsetsRequest)listOffset.getRequest();
        ConcurrentMap responseData = Maps.newConcurrentMap();
        ListOffsetsRequestData data = request.data();
        if (data.topics().size() == 0) {
            resultFuture.complete((AbstractResponse)new ListOffsetsResponse(new ListOffsetsResponseData()));
            return;
        }
        AtomicInteger partitions = new AtomicInteger(data.topics().stream().map(ListOffsetsRequestData.ListOffsetsTopic::partitions).mapToInt(Collection::size).sum());
        Runnable completeOne = () -> {
            if (partitions.decrementAndGet() == 0) {
                this.waitResponseDataComplete(resultFuture, responseData, false);
            }
        };
        String namespacePrefix = this.currentNamespacePrefix();
        KafkaRequestUtils.forEachListOffsetRequest(request, (topic, times) -> {
            String fullPartitionName = KopTopic.toString(topic, namespacePrefix);
            this.authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullPartitionName)).whenComplete((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("Describe topic authorize failed, topic - {}. {}", (Object)fullPartitionName, (Object)ex.getMessage());
                    responseData.put(topic, CompletableFuture.completedFuture(Pair.of((Object)Errors.TOPIC_AUTHORIZATION_FAILED, null)));
                    completeOne.run();
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    responseData.put(topic, CompletableFuture.completedFuture(Pair.of((Object)Errors.TOPIC_AUTHORIZATION_FAILED, null)));
                    completeOne.run();
                    return;
                }
                responseData.put(topic, this.fetchOffset(fullPartitionName, times.timestamp()));
                completeOne.run();
            });
        });
    }

    private void handleListOffsetRequestV0(KafkaCommandDecoder.KafkaHeaderAndRequest listOffset, CompletableFuture<AbstractResponse> resultFuture) {
        ListOffsetRequestV0 request = this.byteBufToListOffsetRequestV0(listOffset.getBuffer());
        ConcurrentMap responseData = Maps.newConcurrentMap();
        if (request.offsetData().size() == 0) {
            resultFuture.complete((AbstractResponse)new ListOffsetsResponse(new ListOffsetsResponseData()));
            return;
        }
        AtomicInteger partitions = new AtomicInteger(request.offsetData().size());
        Runnable completeOne = () -> {
            if (partitions.decrementAndGet() == 0) {
                this.waitResponseDataComplete(resultFuture, responseData, true);
            }
        };
        if (log.isDebugEnabled()) {
            log.debug("received a v0 listOffset: {}", (Object)request.toString(true));
        }
        String namespacePrefix = this.currentNamespacePrefix();
        KafkaRequestUtils.LegacyUtils.forEachListOffsetRequest(request, topic -> times -> maxNumOffsets -> {
            String fullPartitionName = KopTopic.toString(topic, namespacePrefix);
            this.authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullPartitionName)).whenComplete((isAuthorized, ex) -> {
                CompletableFuture<Object> partitionData;
                if (ex != null) {
                    log.error("Describe topic authorize failed, topic - {}. {}", (Object)fullPartitionName, (Object)ex.getMessage());
                    responseData.put(topic, CompletableFuture.completedFuture(Pair.of((Object)Errors.TOPIC_AUTHORIZATION_FAILED, null)));
                    completeOne.run();
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    responseData.put(topic, CompletableFuture.completedFuture(Pair.of((Object)Errors.TOPIC_AUTHORIZATION_FAILED, null)));
                    completeOne.run();
                    return;
                }
                if (maxNumOffsets > 1) {
                    log.warn("request is asking for multiples offsets for {}, not supported for now", (Object)fullPartitionName);
                    partitionData = new CompletableFuture();
                    partitionData.complete((Pair<Errors, Long>)Pair.of((Object)Errors.UNKNOWN_SERVER_ERROR, null));
                }
                partitionData = this.fetchOffset(fullPartitionName, (long)times);
                responseData.put(topic, partitionData);
                completeOne.run();
            });
        });
    }

    @Override
    protected void handleListOffsetRequest(KafkaCommandDecoder.KafkaHeaderAndRequest listOffset, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(listOffset.getRequest() instanceof ListOffsetsRequest));
        if (listOffset.getHeader().apiVersion() == 0) {
            this.handleListOffsetRequestV0(listOffset, resultFuture);
        } else {
            this.handleListOffsetRequestV1AndAbove(listOffset, resultFuture);
        }
    }

    private Map<TopicPartition, Errors> nonExistingTopicErrors(OffsetCommitRequest request) {
        return Maps.newHashMap();
    }

    private Map<TopicPartition, Errors> nonExistingTopicErrors() {
        return Maps.newHashMap();
    }

    @VisibleForTesting
    Map<TopicPartition, OffsetAndMetadata> convertOffsetCommitRequestRetentionMs(Map<TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition> convertedOffsetData, long retentionTime, short apiVersion, long currentTimeStamp, long configOffsetsRetentionMs) {
        long offsetRetention = apiVersion <= 1 || apiVersion >= 5 || retentionTime == -1L ? configOffsetsRetentionMs : retentionTime;
        long defaultExpireTimestamp = offsetRetention + currentTimeStamp;
        long finalOffsetRetention = offsetRetention;
        return CoreUtils.mapValue(convertedOffsetData, partitionData -> {
            String metadata = partitionData.committedMetadata() == null ? "" : partitionData.committedMetadata();
            long expireTimeStamp = partitionData.commitTimestamp() == -1L ? defaultExpireTimestamp : finalOffsetRetention + partitionData.commitTimestamp();
            return OffsetAndMetadata.apply(partitionData.committedOffset(), metadata, currentTimeStamp, expireTimeStamp);
        });
    }

    @Override
    protected void handleOffsetCommitRequest(KafkaCommandDecoder.KafkaHeaderAndRequest offsetCommit, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(offsetCommit.getRequest() instanceof OffsetCommitRequest));
        Preconditions.checkState((this.getGroupCoordinator() != null ? 1 : 0) != 0, (Object)"Group Coordinator not started");
        OffsetCommitRequest request = (OffsetCommitRequest)offsetCommit.getRequest();
        OffsetCommitRequestData data = request.data();
        Map<TopicPartition, Errors> nonExistingTopicErrors = this.nonExistingTopicErrors(request);
        ConcurrentMap unauthorizedTopicErrors = Maps.newConcurrentMap();
        if (data.topics().isEmpty()) {
            resultFuture.complete((AbstractResponse)KafkaResponseUtils.newOffsetCommit(Maps.newHashMap()));
            return;
        }
        ConcurrentMap convertedOffsetData = Maps.newConcurrentMap();
        ConcurrentMap replacingIndex = Maps.newConcurrentMap();
        AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(data.topics().stream().map(OffsetCommitRequestData.OffsetCommitRequestTopic::partitions).mapToInt(Collection::size).sum());
        Consumer<Runnable> completeOne = action -> {
            action.run();
            if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
                if (log.isTraceEnabled()) {
                    StringBuffer traceInfo = new StringBuffer();
                    replacingIndex.forEach((inner, outer) -> traceInfo.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer)));
                    log.trace("OFFSET_COMMIT TopicPartition relations: \n{}", (Object)traceInfo);
                }
                if (convertedOffsetData.isEmpty()) {
                    HashMap offsetCommitResult2 = Maps.newHashMap();
                    offsetCommitResult2.putAll(nonExistingTopicErrors);
                    offsetCommitResult2.putAll(unauthorizedTopicErrors);
                    OffsetCommitResponse response = KafkaResponseUtils.newOffsetCommit(offsetCommitResult2);
                    resultFuture.complete((AbstractResponse)response);
                } else {
                    Map<TopicPartition, OffsetAndMetadata> convertedPartitionData = this.convertOffsetCommitRequestRetentionMs(convertedOffsetData, KafkaRequestUtils.LegacyUtils.getRetentionTime(request), offsetCommit.getHeader().apiVersion(), Time.SYSTEM.milliseconds(), this.getGroupCoordinator().offsetConfig().offsetsRetentionMs());
                    this.getGroupCoordinator().handleCommitOffsets(data.groupId(), data.memberId(), data.generationId(), convertedPartitionData).thenAccept(offsetCommitResult -> {
                        this.replaceTopicPartition((Map)offsetCommitResult, replacingIndex);
                        offsetCommitResult.putAll(nonExistingTopicErrors);
                        offsetCommitResult.putAll(unauthorizedTopicErrors);
                        OffsetCommitResponse response = KafkaResponseUtils.newOffsetCommit(offsetCommitResult);
                        resultFuture.complete((AbstractResponse)response);
                    });
                }
            }
        };
        String namespacePrefix = this.currentNamespacePrefix();
        data.topics().forEach(topicData -> topicData.partitions().forEach(partitionData -> {
            KopTopic kopTopic;
            TopicPartition tp = new TopicPartition(topicData.name(), partitionData.partitionIndex());
            try {
                kopTopic = new KopTopic(tp.topic(), namespacePrefix);
            }
            catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", (Object)tp.topic(), (Object)e);
                completeOne.accept(() -> nonExistingTopicErrors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION));
                return;
            }
            String fullTopicName = kopTopic.getFullName();
            this.authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("OffsetCommit authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                    completeOne.accept(() -> unauthorizedTopicErrors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED));
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    completeOne.accept(() -> unauthorizedTopicErrors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED));
                    return;
                }
                completeOne.accept(() -> {
                    TopicPartition newTopicPartition = new TopicPartition(new KopTopic(tp.topic(), namespacePrefix).getFullName(), tp.partition());
                    convertedOffsetData.put(newTopicPartition, partitionData);
                    replacingIndex.put(newTopicPartition, tp);
                });
            });
        }));
    }

    @Override
    protected void handleFetchRequest(KafkaCommandDecoder.KafkaHeaderAndRequest fetch, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(fetch.getRequest() instanceof FetchRequest));
        FetchRequest request = (FetchRequest)fetch.getRequest();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ", new Object[]{this.ctx.channel(), fetch.getHeader(), request.fetchData().size()});
            request.fetchData().forEach((topic, data) -> log.debug("Fetch request topic:{} data:{}.", topic, (Object)data.toString()));
        }
        if (request.fetchData().isEmpty()) {
            resultFuture.complete((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(), 10, request.metadata().sessionId()));
            return;
        }
        ConcurrentHashMap erroneous = new ConcurrentHashMap();
        ConcurrentHashMap interesting = new ConcurrentHashMap();
        AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(request.fetchData().size());
        Runnable completeOne = () -> {
            if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
                TransactionCoordinator transactionCoordinator = null;
                if (request.isolationLevel().equals((Object)IsolationLevel.READ_COMMITTED) && this.kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
                    transactionCoordinator = this.getTransactionCoordinator();
                }
                String namespacePrefix = this.currentNamespacePrefix();
                int fetchMaxBytes = request.maxBytes();
                int fetchMinBytes = Math.min(request.minBytes(), fetchMaxBytes);
                if (interesting.isEmpty()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Fetch interesting is empty. Partitions: [{}]", (Object)request.fetchData());
                    }
                    resultFuture.complete((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(erroneous), 10, request.metadata().sessionId()));
                } else {
                    MessageFetchContext context = MessageFetchContext.get(this, transactionCoordinator, this.maxReadEntriesNum, namespacePrefix, this.getKafkaTopicManagerSharedState(), this.executor, fetch);
                    this.getReplicaManager().fetchMessage(request.maxWait(), fetchMinBytes, fetchMaxBytes, interesting, request.isolationLevel(), context).thenAccept(resultMap -> {
                        LinkedHashMap partitions = new LinkedHashMap();
                        resultMap.forEach((tp, data) -> partitions.put(tp, data.toPartitionData()));
                        partitions.putAll(erroneous);
                        boolean triggeredCompletion = resultFuture.complete(new ResponseCallbackWrapper((AbstractResponse)new FetchResponse(Errors.NONE, partitions, 0, request.metadata().sessionId()), () -> resultMap.forEach((__, readRecordsResult) -> readRecordsResult.recycle())));
                        if (!triggeredCompletion) {
                            resultMap.forEach((__, readRecordsResult) -> readRecordsResult.recycle());
                        }
                        context.recycle();
                    });
                }
            }
        };
        request.fetchData().forEach((topicPartition, partitionData) -> {
            String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix());
            this.authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("Read topic authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                    erroneous.put(topicPartition, KafkaRequestHandler.errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED));
                    completeOne.run();
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    erroneous.put(topicPartition, KafkaRequestHandler.errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED));
                    completeOne.run();
                    return;
                }
                interesting.put(topicPartition, partitionData);
                completeOne.run();
            });
        });
    }

    private static FetchResponse.PartitionData<Records> errorResponse(Errors error) {
        return new FetchResponse.PartitionData(error, -1L, -1L, -1L, null, (BaseRecords)MemoryRecords.EMPTY);
    }

    @Override
    protected void handleJoinGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest joinGroup, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(joinGroup.getRequest() instanceof JoinGroupRequest));
        Preconditions.checkState((this.getGroupCoordinator() != null ? 1 : 0) != 0, (Object)"Group Coordinator not started");
        JoinGroupRequest request = (JoinGroupRequest)joinGroup.getRequest();
        JoinGroupRequestData data = request.data();
        HashMap<String, byte[]> protocols = new HashMap<String, byte[]>();
        data.protocols().forEach(protocol -> protocols.put(protocol.name(), protocol.metadata()));
        this.getGroupCoordinator().handleJoinGroup(data.groupId(), data.memberId(), joinGroup.getHeader().clientId(), joinGroup.getClientHost(), data.rebalanceTimeoutMs(), data.sessionTimeoutMs(), data.protocolType(), protocols).thenAccept(joinGroupResult -> {
            HashMap<String, byte[]> members = new HashMap<String, byte[]>(joinGroupResult.getMembers());
            JoinGroupResponse response = KafkaResponseUtils.newJoinGroup(joinGroupResult.getError(), joinGroupResult.getGenerationId(), joinGroupResult.getProtocolName(), joinGroupResult.getProtocolType(), joinGroupResult.getMemberId(), joinGroupResult.getLeaderId(), members);
            if (log.isTraceEnabled()) {
                log.trace("Sending join group response {} for correlation id {} to client {}.", new Object[]{response, joinGroup.getHeader().correlationId(), joinGroup.getHeader().clientId()});
            }
            resultFuture.complete((AbstractResponse)response);
        });
    }

    @Override
    protected void handleSyncGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest syncGroup, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(syncGroup.getRequest() instanceof SyncGroupRequest));
        SyncGroupRequest request = (SyncGroupRequest)syncGroup.getRequest();
        SyncGroupRequestData data = request.data();
        this.groupIds.add(data.groupId());
        Map<String, byte[]> assignments = data.assignments().stream().collect(Collectors.toMap(SyncGroupRequestData.SyncGroupRequestAssignment::memberId, SyncGroupRequestData.SyncGroupRequestAssignment::assignment));
        this.getGroupCoordinator().handleSyncGroup(data.groupId(), data.generationId(), data.memberId(), assignments).thenAccept(syncGroupResult -> {
            SyncGroupResponse response = KafkaResponseUtils.newSyncGroup((Errors)syncGroupResult.getKey(), data.protocolType(), data.protocolName(), (byte[])syncGroupResult.getValue());
            resultFuture.complete((AbstractResponse)response);
        });
    }

    @Override
    protected void handleHeartbeatRequest(KafkaCommandDecoder.KafkaHeaderAndRequest heartbeat, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(heartbeat.getRequest() instanceof HeartbeatRequest));
        HeartbeatRequest request = (HeartbeatRequest)heartbeat.getRequest();
        this.getGroupCoordinator().handleHeartbeat(request.data().groupId(), request.data().memberId(), request.data().generationId()).thenAccept(errors -> {
            HeartbeatResponse response = KafkaResponseUtils.newHeartbeat(errors);
            if (log.isTraceEnabled()) {
                log.trace("Sending heartbeat response {} for correlation id {} to client {}.", new Object[]{response, heartbeat.getHeader().correlationId(), heartbeat.getHeader().clientId()});
            }
            resultFuture.complete((AbstractResponse)response);
        });
    }

    @Override
    protected void handleLeaveGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest leaveGroup, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(leaveGroup.getRequest() instanceof LeaveGroupRequest));
        LeaveGroupRequest request = (LeaveGroupRequest)leaveGroup.getRequest();
        LeaveGroupRequestData data = request.data();
        Set<String> members = data.members().stream().map(LeaveGroupRequestData.MemberIdentity::memberId).collect(Collectors.toSet());
        if (!data.memberId().isEmpty()) {
            members.add(data.memberId());
        }
        this.getGroupCoordinator().handleLeaveGroup(data.groupId(), members).thenAccept(errors -> resultFuture.complete((AbstractResponse)KafkaResponseUtils.newLeaveGroup(errors)));
    }

    @Override
    protected void handleDescribeGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest describeGroup, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(describeGroup.getRequest() instanceof DescribeGroupsRequest));
        DescribeGroupsRequest request = (DescribeGroupsRequest)describeGroup.getRequest();
        resultFuture.complete((AbstractResponse)KafkaResponseUtils.newDescribeGroups(request.data().groups().stream().map(groupId -> Pair.of((Object)groupId, this.getGroupCoordinator().handleDescribeGroup((String)groupId))).collect(Collectors.toMap(Pair::getLeft, Pair::getRight))));
    }

    @Override
    protected void handleListGroupsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(listGroups.getRequest() instanceof ListGroupsRequest));
        KeyValue<Errors, List<GroupMetadata.GroupOverview>> listResult = this.getGroupCoordinator().handleListGroups();
        resultFuture.complete((AbstractResponse)KafkaResponseUtils.newListGroups((Errors)listResult.getKey(), (List)listResult.getValue()));
    }

    @Override
    protected void handleDeleteGroupsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(deleteGroups.getRequest() instanceof DeleteGroupsRequest));
        DeleteGroupsRequest request = (DeleteGroupsRequest)deleteGroups.getRequest();
        DeleteGroupsRequestData data = request.data();
        resultFuture.complete((AbstractResponse)KafkaResponseUtils.newDeleteGroups(this.getGroupCoordinator().handleDeleteGroups(data.groupsNames())));
    }

    @Override
    protected void handleSaslAuthenticate(KafkaCommandDecoder.KafkaHeaderAndRequest saslAuthenticate, CompletableFuture<AbstractResponse> resultFuture) {
        resultFuture.complete((AbstractResponse)new SaslAuthenticateResponse(new SaslAuthenticateResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code()).setErrorMessage("SaslAuthenticate request received after successful authentication")));
    }

    @Override
    protected void handleSaslHandshake(KafkaCommandDecoder.KafkaHeaderAndRequest saslHandshake, CompletableFuture<AbstractResponse> resultFuture) {
        resultFuture.complete((AbstractResponse)KafkaResponseUtils.newSaslHandshake(Errors.ILLEGAL_SASL_STATE));
    }

    @Override
    protected void handleCreateTopics(KafkaCommandDecoder.KafkaHeaderAndRequest createTopics, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(createTopics.getRequest() instanceof CreateTopicsRequest));
        CreateTopicsRequest request = (CreateTopicsRequest)createTopics.getRequest();
        ConcurrentMap result = Maps.newConcurrentMap();
        HashMap validTopics = Maps.newHashMap();
        HashSet duplicateTopics = new HashSet();
        request.data().topics().forEach(topic -> {
            if (duplicateTopics.add(topic.name())) {
                validTopics.put(topic.name(), topic);
            } else {
                String errorMessage = "Create topics request from client `" + createTopics.getHeader().clientId() + "` contains multiple entries for the following topics: " + duplicateTopics;
                result.put(topic.name(), new ApiError(Errors.INVALID_REQUEST, errorMessage));
            }
        });
        if (validTopics.isEmpty()) {
            resultFuture.complete((AbstractResponse)KafkaResponseUtils.newCreateTopics(result));
            return;
        }
        String namespacePrefix = this.currentNamespacePrefix();
        AtomicInteger validTopicsCount = new AtomicInteger(validTopics.size());
        ConcurrentMap authorizedTopics = Maps.newConcurrentMap();
        Runnable createTopicsAsync = () -> {
            if (authorizedTopics.isEmpty()) {
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newCreateTopics(result));
                return;
            }
            this.adminManager.createTopicsAsync(authorizedTopics, request.data().timeoutMs(), namespacePrefix).thenApply(validResult -> {
                result.putAll(validResult);
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newCreateTopics(result));
                return null;
            });
        };
        BiConsumer<String, CreateTopicsRequestData.CreatableTopic> completeOneTopic = (topic, topicDetails) -> {
            authorizedTopics.put(topic, topicDetails);
            if (validTopicsCount.decrementAndGet() == 0) {
                createTopicsAsync.run();
            }
        };
        BiConsumer<String, ApiError> completeOneErrorTopic = (topic, error) -> {
            result.put(topic, error);
            if (validTopicsCount.decrementAndGet() == 0) {
                createTopicsAsync.run();
            }
        };
        validTopics.forEach((topic, details) -> {
            KopTopic kopTopic;
            try {
                kopTopic = new KopTopic((String)topic, namespacePrefix);
            }
            catch (KoPTopicException e) {
                completeOneErrorTopic.accept((String)topic, ApiError.fromThrowable((Throwable)e));
                return;
            }
            String fullTopicName = kopTopic.getFullName();
            this.authorize(AclOperation.CREATE, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("CreateTopics authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                    completeOneErrorTopic.accept((String)topic, new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, ex.getMessage()));
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    log.error("CreateTopics authorize failed, topic - {}.", (Object)fullTopicName);
                    completeOneErrorTopic.accept((String)topic, new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null));
                    return;
                }
                completeOneTopic.accept((String)topic, (CreateTopicsRequestData.CreatableTopic)details);
            });
        });
    }

    @Override
    protected void handleAlterConfigs(KafkaCommandDecoder.KafkaHeaderAndRequest describeConfigs, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(describeConfigs.getRequest() instanceof AlterConfigsRequest));
        AlterConfigsRequest request = (AlterConfigsRequest)describeConfigs.getRequest();
        if (request.configs().isEmpty()) {
            resultFuture.complete((AbstractResponse)new AlterConfigsResponse(new AlterConfigsResponseData()));
            return;
        }
        AlterConfigsResponseData data = new AlterConfigsResponseData();
        request.data().resources().forEach(resource -> {
            byte resourceType = resource.resourceType();
            String resourceName = resource.resourceName();
            resource.configs().forEach(entry -> log.info("Ignoring ALTER_CONFIG for {} (type {}) {} = {}", new Object[]{resourceName, resourceType, entry.name(), entry.value()}));
            data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse().setErrorCode(ApiError.NONE.error().code()).setErrorMessage(ApiError.NONE.error().message()).setResourceName(resourceName).setResourceType(resourceType));
        });
        resultFuture.complete((AbstractResponse)new AlterConfigsResponse(data));
    }

    @Override
    protected void handleDescribeConfigs(KafkaCommandDecoder.KafkaHeaderAndRequest describeConfigs, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(describeConfigs.getRequest() instanceof DescribeConfigsRequest));
        DescribeConfigsRequest request = (DescribeConfigsRequest)describeConfigs.getRequest();
        DescribeConfigsRequestData data = request.data();
        if (data.resources().isEmpty()) {
            resultFuture.complete((AbstractResponse)new DescribeConfigsResponse(new DescribeConfigsResponseData()));
            return;
        }
        List authorizedResources = Collections.synchronizedList(new ArrayList());
        ConcurrentMap failedConfigResourceMap = Maps.newConcurrentMap();
        AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(data.resources().size());
        String namespacePrefix = this.currentNamespacePrefix();
        Consumer<Runnable> completeOne = action -> {
            action.run();
            if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
                this.adminManager.describeConfigsAsync(authorizedResources.stream().collect(Collectors.toMap(configResource -> configResource, configResource -> data.resources().stream().filter(r -> r.resourceName().equals(configResource.name()) && r.resourceType() == configResource.type().id()).findAny().map(__ -> new HashSet()))), namespacePrefix).thenApply(configResourceConfigMap -> {
                    DescribeConfigsResponseData responseData = new DescribeConfigsResponseData();
                    configResourceConfigMap.putAll(failedConfigResourceMap);
                    configResourceConfigMap.forEach((resource, result) -> responseData.results().add(new DescribeConfigsResponseData.DescribeConfigsResult().setResourceName(resource.name()).setResourceType(resource.type().id()).setErrorCode(result.error().error().code()).setErrorMessage(result.error().messageWithFallback()).setConfigs(result.entries().stream().map(c -> new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(c.name()).setConfigSource(c.source().id()).setReadOnly(c.isReadOnly()).setConfigType(c.type().id()).setValue(c.value()).setDocumentation("")).collect(Collectors.toList()))));
                    resultFuture.complete((AbstractResponse)new DescribeConfigsResponse(responseData));
                    return null;
                });
            }
        };
        data.resources().forEach(configRes -> {
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.forId((byte)configRes.resourceType()), configRes.resourceName());
            switch (configResource.type()) {
                case TOPIC: {
                    KopTopic kopTopic;
                    try {
                        kopTopic = new KopTopic(configResource.name(), namespacePrefix);
                    }
                    catch (KoPTopicException e) {
                        completeOne.accept(() -> {
                            ApiError error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "Topic " + configResource.name() + " doesn't exist");
                            failedConfigResourceMap.put(configResource, new DescribeConfigsResponse.Config(error, Collections.emptyList()));
                        });
                        return;
                    }
                    String fullTopicName = kopTopic.getFullName();
                    this.authorize(AclOperation.DESCRIBE_CONFIGS, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorized, ex) -> {
                        if (ex != null) {
                            log.error("DescribeConfigs in topic authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                            completeOne.accept(() -> failedConfigResourceMap.put(configResource, new DescribeConfigsResponse.Config(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null), Collections.emptyList())));
                            return;
                        }
                        if (isAuthorized.booleanValue()) {
                            completeOne.accept(() -> authorizedResources.add(configResource));
                            return;
                        }
                        completeOne.accept(() -> failedConfigResourceMap.put(configResource, new DescribeConfigsResponse.Config(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null), Collections.emptyList())));
                    });
                    break;
                }
                case BROKER: {
                    completeOne.accept(() -> authorizedResources.add(configResource));
                    break;
                }
                default: {
                    completeOne.accept(() -> log.error("KoP doesn't support resource type: " + configResource.type()));
                }
            }
        });
    }

    @Override
    protected void handleInitProducerId(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response) {
        InitProducerIdRequest request = (InitProducerIdRequest)kafkaHeaderAndRequest.getRequest();
        InitProducerIdRequestData data = request.data();
        TransactionCoordinator transactionCoordinator = this.getTransactionCoordinator();
        transactionCoordinator.handleInitProducerId(data.transactionalId(), data.transactionTimeoutMs(), Optional.empty(), resp -> {
            InitProducerIdResponseData responseData = new InitProducerIdResponseData().setErrorCode(resp.getError().code()).setProducerId(resp.getProducerId().longValue()).setProducerEpoch(resp.getProducerEpoch().shortValue());
            response.complete((AbstractResponse)new InitProducerIdResponse(responseData));
        });
    }

    @Override
    protected void handleAddPartitionsToTxn(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response) {
        AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest)kafkaHeaderAndRequest.getRequest();
        AddPartitionsToTxnRequestData data = request.data();
        List partitionsToAdd = request.partitions();
        ConcurrentMap unauthorizedTopicErrors = Maps.newConcurrentMap();
        ConcurrentMap nonExistingTopicErrors = Maps.newConcurrentMap();
        Set authorizedPartitions = Sets.newConcurrentHashSet();
        TransactionCoordinator transactionCoordinator = this.getTransactionCoordinator();
        AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(partitionsToAdd.size());
        Consumer<Runnable> completeOne = action -> {
            action.run();
            if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
                if (!unauthorizedTopicErrors.isEmpty() || !nonExistingTopicErrors.isEmpty()) {
                    HashMap partitionErrors = Maps.newHashMap();
                    partitionErrors.putAll(unauthorizedTopicErrors);
                    partitionErrors.putAll(nonExistingTopicErrors);
                    for (TopicPartition topicPartition : authorizedPartitions) {
                        partitionErrors.put(topicPartition, Errors.OPERATION_NOT_ATTEMPTED);
                    }
                    response.complete((AbstractResponse)new AddPartitionsToTxnResponse(0, (Map)partitionErrors));
                } else {
                    transactionCoordinator.handleAddPartitionsToTransaction(data.transactionalId(), data.producerId(), data.producerEpoch(), authorizedPartitions, errors -> {
                        AddPartitionsToTxnResponseData responseData = new AddPartitionsToTxnResponseData();
                        Map<TopicPartition, Errors> topicPartitionErrorsMap = this.addPartitionError(partitionsToAdd, (Errors)errors);
                        topicPartitionErrorsMap.keySet().stream().map(TopicPartition::topic).distinct().forEach(topicName -> {
                            AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult topicResult = new AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult().setName(topicName);
                            responseData.results().add((ImplicitLinkedHashCollection.Element)topicResult);
                            topicPartitionErrorsMap.forEach((tp, error) -> {
                                if (tp.topic().equals(topicName)) {
                                    topicResult.results().add((ImplicitLinkedHashCollection.Element)new AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult().setPartitionIndex(tp.partition()).setErrorCode(error.code()));
                                }
                            });
                        });
                        response.complete((AbstractResponse)new AddPartitionsToTxnResponse(responseData));
                    });
                }
            }
        };
        String namespacePrefix = this.currentNamespacePrefix();
        partitionsToAdd.forEach(tp -> {
            String fullPartitionName;
            try {
                fullPartitionName = KopTopic.toString(tp, namespacePrefix);
            }
            catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", (Object)tp.topic(), (Object)e);
                completeOne.accept(() -> nonExistingTopicErrors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION));
                return;
            }
            this.authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName)).whenComplete((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("AddPartitionsToTxn topic authorize failed, topic - {}. {}", (Object)fullPartitionName, (Object)ex.getMessage());
                    completeOne.accept(() -> unauthorizedTopicErrors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED));
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    completeOne.accept(() -> unauthorizedTopicErrors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED));
                    return;
                }
                completeOne.accept(() -> authorizedPartitions.add(tp));
            });
        });
    }

    @Override
    protected void handleAddOffsetsToTxn(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response) {
        AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest)kafkaHeaderAndRequest.getRequest();
        AddOffsetsToTxnRequestData data = request.data();
        int partition = this.getGroupCoordinator().partitionFor(data.groupId());
        String offsetTopicName = this.getGroupCoordinator().getGroupManager().getOffsetConfig().offsetsTopicName();
        TransactionCoordinator transactionCoordinator = this.getTransactionCoordinator();
        Set<TopicPartition> topicPartitions = Collections.singleton(new TopicPartition(offsetTopicName, partition));
        transactionCoordinator.handleAddPartitionsToTransaction(data.transactionalId(), data.producerId(), data.producerEpoch(), topicPartitions, errors -> {
            AddOffsetsToTxnResponseData responseData = new AddOffsetsToTxnResponseData().setErrorCode(errors.code());
            response.complete((AbstractResponse)new AddOffsetsToTxnResponse(responseData));
        });
    }

    private Map<TopicPartition, Errors> addPartitionError(Collection<TopicPartition> partitions, Errors errors) {
        HashMap result = Maps.newHashMap();
        for (TopicPartition partition : partitions) {
            result.put(partition, errors);
        }
        return result;
    }

    @Override
    protected void handleTxnOffsetCommit(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response) {
        TxnOffsetCommitRequest request = (TxnOffsetCommitRequest)kafkaHeaderAndRequest.getRequest();
        TxnOffsetCommitRequestData data = request.data();
        if (data.topics().isEmpty()) {
            response.complete((AbstractResponse)new TxnOffsetCommitResponse(0, (Map)Maps.newHashMap()));
            return;
        }
        Map<TopicPartition, Errors> nonExistingTopicErrors = this.nonExistingTopicErrors();
        ConcurrentMap unauthorizedTopicErrors = Maps.newConcurrentMap();
        ConcurrentMap convertedOffsetData = Maps.newConcurrentMap();
        HashMap replacingIndex = Maps.newHashMap();
        AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(request.offsets().size());
        Consumer<Runnable> completeOne = action -> {
            action.run();
            if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
                if (log.isTraceEnabled()) {
                    StringBuffer traceInfo = new StringBuffer();
                    replacingIndex.forEach((inner, outer) -> traceInfo.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer)));
                    log.trace("TXN_OFFSET_COMMIT TopicPartition relations: \n{}", (Object)traceInfo);
                }
                this.getGroupCoordinator().handleTxnCommitOffsets(data.groupId(), data.producerId(), data.producerEpoch(), this.convertTxnOffsets(convertedOffsetData)).whenComplete((resultMap, throwable) -> {
                    this.replaceTopicPartition((Map)resultMap, replacingIndex);
                    resultMap.putAll(nonExistingTopicErrors);
                    resultMap.putAll(unauthorizedTopicErrors);
                    response.complete((AbstractResponse)new TxnOffsetCommitResponse(0, resultMap));
                });
            }
        };
        String namespacePrefix = this.currentNamespacePrefix();
        request.offsets().forEach((tp, commitOffset) -> {
            KopTopic kopTopic;
            try {
                kopTopic = new KopTopic(tp.topic(), namespacePrefix);
            }
            catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", (Object)tp.topic(), (Object)e);
                completeOne.accept(() -> nonExistingTopicErrors.put((TopicPartition)tp, Errors.UNKNOWN_TOPIC_OR_PARTITION));
                return;
            }
            String fullTopicName = kopTopic.getFullName();
            this.authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorized, ex) -> {
                if (ex != null) {
                    log.error("TxnOffsetCommit authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                    completeOne.accept(() -> unauthorizedTopicErrors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED));
                    return;
                }
                if (!isAuthorized.booleanValue()) {
                    completeOne.accept(() -> unauthorizedTopicErrors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED));
                    return;
                }
                completeOne.accept(() -> {
                    TopicPartition newTopicPartition = new TopicPartition(fullTopicName, tp.partition());
                    convertedOffsetData.put(newTopicPartition, commitOffset);
                    replacingIndex.put(newTopicPartition, tp);
                });
            });
        });
    }

    private Map<TopicPartition, OffsetAndMetadata> convertTxnOffsets(Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsetsMap) {
        long currentTimestamp = SystemTime.SYSTEM.milliseconds();
        HashMap<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (Map.Entry<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> entry : offsetsMap.entrySet()) {
            TxnOffsetCommitRequest.CommittedOffset partitionData = entry.getValue();
            String metadata = KafkaRequestUtils.getMetadata(partitionData);
            long offset = KafkaRequestUtils.getOffset(partitionData);
            offsetAndMetadataMap.put(entry.getKey(), OffsetAndMetadata.apply(offset, metadata, currentTimestamp, -1L));
        }
        return offsetAndMetadataMap;
    }

    @Override
    protected void handleEndTxn(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response) {
        EndTxnRequest request = (EndTxnRequest)kafkaHeaderAndRequest.getRequest();
        EndTxnRequestData data = request.data();
        TransactionCoordinator transactionCoordinator = this.getTransactionCoordinator();
        transactionCoordinator.handleEndTransaction(data.transactionalId(), data.producerId(), data.producerEpoch(), data.committed() ? TransactionResult.COMMIT : TransactionResult.ABORT, errors -> response.complete((AbstractResponse)new EndTxnResponse(new EndTxnResponseData().setErrorCode(errors.code()))));
    }

    @Override
    protected void handleWriteTxnMarkers(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response) {
        WriteTxnMarkersRequest request = (WriteTxnMarkersRequest)kafkaHeaderAndRequest.getRequest();
        ConcurrentHashMap errors = new ConcurrentHashMap();
        List markers = request.markers();
        AtomicInteger numAppends = new AtomicInteger(markers.size());
        if (numAppends.get() == 0) {
            response.complete((AbstractResponse)new WriteTxnMarkersResponse(errors));
            return;
        }
        BiConsumer<Long, Map> updateErrors = (producerId, currentErrors) -> {
            Map previousErrors = errors.putIfAbsent(producerId, currentErrors);
            if (previousErrors != null) {
                previousErrors.putAll(currentErrors);
            }
        };
        Runnable completeOne = () -> {
            if (numAppends.decrementAndGet() == 0) {
                response.complete((AbstractResponse)new WriteTxnMarkersResponse(errors));
            }
        };
        for (WriteTxnMarkersRequest.TxnMarkerEntry marker : markers) {
            long producerId2 = marker.producerId();
            TransactionResult transactionResult = marker.transactionResult();
            Map<TopicPartition, MemoryRecords> controlRecords = this.generateTxnMarkerRecords(marker);
            AppendRecordsContext appendRecordsContext = AppendRecordsContext.get(this.topicManager, this::startSendOperationForThrottling, this::completeSendOperationForThrottling, this.pendingTopicFuturesMap, this.ctx);
            this.getReplicaManager().appendRecords(this.kafkaConfig.getRequestTimeoutMs(), true, this.currentNamespacePrefix(), controlRecords, PartitionLog.AppendOrigin.Coordinator, appendRecordsContext).whenComplete((result, ex) -> {
                appendRecordsContext.recycle();
                if (ex != null) {
                    log.error("[{}] Append txn marker ({}) failed.", new Object[]{this.ctx.channel(), marker, ex});
                    HashMap currentErrors = new HashMap();
                    controlRecords.forEach((topicPartition, partitionResponse) -> currentErrors.put(topicPartition, Errors.KAFKA_STORAGE_ERROR));
                    updateErrors.accept(producerId2, currentErrors);
                    completeOne.run();
                    return;
                }
                HashMap currentErrors = new HashMap();
                result.forEach((topicPartition, partitionResponse) -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Append txn marker to topic : [{}], response: [{}].", new Object[]{this.ctx.channel(), topicPartition, partitionResponse});
                    }
                    currentErrors.put(topicPartition, partitionResponse.error);
                });
                updateErrors.accept(producerId2, currentErrors);
                String metadataNamespace = this.kafkaConfig.getKafkaMetadataNamespace();
                Set successfulOffsetsPartitions = result.keySet().stream().filter(topicPartition -> KopTopic.isGroupMetadataTopicName(topicPartition.topic(), metadataNamespace)).collect(Collectors.toSet());
                if (!successfulOffsetsPartitions.isEmpty()) {
                    this.getGroupCoordinator().scheduleHandleTxnCompletion(producerId2, successfulOffsetsPartitions.stream().map(TopicPartition::partition).collect(Collectors.toSet()), transactionResult).whenComplete((__, e) -> {
                        if (e != null) {
                            log.error("Received an exception while trying to update the offsets cache on transaction marker append", e);
                            ConcurrentHashMap updatedErrors = new ConcurrentHashMap();
                            successfulOffsetsPartitions.forEach(partition -> updatedErrors.put(partition, Errors.forException((Throwable)e.getCause())));
                            updateErrors.accept(producerId2, updatedErrors);
                        }
                        completeOne.run();
                    });
                    return;
                }
                completeOne.run();
            });
        }
    }

    private Map<TopicPartition, MemoryRecords> generateTxnMarkerRecords(WriteTxnMarkersRequest.TxnMarkerEntry marker) {
        HashMap txnMarkerRecordsMap = Maps.newHashMap();
        ControlRecordType controlRecordType = marker.transactionResult().equals((Object)TransactionResult.COMMIT) ? ControlRecordType.COMMIT : ControlRecordType.ABORT;
        EndTransactionMarker endTransactionMarker = new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch());
        for (TopicPartition topicPartition : marker.partitions()) {
            MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker((long)marker.producerId(), (short)marker.producerEpoch(), (EndTransactionMarker)endTransactionMarker);
            txnMarkerRecordsMap.put(topicPartition, memoryRecords);
        }
        return txnMarkerRecordsMap;
    }

    @Override
    protected void handleDeleteTopics(KafkaCommandDecoder.KafkaHeaderAndRequest deleteTopics, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(deleteTopics.getRequest() instanceof DeleteTopicsRequest));
        DeleteTopicsRequest request = (DeleteTopicsRequest)deleteTopics.getRequest();
        DeleteTopicsRequestData data = request.data();
        List topicsToDelete = data.topics();
        if (topicsToDelete == null || topicsToDelete.isEmpty()) {
            resultFuture.complete((AbstractResponse)KafkaResponseUtils.newDeleteTopics(Maps.newHashMap()));
            return;
        }
        ConcurrentMap deleteTopicsResponse = Maps.newConcurrentMap();
        AtomicInteger topicToDeleteCount = new AtomicInteger(topicsToDelete.size());
        BiConsumer<String, Errors> completeOne = (topic, errors) -> {
            deleteTopicsResponse.put(topic, errors);
            if (errors == Errors.NONE) {
                this.metadataStore.put(KopEventManager.getDeleteTopicsPath() + "/" + TopicNameUtils.getTopicNameWithUrlEncoded(topic), new byte[0], Optional.empty());
            }
            if (topicToDeleteCount.decrementAndGet() == 0) {
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newDeleteTopics(deleteTopicsResponse));
            }
        };
        String namespacePrefix = this.currentNamespacePrefix();
        topicsToDelete.forEach(topicState -> {
            KopTopic kopTopic;
            String topic = topicState.name();
            try {
                kopTopic = new KopTopic(topic, namespacePrefix);
            }
            catch (KoPTopicException e) {
                completeOne.accept(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                return;
            }
            String fullTopicName = kopTopic.getFullName();
            this.authorize(AclOperation.DELETE, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorize, ex) -> {
                if (ex != null) {
                    log.error("DeleteTopics authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                    completeOne.accept(topic, Errors.TOPIC_AUTHORIZATION_FAILED);
                    return;
                }
                if (!isAuthorize.booleanValue()) {
                    completeOne.accept(topic, Errors.TOPIC_AUTHORIZATION_FAILED);
                    return;
                }
                this.adminManager.deleteTopic(fullTopicName, __ -> completeOne.accept(topic, Errors.NONE), __ -> completeOne.accept(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION));
            });
        });
    }

    @Override
    protected void handleDeleteRecords(KafkaCommandDecoder.KafkaHeaderAndRequest deleteTopics, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(deleteTopics.getRequest() instanceof DeleteRecordsRequest));
        DeleteRecordsRequest request = (DeleteRecordsRequest)deleteTopics.getRequest();
        HashMap<TopicPartition, Long> partitionOffsets = new HashMap<TopicPartition, Long>();
        request.data().topics().forEach(topic -> {
            String name = topic.name();
            topic.partitions().forEach(partition -> {
                TopicPartition topicPartition = new TopicPartition(name, partition.partitionIndex());
                partitionOffsets.put(topicPartition, partition.offset());
            });
        });
        if (partitionOffsets.isEmpty()) {
            resultFuture.complete((AbstractResponse)KafkaResponseUtils.newDeleteRecords(Maps.newHashMap()));
            return;
        }
        ConcurrentMap deleteRecordsResponse = Maps.newConcurrentMap();
        AtomicInteger topicToDeleteCount = new AtomicInteger(partitionOffsets.size());
        BiConsumer<TopicPartition, Errors> completeOne = (topic, errors) -> {
            deleteRecordsResponse.put(topic, errors);
            if (topicToDeleteCount.decrementAndGet() == 0) {
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newDeleteRecords(deleteRecordsResponse));
            }
        };
        String namespacePrefix = this.currentNamespacePrefix();
        partitionOffsets.forEach((topicPartition, offset) -> {
            KopTopic kopTopic;
            try {
                kopTopic = new KopTopic(topicPartition.topic(), namespacePrefix);
            }
            catch (KoPTopicException e) {
                completeOne.accept((TopicPartition)topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                return;
            }
            String fullTopicName = kopTopic.getPartitionName(topicPartition.partition());
            this.authorize(AclOperation.DELETE, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorize, ex) -> {
                if (ex != null) {
                    log.error("DeleteTopics authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                    completeOne.accept((TopicPartition)topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
                    return;
                }
                if (!isAuthorize.booleanValue()) {
                    completeOne.accept((TopicPartition)topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
                    return;
                }
                this.topicManager.getTopicConsumerManager(fullTopicName).thenAccept(topicManager -> topicManager.findPositionForIndex((Long)offset).thenAccept(position -> this.adminManager.truncateTopic(fullTopicName, (long)offset, (Position)position, __ -> completeOne.accept((TopicPartition)topicPartition, Errors.NONE), __ -> completeOne.accept((TopicPartition)topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION))));
            });
        });
    }

    @Override
    protected void handleCreatePartitions(KafkaCommandDecoder.KafkaHeaderAndRequest createPartitions, CompletableFuture<AbstractResponse> resultFuture) {
        Preconditions.checkArgument((boolean)(createPartitions.getRequest() instanceof CreatePartitionsRequest));
        CreatePartitionsRequest request = (CreatePartitionsRequest)createPartitions.getRequest();
        ConcurrentMap result = Maps.newConcurrentMap();
        HashMap validTopics = Maps.newHashMap();
        HashSet duplicateTopics = new HashSet();
        KafkaRequestUtils.forEachCreatePartitionsRequest(request, (topic, newPartition) -> {
            if (duplicateTopics.add(topic)) {
                validTopics.put(topic, newPartition);
            } else {
                String errorMessage = "Create topics partitions request from client `" + createPartitions.getHeader().clientId() + "` contains multiple entries for the following topics: " + duplicateTopics;
                result.put(topic, new ApiError(Errors.INVALID_REQUEST, errorMessage));
            }
        });
        if (validTopics.isEmpty()) {
            resultFuture.complete((AbstractResponse)KafkaResponseUtils.newCreatePartitions(result));
            return;
        }
        String namespacePrefix = this.currentNamespacePrefix();
        AtomicInteger validTopicsCount = new AtomicInteger(validTopics.size());
        ConcurrentMap authorizedTopics = Maps.newConcurrentMap();
        Runnable createPartitionsAsync = () -> {
            if (authorizedTopics.isEmpty()) {
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newCreatePartitions(result));
                return;
            }
            this.adminManager.createPartitionsAsync(authorizedTopics, request.data().timeoutMs(), namespacePrefix).thenApply(validResult -> {
                result.putAll(validResult);
                resultFuture.complete((AbstractResponse)KafkaResponseUtils.newCreatePartitions(result));
                return null;
            });
        };
        BiConsumer<String, CreatePartitionsRequestData.CreatePartitionsTopic> completeOneTopic = (topic, newPartitions) -> {
            authorizedTopics.put(topic, newPartitions);
            if (validTopicsCount.decrementAndGet() == 0) {
                createPartitionsAsync.run();
            }
        };
        BiConsumer<String, ApiError> completeOneErrorTopic = (topic, error) -> {
            result.put(topic, error);
            if (validTopicsCount.decrementAndGet() == 0) {
                createPartitionsAsync.run();
            }
        };
        validTopics.forEach((topic, newPartitions) -> {
            try {
                KopTopic kopTopic = new KopTopic((String)topic, namespacePrefix);
                String fullTopicName = kopTopic.getFullName();
                this.authorize(AclOperation.ALTER, Resource.of(ResourceType.TOPIC, fullTopicName)).whenComplete((isAuthorized, ex) -> {
                    if (ex != null) {
                        log.error("CreatePartitions authorize failed, topic - {}. {}", (Object)fullTopicName, (Object)ex.getMessage());
                        completeOneErrorTopic.accept((String)topic, new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, ex.getMessage()));
                        return;
                    }
                    if (!isAuthorized.booleanValue()) {
                        completeOneErrorTopic.accept((String)topic, new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null));
                        return;
                    }
                    completeOneTopic.accept((String)topic, (CreatePartitionsRequestData.CreatePartitionsTopic)newPartitions);
                });
            }
            catch (KoPTopicException e) {
                completeOneErrorTopic.accept((String)topic, ApiError.fromThrowable((Throwable)e));
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Caught error in handler, closing channel", cause);
        this.close();
    }

    public CompletableFuture<MetadataResponse.PartitionMetadata> lookup(TopicName topic) {
        return this.findBroker(topic).thenApply(KafkaResponseUtils.BrokerLookupResult::toPartitionMetadata);
    }

    public CompletableFuture<KafkaResponseUtils.BrokerLookupResult> findBroker(TopicName topic) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle Lookup for {}", (Object)this.ctx.channel(), (Object)topic);
        }
        CompletableFuture<KafkaResponseUtils.BrokerLookupResult> future = new CompletableFuture<KafkaResponseUtils.BrokerLookupResult>();
        ((CompletableFuture)this.kopBrokerLookupManager.findBroker(topic.toString(), this.advertisedEndPoint).thenApply(listenerInetSocketAddressOpt -> listenerInetSocketAddressOpt.map(inetSocketAddress -> KafkaRequestHandler.newPartitionMetadata(topic, KafkaRequestHandler.newNode(inetSocketAddress))).orElse(null))).whenComplete((partitionMetadata, e) -> {
            if (e != null || partitionMetadata == null) {
                log.warn("[{}] Request {}: Exception while find Broker metadata", (Object)this.ctx.channel(), e);
                future.complete(KafkaRequestHandler.newFailedPartitionMetadata(topic));
            } else {
                future.complete((KafkaResponseUtils.BrokerLookupResult)partitionMetadata);
            }
        });
        return future;
    }

    static Node newNode(InetSocketAddress address) {
        if (log.isDebugEnabled()) {
            log.debug("Return Broker Node of {}. {}:{}", new Object[]{address, address.getHostString(), address.getPort()});
        }
        return new Node(Murmur3_32Hash.getInstance().makeHash((address.getHostString() + address.getPort()).getBytes(StandardCharsets.UTF_8)), address.getHostString(), address.getPort());
    }

    static KafkaResponseUtils.BrokerLookupResult newPartitionMetadata(TopicName topicName, Node node) {
        int kafkaPartitionIndex;
        int pulsarPartitionIndex = topicName.getPartitionIndex();
        int n = kafkaPartitionIndex = pulsarPartitionIndex == -1 ? 0 : pulsarPartitionIndex;
        if (log.isDebugEnabled()) {
            log.debug("Return PartitionMetadata node: {}, topicName: {}", (Object)node, (Object)topicName);
        }
        TopicPartition topicPartition = new TopicPartition(topicName.toString(), kafkaPartitionIndex);
        return KafkaResponseUtils.newMetadataPartition(topicPartition, node);
    }

    static KafkaResponseUtils.BrokerLookupResult newFailedPartitionMetadata(TopicName topicName) {
        int pulsarPartitionIndex = topicName.getPartitionIndex();
        int kafkaPartitionIndex = pulsarPartitionIndex == -1 ? 0 : pulsarPartitionIndex;
        log.warn("Failed find Broker metadata, create PartitionMetadata with NOT_LEADER_FOR_PARTITION");
        TopicPartition topicPartition = new TopicPartition(topicName.toString(), kafkaPartitionIndex);
        return KafkaResponseUtils.newMetadataPartition(Errors.NOT_LEADER_OR_FOLLOWER, topicPartition);
    }

    private void throwIfTransactionCoordinatorDisabled() {
        if (!this.kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
            throw new IllegalArgumentException("Broker has disabled transaction coordinator, please enable it before using transaction.");
        }
    }

    @VisibleForTesting
    protected CompletableFuture<Boolean> authorize(AclOperation operation, Resource resource) {
        Session session = this.authenticator != null ? this.authenticator.session() : null;
        return this.authorize(operation, resource, session);
    }

    protected CompletableFuture<Boolean> authorize(AclOperation operation, Resource resource, Session session) {
        if (this.authorizer == null) {
            return CompletableFuture.completedFuture(true);
        }
        if (session == null) {
            return CompletableFuture.completedFuture(false);
        }
        CompletableFuture<Boolean> isAuthorizedFuture = null;
        switch (operation) {
            case READ: {
                isAuthorizedFuture = this.authorizer.canConsumeAsync(session.getPrincipal(), resource);
                break;
            }
            case IDEMPOTENT_WRITE: 
            case WRITE: {
                isAuthorizedFuture = this.authorizer.canProduceAsync(session.getPrincipal(), resource);
                break;
            }
            case DESCRIBE: {
                if (resource.getResourceType() == ResourceType.TOPIC) {
                    isAuthorizedFuture = this.authorizer.canLookupAsync(session.getPrincipal(), resource);
                    break;
                }
                if (resource.getResourceType() != ResourceType.NAMESPACE) break;
                isAuthorizedFuture = this.authorizer.canGetTopicList(session.getPrincipal(), resource);
                break;
            }
            case CREATE: {
                isAuthorizedFuture = this.authorizer.canCreateTopicAsync(session.getPrincipal(), resource);
                break;
            }
            case DELETE: {
                isAuthorizedFuture = this.authorizer.canDeleteTopicAsync(session.getPrincipal(), resource);
                break;
            }
            case ALTER: {
                isAuthorizedFuture = this.authorizer.canAlterTopicAsync(session.getPrincipal(), resource);
                break;
            }
            case DESCRIBE_CONFIGS: {
                isAuthorizedFuture = this.authorizer.canManageTenantAsync(session.getPrincipal(), resource);
                break;
            }
            case ANY: {
                if (resource.getResourceType() != ResourceType.TENANT) break;
                isAuthorizedFuture = this.authorizer.canAccessTenantAsync(session.getPrincipal(), resource);
                break;
            }
        }
        if (isAuthorizedFuture == null) {
            return FutureUtil.failedFuture((Throwable)new IllegalStateException("AclOperation [" + operation.name() + "] is not supported."));
        }
        return isAuthorizedFuture;
    }

    private boolean validateTenantAccessForSession(Session session) throws AuthenticationException {
        if (!this.kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
            return true;
        }
        String tenantSpec = session.getPrincipal().getTenantSpec();
        if (tenantSpec == null) {
            return true;
        }
        String currentTenant = KafkaRequestHandler.extractTenantFromTenantSpec(tenantSpec);
        try {
            Boolean granted = this.authorize(AclOperation.ANY, Resource.of(ResourceType.TENANT, currentTenant), session).get();
            return granted != null && granted != false;
        }
        catch (InterruptedException | ExecutionException err) {
            log.error("Internal error while verifying tenant access", (Throwable)err);
            throw new AuthenticationException("Internal error while verifying tenant access:" + err, (Throwable)err);
        }
    }

    public Executor getDecodeExecutor() {
        return this.executor;
    }

    public PulsarService getPulsarService() {
        return this.pulsarService;
    }

    public KafkaTopicManager getTopicManager() {
        return this.topicManager;
    }

    public TenantContextManager getTenantContextManager() {
        return this.tenantContextManager;
    }

    public ReplicaManager getReplicaManager() {
        return this.replicaManager;
    }

    public KopBrokerLookupManager getKopBrokerLookupManager() {
        return this.kopBrokerLookupManager;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }

    public PulsarAdmin getAdmin() {
        return this.admin;
    }

    public MetadataStoreExtended getMetadataStore() {
        return this.metadataStore;
    }

    public SaslAuthenticator getAuthenticator() {
        return this.authenticator;
    }

    public Authorizer getAuthorizer() {
        return this.authorizer;
    }

    public AdminManager getAdminManager() {
        return this.adminManager;
    }

    public Boolean getTlsEnabled() {
        return this.tlsEnabled;
    }

    public EndPoint getAdvertisedEndPoint() {
        return this.advertisedEndPoint;
    }

    public boolean isSkipMessagesWithoutIndex() {
        return this.skipMessagesWithoutIndex;
    }

    public int getDefaultNumPartitions() {
        return this.defaultNumPartitions;
    }

    public int getMaxReadEntriesNum() {
        return this.maxReadEntriesNum;
    }

    public int getFailedAuthenticationDelayMs() {
        return this.failedAuthenticationDelayMs;
    }

    public ConcurrentHashMap<String, CompletableFuture<String>> getCurrentConnectedGroup() {
        return this.currentConnectedGroup;
    }

    public ConcurrentSkipListSet<String> getCurrentConnectedClientId() {
        return this.currentConnectedClientId;
    }

    public String getGroupIdStoredPath() {
        return this.groupIdStoredPath;
    }

    public Set<String> getGroupIds() {
        return this.groupIds;
    }

    public Map<TopicPartition, PendingTopicFutures> getPendingTopicFuturesMap() {
        return this.pendingTopicFuturesMap;
    }

    public DelayedOperationPurgatory<DelayedOperation> getProducePurgatory() {
        return this.producePurgatory;
    }

    public DelayedOperationPurgatory<DelayedOperation> getFetchPurgatory() {
        return this.fetchPurgatory;
    }

    public long getMaxPendingBytes() {
        return this.maxPendingBytes;
    }

    public long getResumeThresholdPendingBytes() {
        return this.resumeThresholdPendingBytes;
    }

    public AtomicLong getPendingBytes() {
        return this.pendingBytes;
    }

    public boolean isAutoReadDisabledPublishBufferLimiting() {
        return this.autoReadDisabledPublishBufferLimiting;
    }

    public LookupClient getLookupClient() {
        return this.lookupClient;
    }

    public KafkaTopicManagerSharedState getKafkaTopicManagerSharedState() {
        return this.kafkaTopicManagerSharedState;
    }
}

