/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.prometheus.client.Gauge;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConnectionController;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PulsarCommandSender;
import org.apache.pulsar.broker.service.PulsarCommandSenderImpl;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.CommandEndTxn;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandNewTxn;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerCnx
extends PulsarHandler
implements TransportCnx {
    // Broker-level collaborators and per-connection registries.
    private final BrokerService service;
    private final SchemaRegistryService schemaService;
    // Listener name supplied at construction; used as the fallback advertised listener for lookups.
    private final String listenerName;
    // Pending or established producers/consumers on this connection, keyed by a long id.
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    private final boolean enableSubscriptionPatternEvaluation;
    private final int maxSubscriptionPatternLength;
    // Connection lifecycle state; starts at State.Start in the constructor.
    private State state;
    // Cleared in channelInactive().
    private volatile boolean isActive = true;
    // Authentication-related state for the connection and, when present, the original (proxied) client.
    String authRole = null;
    private volatile AuthenticationDataSource authenticationData;
    AuthenticationProvider authenticationProvider;
    AuthenticationState authState;
    AuthenticationState originalAuthState;
    AuthenticationDataSource originalAuthData;
    private boolean pendingAuthChallengeResponse = false;
    // Taken from getMaxPendingPublishRequestsPerConnection(); resumeReadsThreshold is half of it.
    private final int maxPendingSendRequests;
    private final int resumeReadsThreshold;
    private int pendingSendRequest = 0;
    private final String replicatorPrefix;
    private String clientVersion = null;
    private int nonPersistentPendingMessages = 0;
    private final int maxNonPersistentPendingMessages;
    private String originalPrincipal = null;
    private Set<String> proxyRoles;
    private boolean authenticateOriginalAuthData;
    private final boolean schemaValidationEnforced;
    private String authMethod = "none";
    // Sent to the client in the Connected response (see completeConnect).
    private final int maxMessageSize;
    private boolean preciseDispatcherFlowControl;
    private boolean preciseTopicPublishRateLimitingEnable;
    private boolean encryptionRequireOnProducer;
    private volatile boolean autoReadDisabledRateLimiting = false;
    private FeatureFlags features;
    // Created in channelActive() once the connection has been admitted.
    private PulsarCommandSender commandSender;
    private final ConnectionController connectionController;
    private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT);
    private boolean autoReadDisabledPublishBufferLimiting = false;
    // Publish buffer size (MB, converted to bytes) divided by the number of IO threads;
    // the resume threshold is half of that value.
    private final long maxPendingBytesPerThread;
    private final long resumeThresholdPendingBytesPerThread;
    // Per-thread counter, lazily initialised to a fresh MutableLong.
    private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>(){

        protected MutableLong initialValue() throws Exception {
            return new MutableLong();
        }
    };
    // Per-thread identity-based set of connections; entries are added in channelActive()
    // and removed in channelInactive().
    private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>(){

        protected Set<ServerCnx> initialValue() throws Exception {
            return Collections.newSetFromMap(new IdentityHashMap());
        }
    };
    private static final byte[] emptyArray = new byte[0];
    private static final Gauge throttledConnections = (Gauge)((Gauge.Builder)((Gauge.Builder)Gauge.build().name("pulsar_broker_throttled_connections")).help("Counter of connections throttled because of per-connection limit")).register();
    // NOTE(review): the help text below is identical to the per-connection gauge above although the
    // metric name refers to a global limit - looks like a copy/paste; confirm the intended wording.
    private static final Gauge throttledConnectionsGlobal = (Gauge)((Gauge.Builder)((Gauge.Builder)Gauge.build().name("pulsar_broker_throttled_connections_global_limit")).help("Counter of connections throttled because of per-connection limit")).register();
    private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);

    /**
     * Creates a connection handler without a listener name.
     *
     * <p>Delegates to {@link #ServerCnx(PulsarService, String)} with a {@code null} listener name.
     *
     * @param pulsar the broker service container this connection belongs to
     */
    public ServerCnx(PulsarService pulsar) {
        this(pulsar, null);
    }

    /**
     * Creates a connection handler and snapshots the broker configuration values it needs.
     *
     * <p>The keep-alive interval passed to the parent handler comes from the broker service,
     * or is {@code 0} seconds when the broker service is not available.
     *
     * @param pulsar       the broker service container this connection belongs to
     * @param listenerName listener name used as the default advertised listener; may be {@code null}
     */
    public ServerCnx(PulsarService pulsar, String listenerName) {
        super(pulsar.getBrokerService() != null ? pulsar.getBrokerService().getKeepAliveIntervalSeconds() : 0, TimeUnit.SECONDS);

        // Collaborators and initial lifecycle state.
        this.service = pulsar.getBrokerService();
        this.schemaService = pulsar.getSchemaRegistryService();
        this.listenerName = listenerName;
        this.state = State.Start;
        final ServiceConfiguration config = pulsar.getConfiguration();

        // Per-connection producer/consumer registries.
        this.producers = ConcurrentLongHashMap.newBuilder().expectedItems(8).concurrencyLevel(1).build();
        this.consumers = ConcurrentLongHashMap.newBuilder().expectedItems(8).concurrencyLevel(1).build();

        // Authentication / proxy settings.
        this.proxyRoles = config.getProxyRoles();
        this.authenticateOriginalAuthData = config.isAuthenticateOriginalAuthData();

        // Publish-side limits; reads resume once pending requests drop to half the maximum.
        this.replicatorPrefix = config.getReplicatorPrefix();
        this.maxNonPersistentPendingMessages = config.getMaxConcurrentNonPersistentMessagePerConnection();
        this.schemaValidationEnforced = config.isSchemaValidationEnforced();
        this.maxMessageSize = config.getMaxMessageSize();
        final int pendingSendLimit = config.getMaxPendingPublishRequestsPerConnection();
        this.maxPendingSendRequests = pendingSendLimit;
        this.resumeReadsThreshold = pendingSendLimit / 2;
        this.preciseDispatcherFlowControl = config.isPreciseDispatcherFlowControl();
        this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
        this.encryptionRequireOnProducer = config.isEncryptionRequireOnProducer();

        // The publish buffer (configured in MB) is split evenly across the IO threads.
        final long publishBufferBytes = (long) config.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
        final long bytesPerThread = publishBufferBytes / (long) config.getNumIOThreads();
        this.maxPendingBytesPerThread = bytesPerThread;
        this.resumeThresholdPendingBytesPerThread = bytesPerThread / 2L;

        this.connectionController = new ConnectionController.DefaultConnectionController(config);
        this.enableSubscriptionPatternEvaluation = config.isEnableBrokerSideSubscriptionPatternEvaluation();
        this.maxSubscriptionPatternLength = config.getSubscriptionPatternMaxLength();
    }

    /**
     * Handles a newly opened channel.
     *
     * <p>The connection is first registered with the {@code connectionController}. If the
     * controller does not answer {@code OK}, a {@code NotAllowedError} is written to the peer and
     * the channel is closed. Otherwise the command sender is created, the connection is counted in
     * the broker stats and added to the per-thread connection set.
     *
     * @param ctx the channel context of the new connection
     * @throws Exception if the parent handler fails
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ConnectionController.Sate sate = this.connectionController.increaseConnection(this.remoteAddress);
        if (!sate.equals(ConnectionController.Sate.OK)) {
            // The per-address variant appends the remote address; a separating space keeps the
            // message readable (previously it rendered as "...on address/1.2.3.4:5678").
            String reason = sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
                    ? "Reached the maximum number of connections"
                    : "Reached the maximum number of connections on address " + this.remoteAddress;
            ctx.channel().writeAndFlush(Commands.newError(-1L, ServerError.NotAllowedError, reason));
            ctx.channel().close();
            return;
        }
        log.info("New connection from {}", this.remoteAddress);
        this.ctx = ctx;
        this.commandSender = new PulsarCommandSenderImpl(this.getBrokerService().getInterceptor(), this);
        this.service.getPulsarStats().recordConnectionCreate();
        cnxsPerThread.get().add(this);
    }

    /**
     * Handles a closed channel: releases the connection slot, notifies the broker interceptor
     * (if any), and tears down every producer and consumer registered on this connection.
     *
     * <p>Futures that are still pending are failed with an {@link IllegalStateException};
     * futures that already completed successfully have their producer/consumer closed.
     *
     * @param ctx the channel context of the closed connection
     * @throws Exception if the parent handler fails
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        this.connectionController.decreaseConnection(ctx.channel().remoteAddress());
        this.isActive = false;
        log.info("Closed connection from {}", this.remoteAddress);

        final BrokerInterceptor interceptor = this.getBrokerService().getInterceptor();
        if (interceptor != null) {
            interceptor.onConnectionClosed(this);
        }
        cnxsPerThread.get().remove(this);

        this.producers.forEach((producerId, pendingProducer) -> {
            if (!pendingProducer.isDone()) {
                // Still being created: fail it. If we won the race there is nothing to close.
                boolean failedByUs = pendingProducer.completeExceptionally(new IllegalStateException("Connection closed."));
                if (failedByUs) {
                    return;
                }
            }
            if (!pendingProducer.isDone() || pendingProducer.isCompletedExceptionally()) {
                return;
            }
            Producer established = pendingProducer.getNow(null);
            established.closeNow(true);
        });

        this.consumers.forEach((consumerId, pendingConsumer) -> {
            if (!pendingConsumer.isDone()) {
                // Still being created: fail it. If we won the race there is nothing to close.
                boolean failedByUs = pendingConsumer.completeExceptionally(new IllegalStateException("Connection closed."));
                if (failedByUs) {
                    return;
                }
            }
            if (!pendingConsumer.isDone() || pendingConsumer.isCompletedExceptionally()) {
                return;
            }
            Consumer established = pendingConsumer.getNow(null);
            try {
                established.close();
            } catch (BrokerServiceException e) {
                log.warn("Consumer {} was already closed: {}", established, e);
            }
        });

        this.service.getPulsarStats().recordConnectionClose();
    }

    /**
     * Logs the channel's new writability at debug level; takes no other action.
     *
     * @param ctx the channel context whose writability changed
     * @throws Exception declared by the handler contract; not thrown by this implementation
     */
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
    }

    /**
     * Handles an exception raised in the pipeline.
     *
     * <p>The first exception moves the connection to {@code State.Failed} and is logged at warn
     * level (known exceptions are logged as-is, others with their full stack trace). Subsequent
     * exceptions are only logged at debug level. The channel context is closed in every case.
     *
     * @param ctx   the channel context on which the exception was raised
     * @param cause the exception
     * @throws Exception declared by the handler contract
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.state == State.Failed) {
            // Already failed: avoid flooding the log with follow-up errors.
            if (log.isDebugEnabled()) {
                log.debug("[{}] Got exception: {}", this.remoteAddress, cause);
            }
        } else {
            Object detail = ClientCnx.isKnownException(cause) ? cause : ExceptionUtils.getStackTrace(cause);
            log.warn("[{}] Got exception {}", this.remoteAddress, detail);
            this.state = State.Failed;
            if (log.isDebugEnabled()) {
                log.debug("[{}] connect state change to : [{}]", this.remoteAddress, State.Failed.name());
            }
        }
        ctx.close();
    }

    /**
     * Tells whether the given original principal is unacceptable for a proxied connection.
     *
     * <p>The check only applies when both authentication and authorization are enabled and the
     * connection's own role is one of the configured proxy roles. In that case the original
     * principal is invalid when it is blank or is itself a proxy role.
     *
     * @param originalPrincipal the principal forwarded by the proxy; may be {@code null}
     * @return {@code true} if the principal must be rejected
     */
    private boolean invalidOriginalPrincipal(String originalPrincipal) {
        if (!this.service.isAuthenticationEnabled()) {
            return false;
        }
        if (!this.service.isAuthorizationEnabled()) {
            return false;
        }
        if (!this.proxyRoles.contains(this.authRole)) {
            return false;
        }
        if (StringUtils.isBlank(originalPrincipal)) {
            return true;
        }
        return this.proxyRoles.contains(originalPrincipal);
    }

    /**
     * Checks asynchronously whether this connection may perform {@code operation} on the topic.
     *
     * <p>When authorization is disabled the result is always {@code true}. Otherwise both the
     * original principal (if one is set) and the connection's own role must be authorized; a
     * warning is logged for each one that is not.
     *
     * @param topicName the topic the operation targets
     * @param operation the operation to authorize
     * @return a future completing with {@code true} only if every applicable check passed
     */
    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }

        final CompletableFuture<Boolean> originalRoleCheck;
        if (this.originalPrincipal != null) {
            originalRoleCheck = this.service.getAuthorizationService().allowTopicOperationAsync(topicName, operation, this.originalPrincipal, this.getAuthenticationData());
        } else {
            // No proxied principal: nothing to verify on that side.
            originalRoleCheck = CompletableFuture.completedFuture(true);
        }
        final CompletableFuture<Boolean> ownRoleCheck = this.service.getAuthorizationService().allowTopicOperationAsync(topicName, operation, this.authRole, this.authenticationData);

        return originalRoleCheck.thenCombine(ownRoleCheck, (originalAllowed, ownAllowed) -> {
            if (!originalAllowed) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}", this.originalPrincipal, operation, topicName);
            }
            if (!ownAllowed) {
                log.warn("Role {} is not authorized to perform operation {} on topic {}", this.authRole, operation, topicName);
            }
            return originalAllowed && ownAllowed;
        });
    }

    /**
     * Subscription-aware variant of the topic authorization check.
     *
     * <p>When authorization is enabled, the subscription name is recorded on the connection's
     * authentication data (creating an {@link AuthenticationDataCommand} if none exists yet) and
     * on the original auth data when present, before delegating to
     * {@link #isTopicOperationAllowed(TopicName, TopicOperation)}.
     *
     * @param topicName        the topic the operation targets
     * @param subscriptionName the subscription the operation targets
     * @param operation        the operation to authorize
     * @return a future completing with {@code true} if the operation is allowed
     */
    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }

        if (this.authenticationData != null) {
            this.authenticationData.setSubscription(subscriptionName);
        } else {
            this.authenticationData = new AuthenticationDataCommand("", subscriptionName);
        }
        if (this.originalAuthData != null) {
            this.originalAuthData.setSubscription(subscriptionName);
        }
        return this.isTopicOperationAllowed(topicName, operation);
    }

    /**
     * Serves a topic lookup request.
     *
     * <p>The request is validated, throttled through the broker's lookup semaphore, checked for
     * a valid proxy principal and authorized for {@code TopicOperation.LOOKUP} before the actual
     * lookup is performed. Every path that acquired the semaphore releases it once its response
     * has been written.
     *
     * @param lookup the lookup command received from the client
     */
    protected void handleLookup(CommandLookupTopic lookup) {
        final long requestId = lookup.getRequestId();
        final boolean authoritative = lookup.isAuthoritative();
        // Prefer the listener named in the request; fall back to the one this connection was created with.
        final String advertisedListenerName =
                lookup.hasAdvertisedListenerName() && StringUtils.isNotBlank(lookup.getAdvertisedListenerName())
                        ? lookup.getAdvertisedListenerName()
                        : this.listenerName;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Lookup from {} for {} requesting listener {}", lookup.getTopic(), this.remoteAddress, requestId, StringUtils.isNotBlank(advertisedListenerName) ? advertisedListenerName : "(none)");
        }

        final TopicName topicName = this.validateTopicName(lookup.getTopic(), requestId, lookup);
        if (topicName == null) {
            return;
        }

        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup due to too many lookup-requests {}", this.remoteAddress, topicName);
            }
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
            return;
        }

        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for lookup ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return;
        }

        this.isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
            if (!isAuthorized) {
                final String msg = "Proxy Client is not authorized to Lookup";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.getPrincipal(), topicName);
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
                lookupSemaphore.release();
                return null;
            }
            TopicLookupBase.lookupTopicAsync(this.getBrokerService().pulsar(), topicName, authoritative, this.getPrincipal(), this.getAuthenticationData(), requestId, advertisedListenerName).handle((lookupResponse, lookupError) -> {
                if (lookupError != null) {
                    log.warn("[{}] lookup failed with error {}, {}", this.remoteAddress, topicName, lookupError.getMessage(), lookupError);
                    this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, lookupError.getMessage(), requestId));
                } else {
                    this.ctx.writeAndFlush(lookupResponse);
                }
                lookupSemaphore.release();
                return null;
            });
            return null;
        }).exceptionally(authError -> {
            ServerCnx.logAuthException(this.remoteAddress, "lookup", this.getPrincipal(), Optional.of(topicName), authError);
            final String msg = "Exception occurred while trying to authorize lookup";
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return null;
        });
    }

    /**
     * Serves a partitioned-topic metadata request.
     *
     * <p>The request is validated, throttled through the broker's lookup semaphore, checked for
     * a valid proxy principal and authorized for {@code TopicOperation.LOOKUP} before the
     * metadata is fetched. Every path that acquired the semaphore releases it once its response
     * has been written.
     *
     * @param partitionMetadata the metadata command received from the client
     */
    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
        final long requestId = partitionMetadata.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), this.remoteAddress, requestId);
        }

        final TopicName topicName = this.validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata);
        if (topicName == null) {
            return;
        }

        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", this.remoteAddress, topicName);
            }
            this.commandSender.sendPartitionMetadataResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId);
            return;
        }

        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId);
            lookupSemaphore.release();
            return;
        }

        this.isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
            if (!isAuthorized) {
                final String msg = "Proxy Client is not authorized to Get Partition Metadata";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.getPrincipal(), topicName);
                // NOTE(review): this path and the exceptionally() path below write straight to the
                // channel, while the other responses in this method go through commandSender - confirm
                // whether that difference is intended.
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
                lookupSemaphore.release();
                return null;
            }
            PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync(this.getBrokerService().pulsar(), topicName).handle((metadata, failure) -> {
                if (failure == null) {
                    int partitionCount = metadata.partitions;
                    this.commandSender.sendPartitionMetadataResponse(partitionCount, requestId);
                } else if (failure instanceof PulsarClientException) {
                    log.warn("Failed to authorize {} at [{}] on topic {} : {}", this.getRole(), this.remoteAddress, topicName, failure.getMessage());
                    this.commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, failure.getMessage(), requestId);
                } else {
                    log.warn("Failed to get Partitioned Metadata [{}] {}: {}", this.remoteAddress, topicName, failure.getMessage(), failure);
                    // REST failures with a status below 500 are reported as metadata errors;
                    // everything else is reported as the service not being ready.
                    ServerError error;
                    if (failure instanceof RestException && ((RestException) failure).getResponse().getStatus() < 500) {
                        error = ServerError.MetadataError;
                    } else {
                        error = ServerError.ServiceNotReady;
                    }
                    this.commandSender.sendPartitionMetadataResponse(error, failure.getMessage(), requestId);
                }
                lookupSemaphore.release();
                return null;
            });
            return null;
        }).exceptionally(authError -> {
            ServerCnx.logAuthException(this.remoteAddress, "partition-metadata", this.getPrincipal(), Optional.of(topicName), authError);
            final String msg = "Exception occurred while trying to authorize get Partition Metadata";
            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return null;
        });
    }

    /**
     * Answers a consumer-stats request for a consumer registered on this connection.
     *
     * <p>A {@code ConsumerNotFound} error response is sent when the consumer id is unknown, when
     * the consumer is still being created, or when its creation failed. Otherwise the stats
     * response built by {@link #createConsumerStatsResponse(Consumer, long)} is sent.
     *
     * @param commandConsumerStats the stats command received from the client
     */
    protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", this.remoteAddress);
        }
        long requestId = commandConsumerStats.getRequestId();
        long consumerId = commandConsumerStats.getConsumerId();
        CompletableFuture<Consumer> consumerFuture = this.consumers.get(consumerId);

        // The map lookup yields null for an unknown id, and getNow() rethrows if the future failed;
        // only read the value once the future is known to have completed normally so that both
        // cases fall through to the "not found" response instead of throwing.
        Consumer consumer = null;
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            consumer = consumerFuture.getNow(null);
        }

        ByteBuf msg;
        if (consumer == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", this.remoteAddress, requestId, consumerId);
            msg = Commands.newConsumerStatsResponse(ServerError.ConsumerNotFound, "Consumer " + consumerId + " not found", requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer);
            }
            msg = this.createConsumerStatsResponse(consumer, requestId);
        }
        this.ctx.writeAndFlush(msg);
    }

    /**
     * Builds the serialized consumer-stats response for the given consumer.
     *
     * <p>The response command is created with a placeholder error code which is cleared before
     * the consumer and subscription statistics are filled in.
     *
     * @param consumer  the consumer whose statistics are reported
     * @param requestId the id of the request being answered
     * @return the size-prefixed serialized response
     */
    ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
        ConsumerStatsImpl stats = consumer.getStats();
        Subscription sub = consumer.getSubscription();
        BaseCommand response = Commands.newConsumerStatsResponseCommand(ServerError.UnknownError, null, requestId);
        response.getConsumerStatsResponse()
                .clearErrorCode()
                .setRequestId(requestId)
                .setMsgRateOut(stats.msgRateOut)
                .setMsgThroughputOut(stats.msgThroughputOut)
                .setMsgRateRedeliver(stats.msgRateRedeliver)
                .setConsumerName(stats.consumerName)
                .setAvailablePermits((long) stats.availablePermits)
                .setUnackedMessages((long) stats.unackedMessages)
                .setBlockedConsumerOnUnackedMsgs(stats.blockedConsumerOnUnackedMsgs)
                .setAddress(stats.getAddress())
                .setConnectedSince(stats.getConnectedSince())
                .setMsgBacklog(sub.getNumberOfEntriesInBacklog(false))
                .setMsgRateExpired(sub.getExpiredMessageRate())
                .setType(sub.getTypeString());
        return Commands.serializeWithSize(response);
    }

    /**
     * Finalizes a successful connect handshake.
     *
     * <p>Sends the {@code Connected} response, moves the connection to {@code State.Connected},
     * records the success in the broker stats, stores the peer's protocol version and client
     * version, and notifies the broker interceptor when one is configured.
     *
     * @param clientProtoVersion protocol version announced by the client
     * @param clientVersion      client version string; ignored when blank or containing a space
     */
    private void completeConnect(int clientProtoVersion, String clientVersion) {
        this.ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, this.maxMessageSize));
        this.state = State.Connected;
        this.service.getPulsarStats().recordConnectionCreateSuccess();
        if (log.isDebugEnabled()) {
            log.debug("[{}] connect state change to : [{}]", this.remoteAddress, State.Connected.name());
        }
        this.setRemoteEndpointProtocolVersion(clientProtoVersion);

        final boolean usableClientVersion = StringUtils.isNotBlank(clientVersion) && !clientVersion.contains(" ");
        if (usableClientVersion) {
            this.clientVersion = clientVersion.intern();
        }

        final BrokerInterceptor interceptor = this.getBrokerService().getInterceptor();
        if (interceptor != null) {
            interceptor.onConnectionCreated(this);
        }
    }

    /**
     * Runs one step of the authentication exchange with the client.
     *
     * <p>The original auth state is used when it is set; otherwise the connection's own auth
     * state is used. If the step completes authentication, the connection is either finalized
     * via {@link #completeConnect(int, String)} (when not yet connected) or treated as a
     * credential refresh, in which case the role must not change. If authentication is not yet
     * complete, an auth challenge is sent to the client.
     *
     * @param clientData            authentication data supplied by the client
     * @param clientProtocolVersion protocol version announced by the client
     * @param clientVersion         client version string
     * @return {@code State.Connected} when authentication completed, {@code State.Connecting}
     *         when a further challenge was sent
     * @throws Exception if the authentication state rejects the data
     */
    private State doAuthentication(AuthData clientData, int clientProtocolVersion, String clientVersion) throws Exception {
        boolean useOriginalAuthState = this.originalAuthState != null;
        // Note: these locals shadow the fields of the same name for the rest of the method.
        AuthenticationState authState = useOriginalAuthState ? this.originalAuthState : this.authState;
        String authRole = useOriginalAuthState ? this.originalPrincipal : this.authRole;
        AuthData brokerData = authState.authenticate(clientData);
        if (log.isDebugEnabled()) {
            log.debug("Authenticate using original auth state : {}, role = {}", (Object)useOriginalAuthState, (Object)authRole);
        }
        if (authState.isComplete()) {
            String newAuthRole = authState.getAuthRole();
            // NOTE(review): authenticationData is overwritten even when the original auth state was
            // used, whereas authRole below is only updated for the connection's own auth state -
            // confirm this asymmetry is intended.
            this.authenticationData = authState.getAuthDataSource();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Auth data refreshed for role={}", (Object)this.remoteAddress, (Object)this.authRole);
            }
            if (!useOriginalAuthState) {
                this.authRole = newAuthRole;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", new Object[]{this.remoteAddress, this.authMethod, this.authRole, this.originalPrincipal});
            }
            if (this.state != State.Connected) {
                // First successful authentication on this connection: finish the handshake.
                this.completeConnect(clientProtocolVersion, clientVersion);
            } else if (!StringUtils.isEmpty((CharSequence)authRole)) {
                // Already connected: this is a refresh; authRole here is the role captured before
                // this step, so a different new role means the principal changed.
                if (!authRole.equals(newAuthRole)) {
                    log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}", new Object[]{this.remoteAddress, authRole, newAuthRole});
                    this.ctx.close();
                } else {
                    log.info("[{}] Refreshed authentication credentials for role {}", (Object)this.remoteAddress, (Object)authRole);
                }
            }
            return State.Connected;
        }
        // Authentication needs another round trip: hand the broker's data back as a challenge.
        this.ctx.writeAndFlush((Object)Commands.newAuthChallenge((String)this.authMethod, (AuthData)brokerData, (int)clientProtocolVersion));
        if (log.isDebugEnabled()) {
            log.debug("[{}] Authentication in progress client by method {}.", (Object)this.remoteAddress, (Object)this.authMethod);
            log.debug("[{}] connect state change to : [{}]", (Object)this.remoteAddress, (Object)State.Connecting.name());
        }
        return State.Connecting;
    }

    /**
     * Re-validates the credentials of an established connection once they have expired.
     *
     * <p>Nothing happens when there is no auth state, when the connection is not both
     * {@code Connected} and active, or when the credentials have not expired. Otherwise:
     * <ul>
     *   <li>if an original principal is set but no original auth state exists, the credentials
     *       cannot be re-validated and the connection is closed;</li>
     *   <li>otherwise a task is submitted to the channel executor that closes the connection when
     *       the client does not support refresh or a previous challenge is still unanswered, and
     *       else sends a fresh auth challenge and marks a response as pending.</li>
     * </ul>
     */
    public void refreshAuthenticationCredentials() {
        final AuthenticationState authState = this.originalAuthState != null ? this.originalAuthState : this.authState;
        if (authState == null) {
            return;
        }
        if (this.getState() != State.Connected || !this.isActive) {
            return;
        }
        if (!authState.isExpired()) {
            return;
        }
        if (this.originalPrincipal != null && this.originalAuthState == null) {
            log.info("[{}] Cannot revalidate user credential when using proxy and not forwarding the credentials. Closing connection", (Object)this.remoteAddress);
            // The credentials are expired and cannot be refreshed, so actually close the
            // connection as the log message states instead of leaving it open.
            this.ctx.close();
            return;
        }
        this.ctx.executor().execute((Runnable)SafeRun.safeRun(() -> {
            log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", new Object[]{this.remoteAddress, this.originalPrincipal, this.authRole});
            if (!this.supportsAuthenticationRefresh()) {
                log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", (Object)this.remoteAddress);
                this.ctx.close();
                return;
            }
            // A challenge from an earlier refresh round was never answered.
            if (this.pendingAuthChallengeResponse) {
                log.warn("[{}] Closing connection after timeout on refreshing auth credentials", (Object)this.remoteAddress);
                this.ctx.close();
                return;
            }
            try {
                AuthData brokerData = authState.refreshAuthentication();
                this.ctx.writeAndFlush((Object)Commands.newAuthChallenge((String)this.authMethod, (AuthData)brokerData, (int)this.getRemoteEndpointProtocolVersion()));
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", (Object)this.remoteAddress, (Object)this.authMethod);
                }
                // Cleared again by handleAuthResponse when the client answers.
                this.pendingAuthChallengeResponse = true;
            }
            catch (AuthenticationException e) {
                log.warn("[{}] Failed to refresh authentication: {}", (Object)this.remoteAddress, (Object)e);
                this.ctx.close();
            }
        }));
    }

    /**
     * Handles the CONNECT command, which is only accepted while the connection is in
     * {@code State.Start}.
     *
     * <p>With authentication disabled the connection is completed immediately. Otherwise the auth
     * method is resolved from the command, the matching provider is looked up (falling back to the
     * anonymous role when there is none), the first authentication step is executed, and the
     * original principal is either authenticated from the forwarded original auth data or taken
     * as-is from the command. Any exception is reported to the client as an
     * {@code AuthenticationError} and the connection is closed.
     *
     * @param connect the CONNECT command received from the client
     */
    protected void handleConnect(CommandConnect connect) {
        Preconditions.checkArgument(this.state == State.Start);
        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}, auth enabled: {}: has original principal = {}, original principal = {}", this.remoteAddress, this.service.isAuthenticationEnabled(), connect.hasOriginalPrincipal(), connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null);
        }

        final String clientVersion = connect.getClientVersion();
        final int protoVersion = connect.getProtocolVersion();

        this.features = new FeatureFlags();
        if (connect.hasFeatureFlags()) {
            this.features.copyFrom(connect.getFeatureFlags());
        }

        if (!this.service.isAuthenticationEnabled()) {
            this.completeConnect(protoVersion, clientVersion);
            return;
        }

        try {
            final byte[] rawAuthData;
            if (connect.hasAuthData()) {
                rawAuthData = connect.getAuthData();
            } else {
                rawAuthData = emptyArray;
            }
            final AuthData clientData = AuthData.of(rawAuthData);

            // Prefer the explicit method name, then the enum form, and finally fall back to "none".
            if (connect.hasAuthMethodName()) {
                this.authMethod = connect.getAuthMethodName();
            } else if (connect.hasAuthMethod()) {
                this.authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
            } else {
                this.authMethod = "none";
            }

            this.authenticationProvider = this.getBrokerService().getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                this.authRole = (String)this.getBrokerService().getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                this.completeConnect(protoVersion, clientVersion);
                return;
            }

            final ChannelHandler tlsHandler = this.ctx.channel().pipeline().get("tls");
            final SSLSession sslSession = tlsHandler == null ? null : ((SslHandler)tlsHandler).engine().getSession();

            this.authState = this.authenticationProvider.newAuthState(clientData, this.remoteAddress, sslSession);
            if (log.isDebugEnabled()) {
                final String role = this.authState != null && this.authState.isComplete() ? this.authState.getAuthRole() : "authentication incomplete or null";
                log.debug("[{}] Authenticate role : {}", this.remoteAddress, role);
            }
            this.state = this.doAuthentication(clientData, protoVersion, clientVersion);

            if (connect.hasOriginalPrincipal() && this.service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
                final String originalAuthMethod;
                if (connect.hasOriginalAuthMethod()) {
                    originalAuthMethod = connect.getOriginalAuthMethod();
                } else {
                    originalAuthMethod = "none";
                }
                final AuthenticationProvider originalProvider = this.getBrokerService().getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
                if (originalProvider == null) {
                    throw new AuthenticationException(String.format("Can't find AuthenticationProvider for original role using auth method [%s] is not available", originalAuthMethod));
                }
                final AuthData originalClientData = AuthData.of(connect.getOriginalAuthData().getBytes());
                this.originalAuthState = originalProvider.newAuthState(originalClientData, this.remoteAddress, sslSession);
                this.originalAuthData = this.originalAuthState.getAuthDataSource();
                this.originalPrincipal = this.originalAuthState.getAuthRole();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticate original role : {}", this.remoteAddress, this.originalPrincipal);
                }
            } else {
                if (connect.hasOriginalPrincipal()) {
                    this.originalPrincipal = connect.getOriginalPrincipal();
                } else {
                    this.originalPrincipal = null;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticate original role (forwarded from proxy): {}", this.remoteAddress, this.originalPrincipal);
                }
            }
        }
        catch (Exception e) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            ServerCnx.logAuthException(this.remoteAddress, "connect", this.getPrincipal(), Optional.empty(), e);
            final String msg = "Unable to authenticate";
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, msg));
            this.close();
        }
    }

    /**
     * Handles the client's answer to an auth challenge by feeding the supplied auth data into the
     * next authentication step.
     *
     * <p>The pending-challenge flag is cleared as soon as the command passes its argument checks.
     * An {@link AuthenticationException} is reported to the client as {@code AuthenticationError}
     * with the exception message; any other exception is reported as {@code UnknownError}. In both
     * cases the failure is recorded in the broker stats and the connection is closed.
     *
     * @param authResponse the AUTH_RESPONSE command; it must carry a response holding both auth
     *                     data and an auth method name
     */
    protected void handleAuthResponse(CommandAuthResponse authResponse) {
        Preconditions.checkArgument(authResponse.hasResponse());
        final boolean responseIsComplete = authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName();
        Preconditions.checkArgument(responseIsComplete);

        this.pendingAuthChallengeResponse = false;
        if (log.isDebugEnabled()) {
            log.debug("Received AuthResponse from {}, auth method: {}", this.remoteAddress, authResponse.getResponse().getAuthMethodName());
        }

        try {
            final AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
            this.doAuthentication(clientData, authResponse.getProtocolVersion(), authResponse.getClientVersion());
        }
        catch (AuthenticationException authFailure) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            final String reason = authFailure.getMessage();
            log.warn("[{}] Authentication failed: {} ", this.remoteAddress, reason);
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, authFailure.getMessage()));
            this.close();
        }
        catch (Exception unexpected) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            final String msg = "Unable to handleAuthResponse";
            log.warn("[{}] {} ", this.remoteAddress, msg, unexpected);
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.UnknownError, msg));
            this.close();
        }
    }

    /**
     * Handles the SUBSCRIBE command, which is only accepted on a {@code Connected} connection.
     *
     * <p>The topic name is validated, an invalid original principal is rejected with an
     * {@code AuthorizationError}, the subscription options are read from the command, and a
     * pending consumer future is registered under the consumer id before the CONSUME
     * authorization check is awaited. When that check fails exceptionally the registered future
     * is removed again and an {@code AuthorizationError} is sent to the client.
     *
     * @param subscribe the SUBSCRIBE command received from the client
     */
    protected void handleSubscribe(CommandSubscribe subscribe) {
        Preconditions.checkArgument(this.state == State.Connected);
        final long requestId = subscribe.getRequestId();
        final long consumerId = subscribe.getConsumerId();
        final TopicName topicName = this.validateTopicName(subscribe.getTopic(), requestId, subscribe);
        if (topicName == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", this.remoteAddress, this.authRole, this.originalPrincipal);
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided while subscribing ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
            return;
        }

        // Read every option once; absent optional fields get the defaults visible below.
        final String subscriptionName = subscribe.getSubscription();
        final CommandSubscribe.SubType subType = subscribe.getSubType();
        final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : "";
        final boolean isDurable = subscribe.isDurable();
        final BatchMessageIdImpl startMessageId;
        if (subscribe.hasStartMessageId()) {
            startMessageId = new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex());
        } else {
            startMessageId = null;
        }
        final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        final boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted();
        final Map metadata = CommandUtils.metadataFromCommand((CommandSubscribe)subscribe);
        final CommandSubscribe.InitialPosition initialPosition = subscribe.getInitialPosition();
        final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec() ? subscribe.getStartMessageRollbackDurationSec() : -1L;
        final SchemaData schema = subscribe.hasSchema() ? this.getSchema(subscribe.getSchema()) : null;
        final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.isReplicateSubscriptionState();
        final boolean forceTopicCreation = subscribe.isForceTopicCreation();
        final KeySharedMeta keySharedMeta;
        if (subscribe.hasKeySharedMeta()) {
            keySharedMeta = new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta());
        } else {
            keySharedMeta = emptyKeySharedMeta;
        }

        if (log.isDebugEnabled()) {
            log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName, schema == null ? "absent" : "present");
        }

        final CompletableFuture<Boolean> isAuthorizedFuture = this.isTopicOperationAllowed(topicName, subscriptionName, TopicOperation.CONSUME);
        // The future is registered before the authorization result is known; the failure handler
        // below removes exactly this future again.
        final CompletableFuture consumerFuture = new CompletableFuture();
        final CompletableFuture existingConsumerFuture = (CompletableFuture)this.consumers.putIfAbsent(consumerId, consumerFuture);

        final CompletableFuture subscribeFlow = (CompletableFuture)isAuthorizedFuture.thenApply(isAuthorized -> this.lambda$handleSubscribe$15(topicName, subscriptionName, metadata, consumerId, consumerFuture, requestId, existingConsumerFuture, forceTopicCreation, isDurable, subscribe, subType, priorityLevel, consumerName, (MessageIdImpl)startMessageId, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta, schema, isAuthorized));
        subscribeFlow.exceptionally(ex -> {
            ServerCnx.logAuthException(this.remoteAddress, "subscribe", this.getPrincipal(), Optional.of(topicName), ex);
            this.consumers.remove(consumerId, (Object)consumerFuture);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }

    /**
     * Converts a wire-protocol schema into a {@code SchemaData} instance.
     *
     * <p>The result is marked as not deleted, stamped with the current time, attributed to the
     * original principal (empty string when there is none), and carries the protocol schema's
     * properties as a key/value map.
     *
     * @param protocolSchema schema as received in a client command
     * @return the equivalent {@code SchemaData}
     */
    private SchemaData getSchema(Schema protocolSchema) {
        final String schemaUser = Strings.nullToEmpty((String)this.originalPrincipal);
        final Map<String, String> schemaProps = protocolSchema.getPropertiesList().stream()
                .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
        return SchemaData.builder()
                .data(protocolSchema.getSchemaData())
                .isDeleted(false)
                .timestamp(System.currentTimeMillis())
                .user(schemaUser)
                .type(Commands.getSchemaType((Schema.Type)protocolSchema.getType()))
                .props(schemaProps)
                .build();
    }

    /**
     * Handles the PRODUCER command, which is only accepted on a {@code Connected} connection.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>validate the topic name and reject an invalid original principal;</li>
     *   <li>check PRODUCE authorization, combined with SUBSCRIBE authorization when an initial
     *       subscription name is supplied;</li>
     *   <li>register a producer future under the producer id, answering directly when one is
     *       already registered;</li>
     *   <li>load or create the topic and check both backlog quotas;</li>
     *   <li>enforce the encryption requirement, add the schema, and wait for the transaction
     *       buffer recovery check;</li>
     *   <li>create the initial subscription when requested and missing, then build the producer
     *       and add it to the topic.</li>
     * </ol>
     * Every failure path completes the producer future exceptionally, sends an error response when
     * this call was the one that completed the future, and removes the future from the producers
     * map.
     *
     * @param cmdProducer the PRODUCER command received from the client
     */
    protected void handleProducer(CommandProducer cmdProducer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long producerId = cmdProducer.getProducerId();
        long requestId = cmdProducer.getRequestId();
        String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() : this.service.generateUniqueProducerName();
        long epoch = cmdProducer.getEpoch();
        boolean userProvidedProducerName = cmdProducer.isUserProvidedProducerName();
        boolean isEncrypted = cmdProducer.isEncrypted();
        Map metadata = CommandUtils.metadataFromCommand((CommandProducer)cmdProducer);
        SchemaData schema = cmdProducer.hasSchema() ? this.getSchema(cmdProducer.getSchema()) : null;
        ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
        Optional topicEpoch = cmdProducer.hasTopicEpoch() ? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
        boolean isTxnEnabled = cmdProducer.isTxnEnabled();
        String initialSubscriptionName = cmdProducer.hasInitialSubscriptionName() ? cmdProducer.getInitialSubscriptionName() : null;
        boolean supportsPartialProducer = this.supportsPartialProducer();
        TopicName topicName = this.validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
        if (topicName == null) {
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            String msg = "Valid Proxy Client role should be provided while creating producer ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName});
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
            return;
        }
        CompletionStage<Boolean> isAuthorizedFuture = this.isTopicOperationAllowed(topicName, TopicOperation.PRODUCE);
        // An initial subscription is created on behalf of the producer, so it also needs SUBSCRIBE.
        if (!Strings.isNullOrEmpty((String)initialSubscriptionName)) {
            isAuthorizedFuture = isAuthorizedFuture.thenCombine(this.isTopicOperationAllowed(topicName, TopicOperation.SUBSCRIBE), (canProduce, canSubscribe) -> canProduce != false && canSubscribe != false);
        }
        ((CompletableFuture)isAuthorizedFuture.thenApply(isAuthorized -> {
            CompletableFuture producerFuture;
            CompletableFuture existingProducerFuture;
            if (!isAuthorized.booleanValue()) {
                String msg = "Client is not authorized to Produce";
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.getPrincipal()});
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (ServerError)ServerError.AuthorizationError, (String)msg));
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to Produce with role {}", (Object)this.remoteAddress, (Object)this.getPrincipal());
            }
            if ((existingProducerFuture = (CompletableFuture)this.producers.putIfAbsent(producerId, producerFuture = new CompletableFuture())) != null) {
                // A producer with this id was already requested on this connection.
                if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                    Producer producer = existingProducerFuture.getNow(null);
                    log.info("[{}] Producer with the same id is already created: producerId={}, producer={}", new Object[]{this.remoteAddress, producerId, producer});
                    this.commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(), producer.getSchemaVersion());
                    return null;
                }
                ServerError error = null;
                if (!existingProducerFuture.isDone()) {
                    // Earlier request still in flight.
                    error = ServerError.ServiceNotReady;
                } else {
                    // Earlier request failed: report its error and drop the stale future.
                    error = this.getErrorCode(existingProducerFuture);
                    this.producers.remove(producerId, (Object)existingProducerFuture);
                }
                log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                this.commandSender.sendErrorResponse(requestId, error, "Producer is already present on the connection");
                return null;
            }
            log.info("[{}][{}] Creating producer. producerId={}, schema is {}", new Object[]{this.remoteAddress, topicName, producerId, schema == null ? "absent" : "present"});
            ((CompletableFuture)this.service.getOrCreateTopic(topicName.toString()).thenCompose(topic -> {
                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(topic.checkBacklogQuotaExceeded(producerName, BacklogQuota.BacklogQuotaType.destination_storage), topic.checkBacklogQuotaExceeded(producerName, BacklogQuota.BacklogQuotaType.message_age));
                backlogQuotaCheckFuture.thenRun(() -> {
                    if ((topic.isEncryptionRequired() || this.encryptionRequireOnProducer) && !isEncrypted) {
                        String msg = String.format("Encryption is required in %s", topicName);
                        log.warn("[{}] {}", (Object)this.remoteAddress, (Object)msg);
                        if (producerFuture.completeExceptionally(new BrokerServiceException.ServerMetadataException(msg))) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                        }
                        this.producers.remove(producerId, (Object)producerFuture);
                        return;
                    }
                    this.disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
                    CompletableFuture<SchemaVersion> schemaVersionFuture = this.tryAddSchema((Topic)topic, schema);
                    schemaVersionFuture.exceptionally(exception -> {
                        if (producerFuture.completeExceptionally((Throwable)exception)) {
                            String message = exception.getMessage();
                            if (exception.getCause() != null) {
                                message = message + " caused by " + exception.getCause();
                            }
                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), message);
                        }
                        log.error("Try add schema failed, remote address {}, topic {}, producerId {}", new Object[]{this.remoteAddress, topicName, producerId, exception});
                        this.producers.remove(producerId, (Object)producerFuture);
                        return null;
                    });
                    schemaVersionFuture.thenAccept(schemaVersion -> ((CompletableFuture)topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
                        CompletableFuture<Object> createInitSubFuture;
                        if (!Strings.isNullOrEmpty((String)initialSubscriptionName) && topic.isPersistent() && !topic.getSubscriptions().containsKey((Object)initialSubscriptionName)) {
                            if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
                                String msg = "Could not create the initial subscription due to the auto subscription creation is not allowed.";
                                if (producerFuture.completeExceptionally(new BrokerServiceException.NotAllowedException(msg))) {
                                    log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", new Object[]{this.remoteAddress, msg, initialSubscriptionName, topicName});
                                    this.commandSender.sendErrorResponse(requestId, ServerError.NotAllowedError, msg);
                                }
                                this.producers.remove(producerId, (Object)producerFuture);
                                return;
                            }
                            createInitSubFuture = topic.createSubscription(initialSubscriptionName, CommandSubscribe.InitialPosition.Earliest, false);
                        } else {
                            // Nothing to create: continue straight to building the producer.
                            createInitSubFuture = CompletableFuture.completedFuture(null);
                        }
                        createInitSubFuture.whenComplete((sub, ex) -> {
                            if (ex != null) {
                                // This callback is attached directly to the subscription future, so the
                                // failure may arrive unwrapped (no cause). Fall back to the exception
                                // itself so building the message cannot throw and leave the producer
                                // future uncompleted.
                                Throwable rootCause = ex.getCause() != null ? ex.getCause() : ex;
                                String msg = "Failed to create the initial subscription: " + rootCause.getMessage();
                                log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", new Object[]{this.remoteAddress, msg, initialSubscriptionName, topicName});
                                if (producerFuture.completeExceptionally((Throwable)ex)) {
                                    this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(ex), msg);
                                }
                                this.producers.remove(producerId, (Object)producerFuture);
                                return;
                            }
                            this.buildProducerAndAddTopic((Topic)topic, producerId, producerName, requestId, isEncrypted, metadata, (SchemaVersion)schemaVersion, epoch, userProvidedProducerName, topicName, producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture);
                        });
                    })).exceptionally(exception -> {
                        Throwable cause = exception.getCause();
                        log.error("producerId {}, requestId {} : TransactionBuffer recover failed", new Object[]{producerId, requestId, exception});
                        if (producerFuture.completeExceptionally((Throwable)exception)) {
                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.ServiceUnitNotReadyException.getClientErrorCode(cause), cause.getMessage());
                        }
                        this.producers.remove(producerId, (Object)producerFuture);
                        return null;
                    }));
                });
                return backlogQuotaCheckFuture;
            })).exceptionally(exception -> {
                Throwable cause = exception.getCause();
                if (cause instanceof BrokerServiceException.TopicBacklogQuotaExceededException) {
                    BrokerServiceException.TopicBacklogQuotaExceededException tbqe = (BrokerServiceException.TopicBacklogQuotaExceededException)cause;
                    IllegalStateException illegalStateException = new IllegalStateException(tbqe);
                    BacklogQuota.RetentionPolicy retentionPolicy = tbqe.getRetentionPolicy();
                    if (producerFuture.completeExceptionally(illegalStateException)) {
                        // The error code sent to the client depends on the quota's retention policy.
                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage());
                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.ProducerBlockedQuotaExceededException, illegalStateException.getMessage());
                        }
                    }
                    this.producers.remove(producerId, (Object)producerFuture);
                    return null;
                }
                if (cause instanceof NoSuchElementException) {
                    cause = new BrokerServiceException.TopicNotFoundException("Topic Not Found.");
                    log.info("[{}] Failed to load topic {}, producerId={}: Topic not found", new Object[]{this.remoteAddress, topicName, producerId});
                } else if (!Exceptions.areExceptionsPresentInChain((Throwable)cause, (Class[])new Class[]{BrokerServiceException.ServiceUnitNotReadyException.class, ManagedLedgerException.class})) {
                    log.error("[{}] Failed to create topic {}, producerId={}", new Object[]{this.remoteAddress, topicName, producerId, exception});
                }
                if (producerFuture.completeExceptionally((Throwable)exception)) {
                    this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
                }
                this.producers.remove(producerId, (Object)producerFuture);
                return null;
            });
            return null;
        })).exceptionally(ex -> {
            // The authorization check itself failed (as opposed to returning "not authorized").
            ServerCnx.logAuthException(this.remoteAddress, "producer", this.getPrincipal(), Optional.of(topicName), ex);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }

    /**
     * Creates the {@code Producer} object, adds it to the topic and answers the client.
     *
     * <p>When the producer is added successfully and the connection is still active, the producer
     * future is completed and a success response is sent. If the future had already been completed
     * by someone else, or the connection is no longer active, the new producer is closed and the
     * future is removed from the producers map. When adding fails, the producer is closed and an
     * error response is sent if this call completed the future. Separately, once
     * {@code producerQueuedFuture} completes on an active connection, a success response flagged
     * as not ready is sent.
     *
     * @param topic topic the producer is added to
     * @param producerId client-assigned producer id
     * @param producerName name of the producer
     * @param requestId id of the request being answered
     * @param isEncrypted whether the producer publishes encrypted messages
     * @param metadata producer metadata taken from the command
     * @param schemaVersion schema version resolved for this producer
     * @param epoch producer epoch from the command
     * @param userProvidedProducerName whether the name was supplied by the user
     * @param topicName name of the topic, used for logging
     * @param producerAccessMode requested access mode
     * @param topicEpoch topic epoch from the command, if any
     * @param supportsPartialProducer whether partial producers are supported on this connection
     * @param producerFuture future registered for this producer id
     */
    private void buildProducerAndAddTopic(Topic topic, long producerId, String producerName, long requestId, boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch, boolean userProvidedProducerName, TopicName topicName, ProducerAccessMode producerAccessMode, Optional<Long> topicEpoch, boolean supportsPartialProducer, CompletableFuture<Producer> producerFuture) {
        final CompletableFuture<Void> queuedSignal = new CompletableFuture<Void>();
        final Producer producer = new Producer(topic, this, producerId, producerName, this.getPrincipal(), isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName, producerAccessMode, topicEpoch, supportsPartialProducer);

        final CompletableFuture registration = (CompletableFuture)topic.addProducer(producer, queuedSignal).thenAccept(newTopicEpoch -> {
            if (!this.isActive()) {
                producer.closeNow(true);
                log.info("[{}] Cleared producer created after connection was closed: {}", this.remoteAddress, producer);
                producerFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
                this.producers.remove(producerId, (Object)producerFuture);
                return;
            }
            if (!producerFuture.complete(producer)) {
                // Somebody else already completed the future, so this producer is not wanted.
                producer.closeNow(true);
                log.info("[{}] Cleared producer created after timeout on client side {}", this.remoteAddress, producer);
                this.producers.remove(producerId, (Object)producerFuture);
                return;
            }
            log.info("[{}] Created new producer: {}", this.remoteAddress, producer);
            this.commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), (Optional<Long>)newTopicEpoch, true);
            if (this.getBrokerService().getInterceptor() != null) {
                this.getBrokerService().getInterceptor().producerCreated(this, producer, metadata);
            }
        });
        registration.exceptionally(ex -> {
            final boolean fenced = ex.getCause() instanceof BrokerServiceException.ProducerFencedException;
            if (!fenced) {
                log.warn("[{}] Failed to add producer to topic {}: producerId={}, {}", this.remoteAddress, topicName, producerId, ex.getCause().getMessage());
            } else if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to add producer to topic {}: producerId={}, {}", this.remoteAddress, topicName, producerId, ex.getCause().getMessage());
            }
            producer.closeNow(true);
            if (producerFuture.completeExceptionally((Throwable)ex)) {
                this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
            }
            return null;
        });

        queuedSignal.thenRun(() -> {
            if (!this.isActive()) {
                return;
            }
            log.info("[{}] Producer is waiting in queue: {}", this.remoteAddress, producer);
            this.commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), Optional.empty(), false);
            if (this.getBrokerService().getInterceptor() != null) {
                this.getBrokerService().getInterceptor().producerCreated(this, producer, metadata);
            }
        });
    }

    /**
     * Handles the SEND command, which is only accepted on a {@code Connected} connection.
     *
     * <p>The connection is closed when the producer for the command is missing, not yet created,
     * or failed to be created. For a non-persistent topic, once the pending-message counter
     * exceeds the configured maximum the message is dropped: a send receipt with ledger and entry
     * id {@code -1} is scheduled and the drop is recorded on the producer. Otherwise the message
     * is published, as a transactional message when both transaction id halves are present.
     *
     * @param send the SEND command received from the client
     * @param headersAndPayload buffer holding the message headers and payload
     */
    protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
        Preconditions.checkArgument(this.state == State.Connected);

        final CompletableFuture producerFuture = (CompletableFuture)this.producers.get(send.getProducerId());
        final boolean producerReady = producerFuture != null && producerFuture.isDone() && !producerFuture.isCompletedExceptionally();
        if (!producerReady) {
            log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.", this.remoteAddress, send.getProducerId());
            this.close();
            return;
        }

        final Producer producer = (Producer)producerFuture.getNow(null);
        if (log.isDebugEnabled()) {
            this.printSendCommandDebug(send, headersAndPayload);
        }

        if (producer.isNonPersistentTopic()) {
            if (this.nonPersistentPendingMessages > this.maxNonPersistentPendingMessages) {
                // Over the limit: acknowledge with -1/-1 and count the message as dropped.
                final long droppedProducerId = send.getProducerId();
                final long droppedSequenceId = send.getSequenceId();
                final long droppedHighestSequenceId = send.getHighestSequenceId();
                this.service.getTopicOrderedExecutor().executeOrdered((Object)producer.getTopic().getName(), (SafeRunnable)SafeRun.safeRun(() -> this.commandSender.sendSendReceiptResponse(droppedProducerId, droppedSequenceId, droppedHighestSequenceId, -1L, -1L)));
                producer.recordMessageDrop(send.getNumMessages());
                return;
            }
            ++this.nonPersistentPendingMessages;
        }

        this.startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());

        if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
            final TxnID txnId = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
            producer.publishTxnMessage(txnId, producer.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
        } else if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
        } else {
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
        }
    }

    /**
     * Logs, at debug level, the details of a received send command.
     *
     * <p>The message metadata is parsed from {@code headersAndPayload}; the reader index is marked
     * beforehand and reset afterwards so the buffer is left positioned where it started.
     *
     * @param send the send command being processed
     * @param headersAndPayload buffer carrying the message headers and payload
     */
    private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
        headersAndPayload.markReaderIndex();
        final MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf) headersAndPayload);
        headersAndPayload.resetReaderIndex();

        if (!log.isDebugEnabled()) {
            return;
        }
        Object[] logArgs = new Object[]{
            this.remoteAddress,
            send.getProducerId(),
            send.getSequenceId(),
            metadata.getProducerName(),
            metadata.getSequenceId(),
            headersAndPayload.readableBytes(),
            metadata.hasPartitionKey() ? metadata.getPartitionKey() : null,
            metadata.hasOrderingKey() ? metadata.getOrderingKey() : null
        };
        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}", logArgs);
    }

    /**
     * Handles an acknowledgement sent by a consumer on this connection.
     *
     * <p>The ack is ignored when the consumer is unknown, still being created, or failed to be created.
     * An ack response is written back only if the command carried a request id; the broker interceptor,
     * when configured, is notified after a successful ack.
     *
     * @param ack the acknowledgement command
     */
    protected void handleAck(CommandAck ack) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> future = (CompletableFuture<Consumer>) this.consumers.get(ack.getConsumerId());
        final boolean hasRequestId = ack.hasRequestId();
        final long requestId = hasRequestId ? ack.getRequestId() : 0L;
        final long consumerId = ack.getConsumerId();

        if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
            return;
        }

        final Consumer consumer = future.getNow(null);
        consumer.messageAcked(ack)
            .thenRun(() -> {
                if (hasRequestId) {
                    this.ctx.writeAndFlush((Object) Commands.newAckResponse(requestId, null, null, consumerId));
                }
                if (this.getBrokerService().getInterceptor() != null) {
                    this.getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
                }
            })
            .exceptionally(e -> {
                if (hasRequestId) {
                    this.ctx.writeAndFlush((Object) Commands.newAckResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage(), consumerId));
                }
                return null;
            });
    }

    /**
     * Handles a flow command by granting the requested message permits to the consumer.
     *
     * <p>Nothing happens when the consumer is unknown, still being created, or failed to be created.
     *
     * @param flow the flow command carrying the consumer id and number of permits
     */
    protected void handleFlow(CommandFlow flow) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}", new Object[]{this.remoteAddress, flow.getConsumerId(), flow.getMessagePermits()});
        }

        CompletableFuture<Consumer> future = (CompletableFuture<Consumer>) this.consumers.get(flow.getConsumerId());
        boolean usable = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!usable) {
            return;
        }

        Consumer consumer = future.getNow(null);
        if (consumer == null) {
            log.info("[{}] Couldn't find consumer {}", (Object) this.remoteAddress, (Object) flow.getConsumerId());
            return;
        }
        consumer.flowPermits(flow.getMessagePermits());
    }

    /**
     * Handles a request to redeliver unacknowledged messages to a consumer.
     *
     * <p>When the command lists message ids and the consumer's subscription type is an individual-ack
     * mode, only those messages are redelivered. Otherwise redelivery is requested with the consumer
     * epoch from the command, or {@code -1} if none was supplied. The command is ignored when the
     * consumer is unknown, still being created, or failed to be created.
     *
     * @param redeliver the redeliver command
     */
    protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] redeliverUnacknowledged from consumer {}, consumerEpoch {}", new Object[]{this.remoteAddress, redeliver.getConsumerId(), redeliver.hasConsumerEpoch() ? Long.valueOf(redeliver.getConsumerEpoch()) : null});
        }

        CompletableFuture<Consumer> future = (CompletableFuture<Consumer>) this.consumers.get(redeliver.getConsumerId());
        if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
            return;
        }

        Consumer consumer = future.getNow(null);
        boolean redeliverListedIds = redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType());
        if (redeliverListedIds) {
            consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
            return;
        }

        long consumerEpoch = redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : -1L;
        consumer.redeliverUnacknowledgedMessages(consumerEpoch);
    }

    /**
     * Handles an unsubscribe command for a consumer on this connection.
     *
     * <p>Replies with a {@code MetadataError} ("Consumer not found") when the consumer is unknown,
     * still being created, or failed to be created.
     *
     * @param unsubscribe the unsubscribe command
     */
    protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> future = (CompletableFuture<Consumer>) this.consumers.get(unsubscribe.getConsumerId());
        boolean consumerReady = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!consumerReady) {
            this.commandSender.sendErrorResponse(unsubscribe.getRequestId(), ServerError.MetadataError, "Consumer not found");
            return;
        }

        Consumer consumer = future.getNow(null);
        consumer.doUnsubscribe(unsubscribe.getRequestId());
    }

    /**
     * Handles a seek command by resetting the consumer's subscription cursor, either to a message id
     * or to a publish timestamp.
     *
     * <p>A {@code MetadataError} is returned when the command carries neither a message id nor a
     * publish time, or when the consumer is unknown, still being created, or failed to be created.
     * When both are present the message id takes precedence.
     *
     * @param seek the seek command
     */
    protected void handleSeek(CommandSeek seek) {
        Preconditions.checkArgument(this.state == State.Connected);
        final long requestId = seek.getRequestId();
        CompletableFuture<Consumer> future = (CompletableFuture<Consumer>) this.consumers.get(seek.getConsumerId());

        if (!(seek.hasMessageId() || seek.hasMessagePublishTime())) {
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Message id and message publish time were not present");
            return;
        }

        boolean consumerCreated = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!consumerCreated) {
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
            return;
        }

        final Subscription subscription = future.getNow(null).getSubscription();

        if (seek.hasMessageId()) {
            MessageIdData msgIdData = seek.getMessageId();
            final int ackSetCount = msgIdData.getAckSetsCount();
            long[] ackSet = null;
            if (ackSetCount > 0) {
                ackSet = new long[ackSetCount];
                int idx = 0;
                while (idx < ackSetCount) {
                    ackSet[idx] = msgIdData.getAckSetAt(idx);
                    idx++;
                }
            }
            final PositionImpl position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId(), ackSet);
            // The success path is handled by lambda$handleSeek$34, which is defined outside this block.
            subscription.resetCursor((Position) position)
                .thenRun(() -> this.lambda$handleSeek$34(subscription, (Position) position, requestId))
                .exceptionally(ex -> {
                    log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, ex.getMessage(), ex});
                    this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError, "Error when resetting subscription: " + ex.getCause().getMessage());
                    return null;
                });
            return;
        }

        // Only the publish time is present at this point.
        final long timestamp = seek.getMessagePublishTime();
        subscription.resetCursor(timestamp)
            .thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to publish time {}", new Object[]{this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), timestamp});
                this.commandSender.sendSuccessResponse(requestId);
            })
            .exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, ex.getMessage(), ex});
                this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError, "Reset subscription to publish time error: " + ex.getCause().getMessage());
                return null;
            });
    }

    /**
     * Two connections are equal when they are of the exact same class and their channels share the
     * same channel id.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is the same connection or wraps a channel with an equal id
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        boolean sameClass = o != null && this.getClass() == o.getClass();
        if (!sameClass) {
            return false;
        }
        ServerCnx that = (ServerCnx) o;
        return Objects.equals(this.ctx().channel().id(), that.ctx().channel().id());
    }

    /**
     * Hash code derived from the channel id, consistent with {@link #equals(Object)}.
     *
     * @return the hash of this connection's channel id
     */
    @Override
    public int hashCode() {
        Object channelId = this.ctx().channel().id();
        return Objects.hash(channelId);
    }

    /**
     * Handles a request to close a producer registered on this connection.
     *
     * <p>Success is reported immediately when the producer is unknown, when its creation had not yet
     * completed (the pending future is failed so the eventual producer is discarded), or when its
     * creation had already failed. Otherwise the producer is closed and success is reported once the
     * close completes.
     *
     * @param closeProducer the close-producer command
     */
    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        final long producerId = closeProducer.getProducerId();
        final long requestId = closeProducer.getRequestId();
        final CompletableFuture<Producer> future = (CompletableFuture<Producer>) this.producers.get(producerId);

        if (future == null) {
            log.info("[{}] Producer {} was not registered on the connection", (Object) this.remoteAddress, (Object) producerId);
            this.ctx.writeAndFlush((Object) Commands.newSuccess(requestId));
            return;
        }

        // Try to fail a still-pending creation; this only succeeds if nobody completed the future first.
        boolean abortedCreation = !future.isDone()
            && future.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"));
        if (abortedCreation) {
            log.info("[{}] Closed producer before its creation was completed. producerId={}", (Object) this.remoteAddress, (Object) producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, (Object) future);
            return;
        }

        if (future.isCompletedExceptionally()) {
            log.info("[{}] Closed producer that already failed to be created. producerId={}", (Object) this.remoteAddress, (Object) producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, (Object) future);
            return;
        }

        final Producer producer = future.getNow(null);
        log.info("[{}][{}] Closing producer on cnx {}. producerId={}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress, producerId});
        producer.close(true).thenAccept(ignored -> {
            log.info("[{}][{}] Closed producer on cnx {}. producerId={}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress, producerId});
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, (Object) future);
        });
    }

    /**
     * Handles a request to close a consumer registered on this connection.
     *
     * <p>Success is reported immediately when the consumer is unknown, when its creation had not yet
     * completed (the pending future is failed so the eventual consumer is discarded), or when its
     * creation had already failed. Otherwise the consumer is closed, removed from this connection's
     * consumer map and success is reported; a {@link BrokerServiceException} raised while closing is
     * turned into an error response.
     *
     * @param closeConsumer the close-consumer command
     */
    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
        Preconditions.checkArgument(this.state == State.Connected);
        log.info("[{}] Closing consumer: consumerId={}", (Object) this.remoteAddress, (Object) closeConsumer.getConsumerId());
        final long requestId = closeConsumer.getRequestId();
        final long consumerId = closeConsumer.getConsumerId();
        CompletableFuture<Consumer> consumerFuture = (CompletableFuture<Consumer>) this.consumers.get(consumerId);
        if (consumerFuture == null) {
            // Arguments ordered to match the "[remoteAddress] ... consumerId" layout used by the other logs.
            log.info("[{}] Consumer was not registered on the connection: {}", (Object) this.remoteAddress, (Object) consumerId);
            this.ctx.writeAndFlush((Object) Commands.newSuccess(requestId));
            return;
        }
        // Try to fail a still-pending creation; this only succeeds if nobody completed the future first.
        if (!consumerFuture.isDone() && consumerFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer before its creation was completed. consumerId={}", (Object) this.remoteAddress, (Object) consumerId);
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        if (consumerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer that already failed to be created. consumerId={}", (Object) this.remoteAddress, (Object) consumerId);
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        Consumer consumer = consumerFuture.getNow(null);
        try {
            consumer.close();
            this.consumers.remove(consumerId, (Object) consumerFuture);
            this.commandSender.sendSuccessResponse(requestId);
            log.info("[{}] Closed consumer, consumerId={}", (Object) this.remoteAddress, (Object) consumerId);
        } catch (BrokerServiceException e) {
            // The exception is passed last, after an argument for every placeholder, so the logger
            // also records its stack trace.
            log.warn("[{}] Error closing consumer {} : {}", new Object[]{this.remoteAddress, consumer, e.getMessage(), e});
            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
        }
    }

    /**
     * Handles a request for the last message id of the topic a consumer is subscribed to.
     *
     * <p>Replies with a {@code MetadataError} ("Consumer not found") when the consumer is unknown,
     * still being created, or failed to be created. The mark-delete position is included only for
     * persistent subscriptions; it is {@code null} otherwise.
     *
     * @param getLastMessageId the get-last-message-id command
     */
    protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> future = (CompletableFuture<Consumer>) this.consumers.get(getLastMessageId.getConsumerId());
        boolean consumerReady = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!consumerReady) {
            this.ctx.writeAndFlush((Object) Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
            return;
        }

        Consumer consumer = future.getNow(null);
        long requestId = getLastMessageId.getRequestId();
        Topic topic = consumer.getSubscription().getTopic();
        Position lastPosition = topic.getLastPosition();
        int partitionIndex = TopicName.getPartitionIndex((String) topic.getName());

        Position markDeletePosition = consumer.getSubscription() instanceof PersistentSubscription
            ? ((PersistentSubscription) consumer.getSubscription()).getCursor().getMarkDeletedPosition()
            : null;

        this.getLargestBatchIndexWhenPossible(topic, (PositionImpl) lastPosition, (PositionImpl) markDeletePosition, partitionIndex, requestId, consumer.getSubscription().getName());
    }

    /**
     * Answers a get-last-message-id request, including the largest batch index of the last entry when
     * that entry can be read.
     *
     * <p>When the last position has entry id {@code -1}, or is at or before the compaction horizon, the
     * answer is derived from the compacted ledger instead. Otherwise the entry at {@code lastPosition}
     * is read asynchronously and its metadata inspected for the batch size. A read failure caused by a
     * non-recoverable ledger error also falls back to the compacted ledger; any other failure is
     * reported to the client as a {@code MetadataError}.
     *
     * @param topic the topic; it is cast to {@link PersistentTopic}
     * @param lastPosition last position of the topic
     * @param markDeletePosition mark-delete position of the subscription, or {@code null}
     * @param partitionIndex partition index reported back in the response
     * @param requestId id of the request being answered
     * @param subscriptionName subscription name, used for logging only
     */
    private void getLargestBatchIndexWhenPossible(Topic topic, PositionImpl lastPosition, PositionImpl markDeletePosition, int partitionIndex, long requestId, String subscriptionName) {
        PersistentTopic persistentTopic = (PersistentTopic) topic;
        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
        if (lastPosition.getEntryId() == -1L || (compactionHorizon.isPresent() && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
            this.handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, markDeletePosition);
            return;
        }

        final CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
        ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                entryFuture.complete(entry);
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                entryFuture.completeExceptionally(exception);
            }
        }, null);

        CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
            // Release in finally so the entry is not leaked if the metadata cannot be parsed.
            try {
                MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf) entry.getDataBuffer());
                return metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : -1;
            } finally {
                entry.release();
            }
        });

        batchSizeFuture.whenComplete((batchSize, e) -> {
            if (e != null) {
                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
                    this.handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, markDeletePosition);
                } else {
                    this.ctx.writeAndFlush((Object) Commands.newError(requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
                }
                return;
            }
            // A non-positive batch size is reported as batch index -1.
            int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", new Object[]{this.remoteAddress, topic.getName(), subscriptionName, lastPosition, partitionIndex});
            }
            long markDeleteLedgerId = markDeletePosition != null ? markDeletePosition.getLedgerId() : -1L;
            long markDeleteEntryId = markDeletePosition != null ? markDeletePosition.getEntryId() : -1L;
            this.ctx.writeAndFlush((Object) Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, largestBatchIndex, markDeleteLedgerId, markDeleteEntryId));
        });
    }

    /**
     * Answers a get-last-message-id request from the last entry of the topic's compacted ledger.
     *
     * <p>When the compacted ledger yields no entry, a response with ledger id, entry id and batch index
     * all set to {@code -1} is sent. A failure to read the entry is reported as a {@code MetadataError}.
     *
     * @param persistentTopic topic whose compacted ledger is read
     * @param requestId id of the request being answered
     * @param partitionIndex partition index reported back in the response
     * @param markDeletePosition mark-delete position of the subscription, or {@code null}
     */
    private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long requestId, int partitionIndex, PositionImpl markDeletePosition) {
        final long markDeleteLedgerId = markDeletePosition != null ? markDeletePosition.getLedgerId() : -1L;
        final long markDeleteEntryId = markDeletePosition != null ? markDeletePosition.getEntryId() : -1L;

        persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
            if (entry == null) {
                this.ctx.writeAndFlush((Object) Commands.newGetLastMessageIdResponse(requestId, -1L, -1L, partitionIndex, -1, markDeleteLedgerId, markDeleteEntryId));
                return;
            }
            // Release in finally so the entry is not leaked if parsing or building the response throws.
            try {
                MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf) entry.getDataBuffer());
                int bs = metadata.getNumMessagesInBatch();
                int largestBatchIndex = bs > 0 ? bs - 1 : -1;
                this.ctx.writeAndFlush((Object) Commands.newGetLastMessageIdResponse(requestId, entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, markDeleteLedgerId, markDeleteEntryId));
            } finally {
                entry.release();
            }
        }).exceptionally(ex -> {
            // Fall back to the exception itself when it carries no cause, instead of failing with an NPE.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            this.ctx.writeAndFlush((Object) Commands.newError(requestId, ServerError.MetadataError, "Failed to read last entry of the compacted Ledger " + cause.getMessage()));
            return null;
        });
    }

    /**
     * Checks whether the given namespace operation is allowed for this connection.
     *
     * <p>When authorization is disabled the result is always {@code true}. Otherwise the operation
     * must be allowed for the connection's auth role and, when an original principal is set, for
     * that principal as well. A warning is logged for each check that denies the operation.
     *
     * @param namespaceName namespace the operation targets
     * @param operation operation being requested
     * @return a future completing with {@code true} only if every applicable check allows the operation
     */
    private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName, NamespaceOperation operation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }

        CompletableFuture<Boolean> proxyCheck;
        if (this.originalPrincipal == null) {
            proxyCheck = CompletableFuture.completedFuture(true);
        } else {
            proxyCheck = this.service.getAuthorizationService().allowNamespaceOperationAsync(namespaceName, operation, this.originalPrincipal, this.getAuthenticationData());
        }
        CompletableFuture<Boolean> roleCheck = this.service.getAuthorizationService().allowNamespaceOperationAsync(namespaceName, operation, this.authRole, this.authenticationData);

        return proxyCheck.thenCombine(roleCheck, (proxyAllowed, roleAllowed) -> {
            if (!proxyAllowed) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}", new Object[]{this.originalPrincipal, operation, namespaceName});
            }
            if (!roleAllowed) {
                log.warn("Role {} is not authorized to perform operation {} on namespace {}", new Object[]{this.authRole, operation, namespaceName});
            }
            return proxyAllowed && roleAllowed;
        });
    }

    /**
     * Handles a request for the list of topics in a namespace.
     *
     * <p>The request is bounded by the broker's lookup semaphore: if no permit is available it fails
     * with {@code TooManyRequests}; otherwise the permit is released on every completion path below.
     * After the authorization checks pass, the topic list is optionally filtered by the supplied
     * pattern (only when pattern evaluation is enabled and the pattern is not longer than the
     * configured maximum). If the client supplied a hash equal to the hash of the resulting list, an
     * empty list is returned and the response is flagged as unchanged.
     *
     * @param commandGetTopicsOfNamespace the get-topics-of-namespace command
     */
    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        final long requestId = commandGetTopicsOfNamespace.getRequestId();
        final String namespace = commandGetTopicsOfNamespace.getNamespace();
        final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
        final Optional<String> topicsPattern = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsPattern() ? commandGetTopicsOfNamespace.getTopicsPattern() : null);
        final Optional<String> topicsHash = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsHash() ? commandGetTopicsOfNamespace.getTopicsHash() : null);
        final NamespaceName namespaceName = NamespaceName.get((String) namespace);
        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();

        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed GetTopicsOfNamespace lookup due to too many lookup-requests {}", (Object) this.remoteAddress, (Object) namespaceName);
            }
            this.commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests, "Failed due to too many pending lookup requests");
            return;
        }

        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", new Object[]{this.remoteAddress, msg, this.authRole, this.originalPrincipal, namespaceName});
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
            lookupSemaphore.release();
            return;
        }

        this.isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
            if (!isAuthorized) {
                final String msg = "Proxy Client is not authorized to GetTopicsOfNamespace";
                log.warn("[{}] {} with role {} on namespace {}", new Object[]{this.remoteAddress, msg, this.getPrincipal(), namespaceName});
                this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
                lookupSemaphore.release();
                return null;
            }

            this.getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode).thenAccept(topics -> {
                boolean filterTopics = false;
                List<String> filteredTopics = topics;
                if (this.enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
                    if (topicsPattern.get().length() <= this.maxSubscriptionPatternLength) {
                        filterTopics = true;
                        filteredTopics = TopicList.filterTopics(topics, topicsPattern.get());
                    } else {
                        log.info("[{}] Subscription pattern provided was longer than maximum {}.", (Object) this.remoteAddress, (Object) this.maxSubscriptionPatternLength);
                    }
                }

                // The hash is computed over the (possibly filtered) list before it may be emptied.
                String hash = TopicList.calculateHash(filteredTopics);
                boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash);
                if (hashUnchanged) {
                    filteredTopics = Collections.emptyList();
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", new Object[]{this.remoteAddress, namespace, requestId, topics.size()});
                }
                this.commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, filterTopics, !hashUnchanged, requestId);
                lookupSemaphore.release();
            }).exceptionally(ex -> {
                log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", new Object[]{this.remoteAddress, namespace, requestId});
                this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException((Throwable) ex)), ex.getMessage());
                lookupSemaphore.release();
                return null;
            });
            return null;
        }).exceptionally(ex -> {
            ServerCnx.logNamespaceNameAuthException(this.remoteAddress, "GetTopicsOfNamespace", this.getPrincipal(), Optional.of(namespaceName), ex);
            final String msg = "Exception occurred while trying to authorize GetTopicsOfNamespace";
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
            lookupSemaphore.release();
            return null;
        });
    }

    /**
     * Handles a request for the schema of a topic, at a specific version or the latest one.
     *
     * <p>Error responses: {@code IncompatibleSchema} for an empty schema version, {@code
     * InvalidTopicName} when the topic name cannot be resolved to a schema name, {@code TopicNotFound}
     * when the schema service returns no schema, and {@code UnknownError} when the lookup fails.
     *
     * @param commandGetSchema the get-schema command
     */
    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
        if (log.isDebugEnabled()) {
            // A single log call; the version is rendered as text only when the command carries one.
            String versionForLog = commandGetSchema.hasSchemaVersion() ? new String(commandGetSchema.getSchemaVersion()) : null;
            log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}", new Object[]{this.remoteAddress, versionForLog, commandGetSchema.getTopic(), commandGetSchema.getRequestId()});
        }

        final long requestId = commandGetSchema.getRequestId();

        SchemaVersion schemaVersion = SchemaVersion.Latest;
        if (commandGetSchema.hasSchemaVersion()) {
            if (commandGetSchema.getSchemaVersion().length == 0) {
                this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.IncompatibleSchema, "Empty schema version");
                return;
            }
            schemaVersion = this.schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
        }

        final String schemaName;
        try {
            schemaName = TopicName.get((String) commandGetSchema.getTopic()).getSchemaName();
        } catch (Throwable t) {
            this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
            return;
        }

        this.schemaService.getSchema(schemaName, schemaVersion)
            .thenAccept(schemaAndMetadata -> {
                if (schemaAndMetadata != null) {
                    this.commandSender.sendGetSchemaResponse(requestId, SchemaInfoUtil.newSchemaInfo((String) schemaName, (SchemaData) schemaAndMetadata.schema), schemaAndMetadata.version);
                } else {
                    this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound, "Topic not found or no-schema");
                }
            })
            .exceptionally(ex -> {
                this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.UnknownError, ex.getMessage());
                return null;
            });
    }

    /**
     * Handles a request to fetch the version of a schema on a topic, registering the schema if needed.
     *
     * <p>Replies with {@code TopicNotFound} when the topic does not exist. Exactly one response is
     * sent per request: either the resulting schema version or an error describing the failure.
     *
     * @param commandGetOrCreateSchema the get-or-create-schema command
     */
    protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandGetOrCreateSchema call from {}", (Object) this.remoteAddress);
        }
        final long requestId = commandGetOrCreateSchema.getRequestId();
        final String topicName = commandGetOrCreateSchema.getTopic();
        SchemaData schemaData = this.getSchema(commandGetOrCreateSchema.getSchema());
        // A schema of type NONE is treated as "no schema".
        final SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;

        this.service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
            if (!topicOpt.isPresent()) {
                this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound, "Topic not found");
                return;
            }
            Topic topic = (Topic) topicOpt.get();
            CompletableFuture<SchemaVersion> schemaVersionFuture = this.tryAddSchema(topic, schema);
            // One callback handles both outcomes, so a failure never also triggers the success
            // response with a null schema version.
            schemaVersionFuture.whenComplete((schemaVersion, ex) -> {
                if (ex != null) {
                    ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
                    String message = ex.getMessage();
                    if (ex.getCause() != null) {
                        message = message + " caused by " + ex.getCause();
                    }
                    this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, message);
                    return;
                }
                this.commandSender.sendGetOrCreateSchemaResponse(requestId, schemaVersion);
            });
        }).exceptionally(ex -> {
            ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
            this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, ex.getMessage());
            return null;
        });
    }

    /**
     * Handles a transaction-coordinator client connect request.
     *
     * <p>When the transaction coordinator is disabled an error response is sent and nothing else
     * happens. Otherwise the request is passed to the transaction metadata store service and the
     * outcome is reported to the client.
     *
     * @param command the connect request carrying the request id and coordinator id
     */
    protected void handleTcClientConnectRequest(CommandTcClientConnectRequest command) {
        final long requestId = command.getRequestId();
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get((long) command.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive tc client connect request {} to transaction meta store {} from {}.", new Object[]{requestId, tcId, this.remoteAddress});
        }

        boolean transactionsEnabled = this.checkTransactionEnableAndSendError(requestId);
        if (!transactionsEnabled) {
            return;
        }

        TransactionMetadataStoreService storeService = this.service.pulsar().getTransactionMetadataStoreService();
        storeService.handleTcClientConnect(tcId)
            .thenAccept(ignored -> {
                if (log.isDebugEnabled()) {
                    log.debug("Handle tc client connect request {} to transaction meta store {} from {} success.", new Object[]{requestId, tcId, this.remoteAddress});
                }
                this.commandSender.sendTcClientConnectResponse(requestId);
            })
            .exceptionally(e -> {
                log.error("Handle tc client connect request {} to transaction meta store {} from {} fail.", new Object[]{requestId, tcId, this.remoteAddress, e.getCause()});
                this.commandSender.sendTcClientConnectResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
                return null;
            });
    }

    /**
     * Checks that the transaction coordinator is enabled in the broker configuration and, when
     * it is not, sends an error response for the given request.
     *
     * @param requestId id of the request to answer when transactions are disabled
     * @return {@code true} when the transaction coordinator is enabled; {@code false} when it is
     *         disabled, in which case an error response has already been sent
     */
    private boolean checkTransactionEnableAndSendError(long requestId) {
        if (this.service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
            return true;
        }
        // The message previously read "is not not enabled"; the duplicated word was a typo.
        BrokerServiceException.NotAllowedException ex = new BrokerServiceException.NotAllowedException("Transaction manager is not enabled");
        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
        return false;
    }

    /**
     * Normalises a failure of a transaction operation into the throwable reported to the client.
     *
     * <p>The throwable is first unwrapped with {@code FutureUtil.unwrapCompletionException}. A
     * {@code CoordinatorNotFoundException} is returned as is; a
     * {@code ManagedLedgerFencedException} is replaced by a new
     * {@code CoordinatorNotFoundException} carrying the same message. Both of those cases are
     * logged at debug level only. Any other cause is logged at error level and returned
     * unchanged.
     *
     * @param ex the failure delivered by the transaction operation
     * @param op name of the operation, used in log messages
     * @param requestId id of the failing request, used in the error log message
     * @return the throwable whose error code and message should be sent to the client
     */
    private Throwable handleTxnException(Throwable ex, String op, long requestId) {
        final Throwable cause = FutureUtil.unwrapCompletionException(ex);
        final boolean coordinatorMissing = cause instanceof CoordinatorException.CoordinatorNotFoundException;
        final boolean ledgerFenced = cause instanceof ManagedLedgerException.ManagedLedgerFencedException;

        if (!coordinatorMissing && !ledgerFenced) {
            log.error("Send response error for {} request {}.", op, requestId, cause);
            return cause;
        }
        if (coordinatorMissing) {
            if (log.isDebugEnabled()) {
                log.debug("The Coordinator was not found for the request {}", op);
            }
            return cause;
        }
        if (log.isDebugEnabled()) {
            log.debug("Throw a CoordinatorNotFoundException to client with the message got from a ManagedLedgerFencedException for the request {}", op);
        }
        return new CoordinatorException.CoordinatorNotFoundException(cause.getMessage());
    }

    /**
     * Handles a request to open a new transaction on a transaction coordinator.
     *
     * <p>On success the new transaction id is sent back through {@code commandSender}. On
     * failure the throwable is normalised by {@code handleTxnException}, an error response is
     * sent, and the metadata store service is notified through {@code handleOpFail}.
     *
     * @param command the new-transaction request (request id, coordinator id, TTL in seconds)
     */
    protected void handleNewTxn(CommandNewTxn command) {
        final long requestId = command.getRequestId();
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive new txn request {} to transaction meta store {} from {}.", requestId, tcId, this.remoteAddress);
        }
        if (!this.checkTransactionEnableAndSendError(requestId)) {
            return;
        }
        final TransactionMetadataStoreService metadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        metadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds()).whenComplete((txnID, failure) -> {
            if (failure != null) {
                final Throwable reported = this.handleTxnException(failure, BaseCommand.Type.NEW_TXN.name(), requestId);
                this.commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(), BrokerServiceException.getClientErrorCode(reported), reported.getMessage());
                metadataStoreService.handleOpFail(reported, tcId);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
            }
            this.commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId());
        });
    }

    /**
     * Handles a request to register produced-to partitions with a transaction.
     *
     * <p>The coordinator id is derived from the most significant bits of the transaction id.
     * On success a response is written to the channel; on failure the throwable is normalised
     * by {@code handleTxnException}, an error response is written, and the metadata store
     * service is notified through {@code handleOpFail}.
     *
     * @param command the request carrying the transaction id bits, the request id and the
     *                partition list
     */
    protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
        final long requestId = command.getRequestId();
        if (log.isDebugEnabled()) {
            command.getPartitionsList().forEach(partition -> log.debug("Receive add published partition to txn request {} from {} with txnId {}, topic: [{}]", requestId, this.remoteAddress, txnID, partition));
        }
        if (!this.checkTransactionEnableAndSendError(requestId)) {
            return;
        }
        final TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        // Reuse the service looked up above instead of resolving it a second time.
        transactionMetadataStoreService.addProducedPartitionToTxn(txnID, command.getPartitionsList()).whenComplete((v, failure) -> {
            if (failure == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response success for add published partition to txn request {}", requestId);
                }
                this.ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                return;
            }
            final Throwable reported = this.handleTxnException(failure, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
            this.ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(reported), reported.getMessage()));
            transactionMetadataStoreService.handleOpFail(reported, tcId);
        });
    }

    /**
     * Handles a request to end (commit or abort) a transaction.
     *
     * <p>The coordinator id is derived from the most significant bits of the transaction id.
     * The result of {@code endTransaction} is reported through {@code commandSender}; failures
     * are normalised by {@code handleTxnException} and also passed to {@code handleOpFail}.
     *
     * @param command the end-transaction request (request id, transaction id bits, action)
     */
    protected void handleEndTxn(CommandEndTxn command) {
        final long requestId = command.getRequestId();
        final int txnAction = command.getTxnAction().getValue();
        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
        if (this.checkTransactionEnableAndSendError(requestId)) {
            final TransactionMetadataStoreService metadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
            metadataStoreService.endTransaction(txnID, txnAction, false).whenComplete((v, failure) -> {
                if (failure != null) {
                    final Throwable reported = this.handleTxnException(failure, BaseCommand.Type.END_TXN.name(), requestId);
                    this.commandSender.sendEndTxnErrorResponse(requestId, txnID, BrokerServiceException.getClientErrorCode(reported), reported.getMessage());
                    metadataStoreService.handleOpFail(reported, tcId);
                    return;
                }
                this.commandSender.sendEndTxnResponse(requestId, txnID, txnAction);
            });
        }
    }

    /**
     * Handles a request to end a transaction on a single topic partition.
     *
     * <p>Flow:
     * <ul>
     *   <li>If the topic is loaded in this broker, {@code Topic.endTxn} is invoked and its
     *       outcome is written back (success, or the mapped client error code).</li>
     *   <li>If the topic is not loaded, the managed ledger factory is asked whether the topic's
     *       ledger exists. When it does, {@code ServiceNotReady} is returned; when it does not,
     *       a success response is returned.</li>
     *   <li>Any failure while looking up the topic or the ledger yields {@code ServiceNotReady}.</li>
     * </ul>
     *
     * <p>Every error response passes the transaction id as (least bits, most bits). The
     * "does not exist in broker" branch previously passed them in the opposite order, unlike
     * every other call to the same overload in this method.
     *
     * @param command the request carrying topic, transaction id bits, action and low watermark
     */
    protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
        final long requestId = command.getRequestId();
        final String topic = command.getTopic();
        final int txnAction = command.getTxnAction().getValue();
        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
        if (log.isDebugEnabled()) {
            log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic, txnID, txnAction);
        }
        CompletableFuture<Optional<Topic>> topicFuture = this.service.getTopicIfExists(TopicName.get(topic).toString());
        topicFuture.thenAccept(optionalTopic -> {
            if (optionalTopic.isPresent()) {
                optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark).whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable);
                        this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
                        return;
                    }
                    this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                });
                return;
            }
            this.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()).thenAccept(ledgerExists -> {
                if (ledgerExists.booleanValue()) {
                    log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
                    // Fixed: least bits first, then most bits, like the other error responses.
                    this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, "The topic " + topic + " does not exist in broker.", txnID.getLeastSigBits(), txnID.getMostSigBits()));
                } else {
                    log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
                    this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                }
            }).exceptionally(e -> {
                log.error("handleEndTxnOnPartition fail ! topic {}, txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), e.getCause());
                this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
                return null;
            });
        }).exceptionally(e -> {
            log.error("handleEndTxnOnPartition fail ! topic {}, txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), e.getCause());
            this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
            return null;
        });
    }

    /**
     * Handles a request to end a transaction on a subscription.
     *
     * <p>Flow:
     * <ul>
     *   <li>If the topic is loaded and the subscription exists, {@code Subscription.endTxn} is
     *       invoked and its outcome is written back.</li>
     *   <li>If the topic is loaded but the subscription is missing, a warning is logged and a
     *       success response is written.</li>
     *   <li>If the topic is not loaded, the managed ledger factory is asked whether the topic's
     *       ledger exists. When it does, {@code ServiceNotReady} is returned; when it does not,
     *       a success response is returned.</li>
     *   <li>Any failure while looking up the topic or the ledger yields {@code ServiceNotReady}.</li>
     * </ul>
     *
     * <p>Several log formats previously ran the subscription placeholder straight into
     * {@code txnId} with no separator, which fused the two values in the log output; the
     * formats now separate them with {@code ", "}.
     *
     * @param command the request carrying subscription, transaction id bits, action and low
     *                watermark
     */
    protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
        final long requestId = command.getRequestId();
        final long txnidMostBits = command.getTxnidMostBits();
        final long txnidLeastBits = command.getTxnidLeastBits();
        final String topic = command.getSubscription().getTopic();
        final String subName = command.getSubscription().getSubscription();
        final int txnAction = command.getTxnAction().getValue();
        final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
        final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, subName, txnID, txnAction);
        }
        CompletableFuture<Optional<Topic>> topicFuture = this.service.getTopicIfExists(TopicName.get(topic).toString());
        topicFuture.thenAccept(optionalTopic -> {
            if (optionalTopic.isPresent()) {
                Subscription subscription = optionalTopic.get().getSubscription(subName);
                if (subscription == null) {
                    log.warn("handleEndTxnOnSubscription fail! topic {} subscription {} does not exist. txnId: [{}], txnAction: [{}]", optionalTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction));
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
                    return;
                }
                CompletableFuture<Void> endTxnFuture = subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
                endTxnFuture.whenComplete((ignored, e) -> {
                    if (e != null) {
                        log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName, txnID, TxnAction.valueOf(txnAction), e.getCause());
                        this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits, BrokerServiceException.getClientErrorCode(e), "Handle end txn on subscription failed."));
                        return;
                    }
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
                });
                return;
            }
            this.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()).thenAccept(ledgerExists -> {
                if (ledgerExists.booleanValue()) {
                    log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName, txnID, TxnAction.valueOf(txnAction));
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), ServerError.ServiceNotReady, "The topic " + topic + " does not exist in broker."));
                } else {
                    log.warn("handleEndTxnOnSubscription fail ! The topic {} has not been created, subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName, txnID, TxnAction.valueOf(txnAction));
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                }
            }).exceptionally(e -> {
                log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName, txnID, TxnAction.valueOf(txnAction), e.getCause());
                this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), ServerError.ServiceNotReady, e.getMessage()));
                return null;
            });
        }).exceptionally(e -> {
            log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName, txnID, TxnAction.valueOf(txnAction), e.getCause());
            this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits, ServerError.ServiceNotReady, "Handle end txn on subscription failed."));
            return null;
        });
    }

    /**
     * Adds the given schema to the topic, or validates that connecting without a schema is
     * allowed.
     *
     * <p>With a non-null schema this delegates to {@code topic.addSchema}. With a null schema
     * the topic is asked whether it has a schema: if it does and schema validation is enforced
     * (on this connection or on the topic) the returned future fails with
     * {@code IncompatibleSchemaException}; otherwise it completes with
     * {@code SchemaVersion.Empty}.
     *
     * @param topic the topic the producer is attaching to
     * @param schema the schema supplied by the producer, or {@code null} when none was supplied
     * @return a future for the schema version resulting from the check or registration
     */
    private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
        if (schema == null) {
            return topic.hasSchema().thenCompose(hasSchema -> {
                log.info("[{}] {} configured with schema {}", this.remoteAddress, topic.getName(), hasSchema);
                final boolean mustReject = hasSchema.booleanValue() && (this.schemaValidationEnforced || topic.getSchemaValidationEnforced());
                final CompletableFuture<SchemaVersion> outcome = new CompletableFuture<>();
                if (mustReject) {
                    outcome.completeExceptionally(new IncompatibleSchemaException("Producers cannot connect or send message without a schema to topics with a schema"));
                } else {
                    outcome.complete(SchemaVersion.Empty);
                }
                return outcome;
            });
        }
        return topic.addSchema(schema);
    }

    /**
     * Handles a request to register acknowledged subscriptions with a transaction.
     *
     * <p>The coordinator id is derived from the most significant bits of the transaction id.
     * On success a response is written to the channel; on failure the throwable is normalised
     * by {@code handleTxnException}, an error response is written, and the metadata store
     * service is notified through {@code handleOpFail}.
     *
     * <p>The debug messages previously said "add published partition", copied from the
     * add-partition handler; they now name the subscription operation this method performs.
     *
     * @param command the request carrying the transaction id bits, the request id and the
     *                subscription list
     */
    protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        final long requestId = command.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("Receive add subscription to txn request {} from {} with txnId {}", requestId, this.remoteAddress, txnID);
        }
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
        if (!this.checkTransactionEnableAndSendError(requestId)) {
            return;
        }
        final TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        transactionMetadataStoreService.addAckedPartitionToTxn(txnID, MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList())).whenComplete((v, failure) -> {
            if (failure == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response success for add subscription to txn request {}", requestId);
                }
                this.ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                return;
            }
            final Throwable reported = this.handleTxnException(failure, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
            this.ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(reported), reported.getMessage()));
            transactionMetadataStoreService.handleOpFail(reported, tcId);
        });
    }

    /**
     * Tells whether the connection handshake has completed.
     *
     * @return {@code true} when the connection state is {@code State.Connected}
     */
    protected boolean isHandshakeCompleted() {
        return State.Connected == state;
    }

    /**
     * Returns the channel handler context held by this connection.
     *
     * @return the {@code ctx} field; callers in this class treat it as possibly {@code null}
     */
    public ChannelHandlerContext ctx() {
        return ctx;
    }

    /**
     * Passes the command to the broker interceptor, if one is configured.
     *
     * @param command the command to hand to the interceptor
     * @throws InterceptException propagated from the interceptor's {@code onPulsarCommand}
     */
    protected void interceptCommand(BaseCommand command) throws InterceptException {
        if (this.getBrokerService().getInterceptor() == null) {
            // No interceptor configured: nothing to do.
            return;
        }
        this.getBrokerService().getInterceptor().onPulsarCommand(command, this);
    }

    /**
     * Removes the producer from this connection and tells the client to close it.
     *
     * <p>Clients whose protocol version is at least {@code v5} receive a close-producer
     * command; for older clients the whole connection is closed instead.
     *
     * @param producer the producer to close
     */
    @Override
    public void closeProducer(Producer producer) {
        this.safelyRemoveProducer(producer);
        final boolean clientSupportsCloseCommand = this.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v5.getValue();
        if (!clientSupportsCloseCommand) {
            this.close();
            return;
        }
        this.ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
    }

    /**
     * Removes the consumer from this connection and tells the client to close it.
     *
     * <p>Clients whose protocol version is at least {@code v5} receive a close-consumer
     * command; for older clients the whole connection is closed instead.
     *
     * @param consumer the consumer to close
     */
    @Override
    public void closeConsumer(Consumer consumer) {
        this.safelyRemoveConsumer(consumer);
        final boolean clientSupportsCloseCommand = this.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v5.getValue();
        if (!clientSupportsCloseCommand) {
            this.close();
            return;
        }
        this.ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
    }

    /** Closes the underlying channel handler context. */
    protected void close() {
        ctx.close();
    }

    /**
     * Returns the address of the remote peer.
     *
     * @return the {@code remoteAddress} field
     */
    @Override
    public SocketAddress clientAddress() {
        return remoteAddress;
    }

    /**
     * Notification that a consumer was removed; drops it from this connection's registry.
     *
     * @param consumer the consumer that was removed
     */
    @Override
    public void removedConsumer(Consumer consumer) {
        safelyRemoveConsumer(consumer);
    }

    /**
     * Notification that a producer was removed; drops it from this connection's registry.
     *
     * @param producer the producer that was removed
     */
    @Override
    public void removedProducer(Producer producer) {
        safelyRemoveProducer(producer);
    }

    /**
     * Removes the registry entry for the given producer without disturbing a newer registration.
     *
     * <p>The future registered under the producer id is removed only once it has completed,
     * and only if it failed or resolved to this very producer instance. The removal is
     * conditional on the map still holding that same future.
     *
     * @param producer the producer whose registration should be dropped
     */
    private void safelyRemoveProducer(Producer producer) {
        final long producerId = producer.getProducerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: producerId={}, producer={}", this.remoteAddress, producerId, producer);
        }
        final CompletableFuture<Producer> registered = this.producers.get(producerId);
        if (registered == null) {
            return;
        }
        registered.whenComplete((resolved, failure) -> {
            if (failure != null || resolved == producer) {
                this.producers.remove(producerId, registered);
            }
        });
    }

    /**
     * Removes the registry entry for the given consumer without disturbing a newer registration.
     *
     * <p>The future registered under the consumer id is removed only once it has completed,
     * and only if it failed or resolved to this very consumer instance. The removal is
     * conditional on the map still holding that same future.
     *
     * @param consumer the consumer whose registration should be dropped
     */
    private void safelyRemoveConsumer(Consumer consumer) {
        final long consumerId = consumer.consumerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", this.remoteAddress, consumerId, consumer);
        }
        final CompletableFuture<Consumer> registered = this.consumers.get(consumerId);
        if (registered == null) {
            return;
        }
        registered.whenComplete((resolved, failure) -> {
            if (failure != null || resolved == consumer) {
                this.consumers.remove(consumerId, registered);
            }
        });
    }

    /**
     * Returns the connection's active flag.
     *
     * @return the {@code isActive} field
     */
    @Override
    public boolean isActive() {
        return isActive;
    }

    /**
     * Tells whether the underlying channel currently reports itself as writable.
     *
     * @return the result of {@code Channel.isWritable()} on this connection's channel
     */
    @Override
    public boolean isWritable() {
        return ctx.channel().isWritable();
    }

    /**
     * Accounts for a publish that is about to start and pauses channel reads when a limit is hit.
     *
     * <p>Statement order matters here: the two early returns happen before
     * {@code pendingSendRequest} is incremented and before the per-thread pending byte counter
     * is updated.
     *
     * @param producer the producer issuing the publish; its topic supplies the rate checks
     * @param msgSize size of the publish, added to the per-thread pending byte counter
     * @param numMessages number of messages in the publish, passed to the rate checks
     */
    public void startSendOperation(Producer producer, int msgSize, int numMessages) {
        boolean isPublishRateExceeded = false;
        if (this.preciseTopicPublishRateLimitingEnable) {
            // Precise mode: the topic-level check takes the message count and size.
            boolean isPreciseTopicPublishRateExceeded = producer.getTopic().isTopicPublishRateExceeded(numMessages, msgSize);
            if (isPreciseTopicPublishRateExceeded) {
                // Early exit: the counters below are NOT updated for this publish.
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
        } else {
            boolean resourceGroupPublishRateExceeded;
            // The resource-group check is only evaluated when resource-group limiting is enabled.
            if (producer.getTopic().isResourceGroupRateLimitingEnabled() && (resourceGroupPublishRateExceeded = producer.getTopic().isResourceGroupPublishRateExceeded(numMessages, msgSize))) {
                // Early exit: the counters below are NOT updated for this publish.
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
        }
        // The pre-increment always runs; reads are paused when the pending-request count reaches
        // the maximum exactly, or when the publish rate was reported as exceeded.
        if (++this.pendingSendRequest == this.maxPendingSendRequests || isPublishRateExceeded) {
            this.ctx.channel().config().setAutoRead(false);
            this.recordRateLimitMetrics(this.producers);
            // Records whether this pause was due to rate limiting; enableCnxAutoRead consults it.
            this.autoReadDisabledRateLimiting = isPublishRateExceeded;
            throttledConnections.inc();
        }
        // addAndGet is the first operand, so the pending byte counter is updated even when the
        // remaining conditions are false.
        if (((MutableLong)pendingBytesPerThread.get()).addAndGet((long)msgSize) >= this.maxPendingBytesPerThread && !this.autoReadDisabledPublishBufferLimiting && this.maxPendingBytesPerThread > 0L) {
            MutableInt pausedConnections = new MutableInt();
            // Pause every connection in cnxsPerThread that has producers and is not yet paused
            // for buffer limiting. NOTE(review): presumably the set holds the connections served
            // by the current thread — confirm against the declaration of cnxsPerThread.
            ((Set)cnxsPerThread.get()).forEach(cnx -> {
                if (cnx.hasProducers() && !cnx.autoReadDisabledPublishBufferLimiting) {
                    cnx.disableCnxAutoRead();
                    cnx.autoReadDisabledPublishBufferLimiting = true;
                    pausedConnections.increment();
                }
            });
            this.getBrokerService().pausedConnections(pausedConnections.intValue());
        }
    }

    /**
     * Bumps the publish-limited counter on the topic of every producer in the map whose
     * future is already done.
     *
     * <p>Entries whose future is null or still pending, or whose producer or topic is
     * {@code null}, are skipped.
     *
     * @param producers map from producer id to the future of the registered producer
     */
    private void recordRateLimitMetrics(ConcurrentLongHashMap<CompletableFuture<Producer>> producers) {
        producers.forEach((key, producerFuture) -> {
            if (producerFuture == null || !producerFuture.isDone()) {
                return;
            }
            final Producer resolved = producerFuture.getNow(null);
            if (resolved == null) {
                return;
            }
            if (resolved.getTopic() != null) {
                resolved.getTopic().increasePublishLimitedTimes();
            }
        });
    }

    /**
     * Accounts for a publish that has completed and resumes channel reads where appropriate.
     *
     * @param isNonPersistentTopic when {@code true}, the non-persistent pending message counter
     *                             is decremented as well
     * @param msgSize size of the completed publish, subtracted from the per-thread pending
     *                byte counter
     */
    @Override
    public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
        // addAndGet is the first operand, so the pending byte counter is always decremented,
        // whether or not this connection is currently paused for buffer limiting.
        if (((MutableLong)pendingBytesPerThread.get()).addAndGet((long)(-msgSize)) < this.resumeThresholdPendingBytesPerThread && this.autoReadDisabledPublishBufferLimiting) {
            MutableInt resumedConnections = new MutableInt();
            // Clear the buffer-limiting flag BEFORE calling enableCnxAutoRead, which refuses to
            // re-enable reads while that flag is still set.
            ((Set)cnxsPerThread.get()).forEach(cnx -> {
                if (cnx.autoReadDisabledPublishBufferLimiting) {
                    cnx.autoReadDisabledPublishBufferLimiting = false;
                    cnx.enableCnxAutoRead();
                    resumedConnections.increment();
                }
            });
            this.getBrokerService().resumedConnections(resumedConnections.intValue());
        }
        // The pre-decrement always runs; reads are resumed only when the count lands exactly on
        // the resume threshold.
        if (--this.pendingSendRequest == this.resumeReadsThreshold) {
            this.enableCnxAutoRead();
        }
        if (isNonPersistentTopic) {
            --this.nonPersistentPendingMessages;
        }
    }

    /**
     * Re-enables automatic reads on the channel, unless something still requires them paused.
     *
     * <p>Nothing happens when there is no context, when auto-read is already on, or while
     * either the rate-limiting or the publish-buffer-limiting flag is set. Otherwise auto-read
     * is switched on, a read is triggered and the throttled-connections metric is decremented.
     */
    @Override
    public void enableCnxAutoRead() {
        if (this.ctx == null) {
            return;
        }
        if (this.ctx.channel().config().isAutoRead()) {
            return;
        }
        if (this.autoReadDisabledRateLimiting || this.autoReadDisabledPublishBufferLimiting) {
            return;
        }
        this.ctx.channel().config().setAutoRead(true);
        this.ctx.read();
        throttledConnections.dec();
    }

    /**
     * Turns automatic reads off on the channel if they are currently on, and records the
     * rate-limit metric for this connection's producers.
     *
     * <p>Does nothing when there is no context or auto-read is already off.
     */
    @Override
    public void disableCnxAutoRead() {
        if (this.ctx == null || !this.ctx.channel().config().isAutoRead()) {
            return;
        }
        this.ctx.channel().config().setAutoRead(false);
        this.recordRateLimitMetrics(this.producers);
    }

    /** Clears the rate-limiting pause flag if it is currently set. */
    @Override
    public void cancelPublishRateLimiting() {
        if (!this.autoReadDisabledRateLimiting) {
            return;
        }
        this.autoReadDisabledRateLimiting = false;
    }

    /**
     * Clears the publish-buffer-limiting pause flag if it is currently set and decrements the
     * global throttled-connections metric.
     */
    @Override
    public void cancelPublishBufferLimiting() {
        if (!this.autoReadDisabledPublishBufferLimiting) {
            return;
        }
        this.autoReadDisabledPublishBufferLimiting = false;
        throttledConnectionsGlobal.dec();
    }

    /**
     * Derives the client error code from a future's failure, if it has one.
     *
     * <p>The future is probed with {@code getNow(null)}. If that throws and the thrown
     * exception's cause is a {@code BrokerServiceException}, the cause's mapped client error
     * code is returned. In every other case the result is {@code ServerError.UnknownError}.
     *
     * @param future the future to inspect
     * @param <T> the future's value type
     * @return the mapped error code, or {@code ServerError.UnknownError}
     */
    private <T> ServerError getErrorCode(CompletableFuture<T> future) {
        try {
            future.getNow(null);
        } catch (Exception e) {
            if (e.getCause() instanceof BrokerServiceException) {
                return BrokerServiceException.getClientErrorCode(e.getCause());
            }
        }
        return ServerError.UnknownError;
    }

    /**
     * Switches {@code TCP_NODELAY} off on the channel when the producer name starts with the
     * replicator prefix.
     *
     * <p>This is best-effort: any failure is logged as a warning and otherwise ignored. The
     * caught throwable is now passed to the logger; previously it was dropped, so the warning
     * carried no indication of what went wrong.
     *
     * @param topic topic name, used only in the warning message
     * @param producerName name of the producer; {@code null} or a non-matching name is a no-op
     */
    private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
        if (producerName == null || !producerName.startsWith(this.replicatorPrefix)) {
            return;
        }
        try {
            if (this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY).booleanValue()) {
                this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, false);
            }
        } catch (Throwable t) {
            // Trailing throwable is logged as the exception, not as a format argument.
            log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName, this.ctx.channel(), t);
        }
    }

    /**
     * Parses a topic name, replying to the client with {@code InvalidTopicName} on failure.
     *
     * <p>The shape of the error reply depends on the request type: a lookup error response for
     * {@code CommandLookupTopic}, a partition metadata response for
     * {@code CommandPartitionedTopicMetadata}, and a generic error for anything else.
     *
     * @param topic the raw topic name received from the client
     * @param requestId id of the request, echoed in the error reply
     * @param requestCommand the originating command, used only to pick the reply type
     * @return the parsed topic name, or {@code null} when parsing failed and an error was sent
     */
    private TopicName validateTopicName(String topic, long requestId, Object requestCommand) {
        try {
            return TopicName.get(topic);
        } catch (Throwable t) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to parse topic name '{}'", this.remoteAddress, topic, t);
            }
            final String reason = "Invalid topic name: " + t.getMessage();
            final Object reply;
            if (requestCommand instanceof CommandLookupTopic) {
                reply = Commands.newLookupErrorResponse(ServerError.InvalidTopicName, reason, requestId);
            } else if (requestCommand instanceof CommandPartitionedTopicMetadata) {
                reply = Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName, reason, requestId);
            } else {
                reply = Commands.newError(requestId, ServerError.InvalidTopicName, reason);
            }
            this.ctx.writeAndFlush(reply);
            return null;
        }
    }

    /**
     * Builds the serialized message command for a dispatched entry and notifies the broker
     * interceptor, if one is configured.
     *
     * <p>The command is serialized before the interceptor runs. Exceptions raised while
     * intercepting are logged and do not affect the returned buffer pair. The interceptor's
     * {@code messageDispatched} hook is only invoked when the consumer's future has completed
     * normally.
     *
     * @param consumerId id of the consumer the message is dispatched to
     * @param ledgerId ledger id of the entry
     * @param entryId entry id of the entry
     * @param partition partition index placed in the command
     * @param redeliveryCount redelivery count placed in the command
     * @param metadataAndPayload the message metadata and payload buffer
     * @param ackSet ack set placed in the command
     * @param topic topic name (not used by this method)
     * @param epoch consumer epoch placed in the command
     * @return the serialized command together with the payload
     */
    public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long entryId, int partition, int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, String topic, long epoch) {
        final BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount, ackSet, epoch);
        final ByteBufPair serialized = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
        try {
            final BrokerInterceptor interceptor = this.getBrokerService().getInterceptor();
            if (interceptor == null) {
                log.debug("BrokerInterceptor is not set in newMessageAndIntercept");
            } else {
                interceptor.onPulsarCommand(command, this);
                final CompletableFuture<Consumer> consumerFuture = this.consumers.get(consumerId);
                final boolean consumerReady = consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally();
                if (consumerReady) {
                    interceptor.messageDispatched(this, consumerFuture.getNow(null), ledgerId, entryId, metadataAndPayload);
                }
            }
        } catch (Exception e) {
            log.error("Exception occur when intercept messages.", (Throwable)e);
        }
        return serialized;
    }

    /**
     * Returns the current connection state.
     *
     * @return the {@code state} field
     */
    public State getState() {
        return state;
    }

    /**
     * Returns the address of the remote peer.
     *
     * @return the {@code remoteAddress} field
     */
    public SocketAddress getRemoteAddress() {
        return remoteAddress;
    }

    /**
     * Returns the broker service this connection belongs to.
     *
     * @return the {@code service} field
     */
    @Override
    public BrokerService getBrokerService() {
        return service;
    }

    /**
     * Returns the authenticated role of this connection.
     *
     * @return the {@code authRole} field
     */
    public String getRole() {
        return authRole;
    }

    /**
     * Creates a new promise from the channel handler context.
     *
     * @return the promise returned by {@code ctx.newPromise()}
     */
    @Override
    public Promise<Void> newPromise() {
        return ctx.newPromise();
    }

    /**
     * Returns the HAProxy message stored for this connection.
     *
     * @return the {@code proxyMessage} field, which may be {@code null}
     */
    @Override
    public HAProxyMessage getHAProxyMessage() {
        return proxyMessage;
    }

    /**
     * Tells whether an HAProxy message is stored for this connection.
     *
     * @return {@code true} when the {@code proxyMessage} field is not {@code null}
     */
    @Override
    public boolean hasHAProxyMessage() {
        return null != proxyMessage;
    }

    /**
     * Tells whether a consumer is registered under the given id on this connection.
     *
     * @param consumerId the consumer id to look up
     * @return {@code true} when the consumers map contains the id
     */
    boolean hasConsumer(long consumerId) {
        return consumers.containsKey(consumerId);
    }

    /**
     * Tells whether the remote endpoint's protocol version is at least {@code v4}.
     *
     * @return {@code true} when the remote protocol version is {@code >= ProtocolVersion.v4}
     */
    @Override
    public boolean isBatchMessageCompatibleVersion() {
        final int minimumVersion = ProtocolVersion.v4.getValue();
        return getRemoteEndpointProtocolVersion() >= minimumVersion;
    }

    /**
     * Tells whether the client's feature flags report support for authentication refresh.
     *
     * @return {@code false} when no feature flags are stored, otherwise the flag's value
     */
    boolean supportsAuthenticationRefresh() {
        if (features == null) {
            return false;
        }
        return features.isSupportsAuthRefresh();
    }

    /**
     * Tells whether the client's feature flags report support for broker entry metadata.
     *
     * @return {@code false} when no feature flags are stored, otherwise the flag's value
     */
    boolean supportBrokerMetadata() {
        if (features == null) {
            return false;
        }
        return features.isSupportsBrokerEntryMetadata();
    }

    /**
     * Tells whether the client's feature flags report support for partial producers.
     *
     * @return {@code false} when no feature flags are stored, otherwise the flag's value
     */
    boolean supportsPartialProducer() {
        if (features == null) {
            return false;
        }
        return features.isSupportsPartialProducer();
    }

    /**
     * Returns the client version string stored for this connection.
     *
     * @return the {@code clientVersion} field
     */
    @Override
    public String getClientVersion() {
        return clientVersion;
    }

    /**
     * Overrides the rate-limiting pause flag; exposed for tests.
     *
     * @param isLimiting the new value of the {@code autoReadDisabledRateLimiting} field
     */
    @VisibleForTesting
    void setAutoReadDisabledRateLimiting(boolean isLimiting) {
        autoReadDisabledRateLimiting = isLimiting;
    }

    /**
     * Returns the precise-dispatcher-flow-control setting of this connection.
     *
     * @return the {@code preciseDispatcherFlowControl} field
     */
    @Override
    public boolean isPreciseDispatcherFlowControl() {
        return preciseDispatcherFlowControl;
    }

    /**
     * Returns the authentication state of this connection.
     *
     * @return the {@code authState} field
     */
    public AuthenticationState getAuthState() {
        return authState;
    }

    /**
     * Returns the authentication data to use for this connection.
     *
     * @return {@code originalAuthData} when it is set, otherwise {@code authenticationData}
     */
    @Override
    public AuthenticationDataSource getAuthenticationData() {
        if (this.originalAuthData != null) {
            return this.originalAuthData;
        }
        return this.authenticationData;
    }

    /**
     * Returns the principal to attribute requests on this connection to.
     *
     * @return {@code originalPrincipal} when it is set, otherwise {@code authRole}
     */
    public String getPrincipal() {
        if (this.originalPrincipal != null) {
            return this.originalPrincipal;
        }
        return this.authRole;
    }

    /**
     * Returns the authentication provider stored for this connection.
     *
     * @return the {@code authenticationProvider} field
     */
    public AuthenticationProvider getAuthenticationProvider() {
        return authenticationProvider;
    }

    /**
     * Returns the authenticated role of this connection.
     *
     * @return the {@code authRole} field
     */
    @Override
    public String getAuthRole() {
        return authRole;
    }

    /**
     * Returns the authentication method name stored for this connection.
     *
     * @return the {@code authMethod} field
     */
    public String getAuthMethod() {
        return authMethod;
    }

    /**
     * Returns the live map of consumer futures keyed by consumer id.
     *
     * @return the {@code consumers} field itself, not a copy
     */
    public ConcurrentLongHashMap<CompletableFuture<Consumer>> getConsumers() {
        return consumers;
    }

    /**
     * Returns the live map of producer futures keyed by producer id.
     *
     * @return the {@code producers} field itself, not a copy
     */
    public ConcurrentLongHashMap<CompletableFuture<Producer>> getProducers() {
        return producers;
    }

    /**
     * Returns the command sender used to write responses on this connection.
     *
     * @return the {@code commandSender} field
     */
    @Override
    public PulsarCommandSender getCommandSender() {
        return commandSender;
    }

    /**
     * Submits the task to the event loop of this connection's channel.
     *
     * @param runnable the task to run
     */
    @Override
    public void execute(Runnable runnable) {
        ctx().channel().eventLoop().execute(runnable);
    }

    /**
     * Returns the client's source address as a string.
     *
     * <p>The HAProxy message's source address wins when a proxy message is present. Otherwise
     * the host address of the remote {@code InetSocketAddress} is used.
     *
     * @return the source address, or {@code null} when there is no proxy message and the remote
     *         address is not an {@code InetSocketAddress}
     */
    @Override
    public String clientSourceAddress() {
        if (this.proxyMessage != null) {
            return this.proxyMessage.sourceAddress();
        }
        if (!(this.remoteAddress instanceof InetSocketAddress)) {
            return null;
        }
        return ((InetSocketAddress)this.remoteAddress).getAddress().getHostAddress();
    }

    /**
     * Logs a failure raised while authenticating/authorizing a topic-level operation.
     *
     * <p>An {@link AuthenticationException} is logged at INFO with only its message;
     * any other throwable is logged at ERROR and passed as the trailing logger
     * argument.
     *
     * @param remoteAddress address of the peer, used as the log prefix
     * @param operation     name of the operation that was attempted
     * @param principal     principal the operation was attempted as
     * @param topic         topic involved, if any; appended as {@code ", topic=..."} when present
     * @param ex            the failure being reported
     */
    private static void logAuthException(SocketAddress remoteAddress, String operation, String principal, Optional<TopicName> topic, Throwable ex) {
        String topicSuffix = topic.map(name -> ", topic=" + name.toString()).orElse("");
        if (!(ex instanceof AuthenticationException)) {
            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
                    remoteAddress, operation, principal, topicSuffix, ex);
            return;
        }
        log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
                remoteAddress, operation, principal, topicSuffix, ex.getMessage());
    }

    /**
     * Logs a failure raised while authenticating/authorizing a namespace-level operation.
     *
     * <p>An {@link AuthenticationException} is logged at INFO with only its message;
     * any other throwable is logged at ERROR and passed as the trailing logger
     * argument.
     *
     * @param remoteAddress address of the peer, used as the log prefix
     * @param operation     name of the operation that was attempted
     * @param principal     principal the operation was attempted as
     * @param namespaceName namespace involved, if any; appended as {@code ", namespace=..."} when present
     * @param ex            the failure being reported
     */
    private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation, String principal, Optional<NamespaceName> namespaceName, Throwable ex) {
        String namespaceSuffix = namespaceName.map(ns -> ", namespace=" + ns.toString()).orElse("");
        if (!(ex instanceof AuthenticationException)) {
            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
                    remoteAddress, operation, principal, namespaceSuffix, ex);
            return;
        }
        log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
                remoteAddress, operation, principal, namespaceSuffix, ex.getMessage());
    }

    /**
     * Tells whether this connection currently tracks at least one producer entry.
     *
     * @return {@code true} when the {@code producers} map is not empty
     */
    public boolean hasProducers() {
        boolean noProducers = this.producers.isEmpty();
        return !noProducers;
    }

    /**
     * Compiler-generated lambda body belonging to {@code handleSeek}: logs that the
     * subscription was reset to the given position and sends a success response
     * for the request.
     *
     * @param subscription subscription whose topic name and own name are logged
     * @param position     position logged as the reset target
     * @param requestId    id of the request being acknowledged
     */
    private /* synthetic */ void lambda$handleSeek$34(Subscription subscription, Position position, long requestId) {
        log.info("[{}] [{}][{}] Reset subscription to message id {}",
                this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), position);
        this.commandSender.sendSuccessResponse(requestId);
    }

    /**
     * Compiler-generated lambda body belonging to {@code handleSubscribe}; it receives
     * the outcome of the authorization check as {@code isAuthorized} and either
     * creates the consumer or rejects the request.
     *
     * <p>Authorized path:
     * <ol>
     *   <li>validate {@code metadata}; on failure remove {@code consumerFuture} from
     *       {@code consumers} and answer with {@code MetadataError};</li>
     *   <li>if {@code existingConsumerFuture} is non-null, answer immediately (success
     *       when it already completed normally, an error otherwise);</li>
     *   <li>look up the topic (optionally creating it), build a {@code SubscriptionOption},
     *       optionally register/check the schema, and subscribe;</li>
     *   <li>complete {@code consumerFuture} with the result and send the matching
     *       success or error response.</li>
     * </ol>
     *
     * <p>Unauthorized path: remove {@code consumerFuture} from {@code consumers} and
     * write an {@code AuthorizationError}.
     *
     * @return always {@code null}; responses are sent as side effects
     */
    private /* synthetic */ Object lambda$handleSubscribe$15(TopicName topicName, String subscriptionName, Map metadata, long consumerId, CompletableFuture consumerFuture, long requestId, CompletableFuture existingConsumerFuture, boolean forceTopicCreation, boolean isDurable, CommandSubscribe subscribe, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, MessageIdImpl startMessageId, boolean readCompacted, CommandSubscribe.InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean isReplicated, KeySharedMeta keySharedMeta, SchemaData schema, Boolean isAuthorized) {
        if (isAuthorized.booleanValue()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to subscribe with role {}", (Object)this.remoteAddress, (Object)this.getPrincipal());
            }
            log.info("[{}] Subscribing on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
            // Reject the request early if the client-supplied metadata is invalid.
            try {
                Metadata.validateMetadata((Map)metadata);
            }
            catch (IllegalArgumentException iae) {
                String msg = iae.getMessage();
                this.consumers.remove(consumerId, (Object)consumerFuture);
                this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                return null;
            }
            // A non-null existingConsumerFuture means this consumerId is already registered on the connection.
            if (existingConsumerFuture != null) {
                if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                    // The earlier attempt completed normally: acknowledge without creating anything new.
                    Consumer consumer2 = existingConsumerFuture.getNow(null);
                    log.info("[{}] Consumer with the same id is already created: consumerId={}, consumer={}", new Object[]{this.remoteAddress, consumerId, consumer2});
                    this.commandSender.sendSuccessResponse(requestId);
                    return null;
                }
                log.warn("[{}][{}][{}] Consumer with id is already present on the connection, consumerId={}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId});
                ServerError error = null;
                if (!existingConsumerFuture.isDone()) {
                    // Earlier attempt is still pending.
                    error = ServerError.ServiceNotReady;
                } else {
                    // Earlier attempt failed: report its error code and drop the failed future from the map.
                    error = this.getErrorCode(existingConsumerFuture);
                    this.consumers.remove(consumerId, (Object)existingConsumerFuture);
                }
                this.commandSender.sendErrorResponse(requestId, error, "Consumer is already present on the connection");
                return null;
            }
            // Topic creation is requested only when the caller asked for it AND the service allows it for this topic.
            boolean createTopicIfDoesNotExist = forceTopicCreation && this.service.isAllowAutoTopicCreation(topicName.toString());
            ((CompletableFuture)((CompletableFuture)this.service.getTopic(topicName.toString(), createTopicIfDoesNotExist).thenCompose(optTopic -> {
                boolean rejectSubscriptionIfDoesNotExist;
                if (!optTopic.isPresent()) {
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.TopicNotFoundException("Topic " + topicName + " does not exist"));
                }
                Topic topic = (Topic)optTopic.get();
                // Reject when all hold: durable subscription, auto subscription creation disallowed,
                // the subscription is not already on the topic, and the topic is persistent.
                boolean bl = rejectSubscriptionIfDoesNotExist = isDurable && !this.service.isAllowAutoSubscriptionCreation(topicName.toString()) && !topic.getSubscriptions().containsKey((Object)subscriptionName) && topic.isPersistent();
                if (rejectSubscriptionIfDoesNotExist) {
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.SubscriptionNotFoundException("Subscription does not exist"));
                }
                // -1 is used when the subscribe command carries no consumer epoch.
                long consumerEpoch = -1L;
                if (subscribe.hasConsumerEpoch()) {
                    consumerEpoch = subscribe.getConsumerEpoch();
                }
                SubscriptionOption option = SubscriptionOption.builder().cnx(this).subscriptionName(subscriptionName).consumerId(consumerId).subType(subType).priorityLevel(priorityLevel).consumerName(consumerName).isDurable(isDurable).startMessageId((MessageId)startMessageId).metadata(metadata).readCompacted(readCompacted).initialPosition(initialPosition).startMessageRollbackDurationSec(startMessageRollbackDurationSec).replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta).subscriptionProperties(SubscriptionOption.getPropertiesMap(subscribe.getSubscriptionPropertiesList())).consumerEpoch(consumerEpoch).build();
                // When a schema was supplied, it is added/checked on the topic before subscribing.
                if (schema != null) {
                    return topic.addSchemaIfIdleOrCheckCompatible(schema).thenCompose(v -> topic.subscribe(option));
                }
                return topic.subscribe(option);
            })).thenAccept(consumer -> {
                // complete() returns false if consumerFuture was already completed elsewhere.
                if (consumerFuture.complete(consumer)) {
                    log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                    this.commandSender.sendSuccessResponse(requestId);
                    if (this.getBrokerService().getInterceptor() != null) {
                        this.getBrokerService().getInterceptor().consumerCreated(this, (Consumer)consumer, metadata);
                    }
                } else {
                    // The future was completed before the consumer was ready (per the log text: a client-side
                    // timeout), so close the now-unwanted consumer and forget the future.
                    try {
                        consumer.close();
                        log.info("[{}] Cleared consumer created after timeout on client side {}", (Object)this.remoteAddress, consumer);
                    }
                    catch (BrokerServiceException e) {
                        log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                    }
                    this.consumers.remove(consumerId, (Object)consumerFuture);
                }
            })).exceptionally(exception -> {
                // NOTE(review): exception.getCause() is dereferenced below without a null check; this assumes the
                // throwable always wraps a cause (e.g. a CompletionException) — confirm.
                if (exception.getCause() instanceof BrokerServiceException.ConsumerBusyException) {
                    // Consumer-busy failures are only logged at debug level.
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", new Object[]{this.remoteAddress, topicName, subscriptionName, exception.getCause().getMessage()});
                    }
                } else if (exception.getCause() instanceof BrokerServiceException) {
                    log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId, exception.getCause().getMessage()});
                } else {
                    // Non-broker failures additionally pass the exception itself as the trailing logger argument.
                    log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId, exception.getCause().getMessage(), exception});
                }
                // Send the error response only if this call is the one that completes the future.
                if (consumerFuture.completeExceptionally((Throwable)exception)) {
                    this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage());
                }
                this.consumers.remove(consumerId, (Object)consumerFuture);
                return null;
            });
        } else {
            String msg = "Client is not authorized to subscribe";
            log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.getPrincipal()});
            this.consumers.remove(consumerId, (Object)consumerFuture);
            // NOTE(review): this path writes the error directly through ctx, whereas the other error paths in this
            // method go through commandSender — confirm this difference is intentional.
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (ServerError)ServerError.AuthorizationError, (String)msg));
        }
        return null;
    }

    /**
     * States tracked for a connection.
     *
     * <p>The declaration order is kept as-is so that {@code ordinal()} values do not change.
     * NOTE(review): the transitions between these states are driven by code outside
     * this declaration; the constant names are the only semantics visible here.
     */
    enum State {
        Start,
        Connected,
        Failed,
        Connecting
    }
}

