/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessagePayload;
import org.apache.pulsar.client.api.MessagePayloadContext;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.ConsumerStatsDisabled;
import org.apache.pulsar.client.impl.ConsumerStatsRecorder;
import org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageIdAdvUtils;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MessagePayloadContextImpl;
import org.apache.pulsar.client.impl.MessagePayloadImpl;
import org.apache.pulsar.client.impl.NegativeAcksTracker;
import org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker;
import org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.client.impl.ZeroQueueConsumerImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.ComparisonChain;
import org.apache.pulsar.shade.com.google.common.collect.Iterables;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.io.netty.util.ReferenceCounted;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerImpl<T>
extends ConsumerBase<T>
implements ConnectionHandler.Connection {
    // Not referenced in the visible part of this file; presumably caps the number of
    // message ids sent per redelivery request -- TODO confirm against the usage site.
    private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
    // Client-unique consumer id, obtained from client.newConsumerId() in the constructor.
    final long consumerId;
    // Atomic accessor bound (by field name) to the volatile availablePermits counter below.
    private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits");
    private volatile int availablePermits = 0;
    protected volatile MessageId lastDequeuedMessageId = MessageId.earliest;
    private volatile MessageId lastMessageIdInBroker = MessageId.earliest;
    // Wall-clock deadline: construction time plus the client's configured lookup timeout.
    private final long lookupDeadline;
    // Atomic accessor bound (by field name) to the volatile subscribeDeadline below.
    private static final AtomicLongFieldUpdater<ConsumerImpl> SUBSCRIBE_DEADLINE_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerImpl.class, "subscribeDeadline");
    private volatile long subscribeDeadline = 0L;
    private final int partitionIndex;
    private final boolean hasParentConsumer;
    private final boolean parentConsumerHasListener;
    // Persistent or non-persistent tracker, chosen in the constructor from the topic's domain.
    private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
    private final NegativeAcksTracker negativeAcksTracker;
    // Real recorder only when the client's stats interval is > 0; otherwise ConsumerStatsDisabled.INSTANCE.
    protected final ConsumerStatsRecorder stats;
    private final int priorityLevel;
    private final SubscriptionMode subscriptionMode;
    // Start position; the constructor stores the first-chunk id when the supplied id has one.
    private volatile MessageIdAdv startMessageId;
    private volatile MessageIdAdv seekMessageId;
    private final AtomicBoolean duringSeek;
    // Snapshot of startMessageId taken at construction time.
    private final MessageIdAdv initialStartMessageId;
    private final long startMessageRollbackDurationInSec;
    private volatile boolean hasReachedEndOfTopic;
    // Null when no CryptoKeyReader is configured, or when the default MessageCryptoBc could not be created.
    private final MessageCrypto msgCrypto;
    // Immutable view of the configured consumer properties (empty map when none are set).
    private final Map<String, String> metadata;
    private final boolean readCompacted;
    private final boolean resetIncludeHead;
    private final SubscriptionInitialPosition subscriptionInitialPosition;
    private final ConnectionHandler connectionHandler;
    private final TopicName topicName;
    private final String topicNameWithoutPartition;
    // Both dead-letter fields below are null when the configuration carries no DeadLetterPolicy.
    private final Map<MessageIdAdv, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
    private final DeadLetterPolicy deadLetterPolicy;
    private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
    // Created lazily in doReconsumeLater() while holding createProducerLock's write lock.
    private volatile Producer<byte[]> retryLetterProducer;
    private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
    // Initialised from conf.isStartPaused().
    protected volatile boolean paused;
    // Chunked-message bookkeeping; NOTE(review): presumably keyed by chunk UUID -- confirm at the usage site.
    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = ConcurrentOpenHashMap.newBuilder().build();
    private int pendingChunkedMessageCount = 0;
    protected long expireTimeOfIncompleteChunkedMessageMillis = 0L;
    private final AtomicBoolean expireChunkMessageTaskScheduled = new AtomicBoolean(false);
    private final int maxPendingChunkedMessage;
    private final boolean autoAckOldestChunkedMessageOnQueueFull;
    private final BlockingQueue<String> pendingChunkedMessageUuidQueue;
    private final boolean createTopicIfDoesNotExist;
    private final boolean poolMessages;
    private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference();
    private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
    // One BaseCommand instance per thread, created on first access.
    private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>(){

        @Override
        protected BaseCommand initialValue() throws Exception {
            return new BaseCommand();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

    /**
     * Convenience factory that delegates to the full overload with
     * {@code parentConsumerHasListener = false} and a start-message rollback duration of {@code 0}.
     *
     * @return the consumer produced by the full {@code newConsumerImpl} overload
     */
    static <T> ConsumerImpl<T> newConsumerImpl(
            PulsarClientImpl client,
            String topic,
            ConsumerConfigurationData<T> conf,
            ExecutorProvider executorProvider,
            int partitionIndex,
            boolean hasParentConsumer,
            CompletableFuture<Consumer<T>> subscribeFuture,
            MessageId startMessageId,
            Schema<T> schema,
            ConsumerInterceptors<T> interceptors,
            boolean createTopicIfDoesNotExist) {
        // Defaults supplied on behalf of callers that do not care about these two knobs.
        final boolean noParentListener = false;
        final long noRollbackSeconds = 0L;
        return ConsumerImpl.newConsumerImpl(
                client,
                topic,
                conf,
                executorProvider,
                partitionIndex,
                hasParentConsumer,
                noParentListener,
                subscribeFuture,
                startMessageId,
                schema,
                interceptors,
                createTopicIfDoesNotExist,
                noRollbackSeconds);
    }

    /**
     * Creates the consumer implementation matching the configured receiver queue size.
     *
     * <p>A receiver queue size of {@code 0} yields a {@link ZeroQueueConsumerImpl}; note that
     * {@code parentConsumerHasListener} and {@code startMessageRollbackDurationInSec} are not
     * forwarded on that path. Any other size yields a regular {@code ConsumerImpl}.
     *
     * @return the newly constructed consumer
     */
    static <T> ConsumerImpl<T> newConsumerImpl(
            PulsarClientImpl client,
            String topic,
            ConsumerConfigurationData<T> conf,
            ExecutorProvider executorProvider,
            int partitionIndex,
            boolean hasParentConsumer,
            boolean parentConsumerHasListener,
            CompletableFuture<Consumer<T>> subscribeFuture,
            MessageId startMessageId,
            Schema<T> schema,
            ConsumerInterceptors<T> interceptors,
            boolean createTopicIfDoesNotExist,
            long startMessageRollbackDurationInSec) {
        final boolean usesReceiverQueue = conf.getReceiverQueueSize() != 0;
        if (usesReceiverQueue) {
            return new ConsumerImpl<T>(
                    client,
                    topic,
                    conf,
                    executorProvider,
                    partitionIndex,
                    hasParentConsumer,
                    parentConsumerHasListener,
                    subscribeFuture,
                    startMessageId,
                    startMessageRollbackDurationInSec,
                    schema,
                    interceptors,
                    createTopicIfDoesNotExist);
        }
        return new ZeroQueueConsumerImpl<T>(
                client,
                topic,
                conf,
                executorProvider,
                partitionIndex,
                hasParentConsumer,
                subscribeFuture,
                startMessageId,
                schema,
                interceptors,
                createTopicIfDoesNotExist);
    }

    /**
     * Builds a consumer for {@code topic}, copies the relevant settings out of {@code conf}
     * into final fields, and finally calls {@code grabCnx()}.
     *
     * <p>Side effect on {@code conf}: when the topic is non-persistent and the configured
     * subscription mode is {@code Durable}, the mode is rewritten to {@code NonDurable}.
     *
     * @param startMessageId optional start position; when it carries a first-chunk message id,
     *                       that id is used instead
     * @param startMessageRollbackDurationInSec stored as-is for later use
     * @param createTopicIfDoesNotExist stored as-is for later use
     */
    protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId, long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
        super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema, interceptors);
        this.consumerId = client.newConsumerId();
        TopicName topicName = TopicName.get(topic);
        // A non-persistent topic cannot back a Durable subscription: downgrade the mode
        // in the shared configuration and warn.
        if (!topicName.isPersistent() && conf.getSubscriptionMode().equals((Object)SubscriptionMode.Durable)) {
            conf.setSubscriptionMode(SubscriptionMode.NonDurable);
            log.warn("[{}] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: {}", (Object)topic, (Object)conf.getSubscriptionName());
        }
        this.subscriptionMode = conf.getSubscriptionMode();
        if (startMessageId != null) {
            // Prefer the first chunk's id when the supplied id exposes one.
            MessageIdAdv firstChunkMessageId = ((MessageIdAdv)startMessageId).getFirstChunkMessageId();
            this.startMessageId = firstChunkMessageId == null ? (MessageIdAdv)startMessageId : firstChunkMessageId;
        }
        // Remember the original start position (may be null when no start id was given).
        this.initialStartMessageId = this.startMessageId;
        this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
        this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
        this.partitionIndex = partitionIndex;
        this.hasParentConsumer = hasParentConsumer;
        this.parentConsumerHasListener = parentConsumerHasListener;
        this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
        this.readCompacted = conf.isReadCompacted();
        this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
        this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
        this.resetIncludeHead = conf.isResetIncludeHead();
        this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
        this.maxPendingChunkedMessage = conf.getMaxPendingChunkedMessage();
        this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<String>();
        this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
        this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
        this.poolMessages = conf.isPoolMessages();
        this.paused = conf.isStartPaused();
        // Stats are only recorded when a positive stats interval is configured on the client.
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ConsumerStatsRecorderImpl(client, conf, this) : ConsumerStatsDisabled.INSTANCE;
        this.duringSeek = new AtomicBoolean(false);
        // Crypto is enabled only when a key reader is configured. A user-supplied MessageCrypto
        // wins; otherwise fall back to MessageCryptoBc, leaving msgCrypto null if it cannot be created.
        if (conf.getCryptoKeyReader() != null) {
            if (conf.getMessageCrypto() != null) {
                this.msgCrypto = conf.getMessageCrypto();
            } else {
                MessageCryptoBc msgCryptoBc;
                try {
                    msgCryptoBc = new MessageCryptoBc(String.format("[%s] [%s]", topic, this.subscription), false);
                }
                catch (Exception e) {
                    log.error("MessageCryptoBc may not included in the jar. e:", (Throwable)e);
                    msgCryptoBc = null;
                }
                this.msgCrypto = msgCryptoBc;
            }
        } else {
            this.msgCrypto = null;
        }
        // Defensive, unmodifiable copy of the user-supplied consumer properties.
        this.metadata = conf.getProperties().isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<String, String>(conf.getProperties()));
        // Reconnection backoff bounds come from the client configuration; mandatory stop is 0 ms.
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create(), this);
        this.topicName = TopicName.get(topic);
        // Persistent topics get a grouping ack tracker; non-persistent topics use the shared
        // NonPersistentAcknowledgmentGroupingTracker instance.
        this.acknowledgmentsGroupingTracker = this.topicName.isPersistent() ? new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()) : NonPersistentAcknowledgmentGroupingTracker.of();
        if (conf.getDeadLetterPolicy() != null) {
            this.possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<MessageIdAdv, List<MessageImpl<T>>>();
            // Build a private DeadLetterPolicy copy. Blank topic names default to
            // "<topic>-<subscription>-DLQ" and "<topic>-<subscription>-RETRY".
            this.deadLetterPolicy = StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic()) ? DeadLetterPolicy.builder().maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()).deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic()).build() : DeadLetterPolicy.builder().maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()).deadLetterTopic(String.format("%s-%s-DLQ", topic, this.subscription)).build();
            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
                this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
            } else {
                this.deadLetterPolicy.setRetryLetterTopic(String.format("%s-%s-RETRY", topic, this.subscription));
            }
            // The initial subscription name is copied only when explicitly configured.
            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) {
                this.deadLetterPolicy.setInitialSubscriptionName(conf.getDeadLetterPolicy().getInitialSubscriptionName());
            }
        } else {
            this.deadLetterPolicy = null;
            this.possibleSendToDeadLetterTopicMessages = null;
        }
        this.topicNameWithoutPartition = topicName.getPartitionedTopicName();
        // Must stay last: every field above is assigned before grabCnx() runs.
        // NOTE(review): grabCnx() is presumably what starts connecting to the broker -- confirm.
        this.grabCnx();
    }

    /**
     * Returns the connection handler created for this consumer in the constructor.
     *
     * @return this consumer's {@link ConnectionHandler}
     */
    public ConnectionHandler getConnectionHandler() {
        return this.connectionHandler;
    }

    /**
     * Returns the unacknowledged-message tracker held in the {@code unAckedMessageTracker} field.
     *
     * @return the tracker instance used by this consumer
     */
    @Override
    public UnAckedMessageTracker getUnAckedMessageTracker() {
        return this.unAckedMessageTracker;
    }

    /**
     * Exposes the negative-acknowledgement tracker; package-private and intended for tests.
     *
     * @return the {@link NegativeAcksTracker} created in the constructor
     */
    @VisibleForTesting
    NegativeAcksTracker getNegativeAcksTracker() {
        return this.negativeAcksTracker;
    }

    /**
     * Asynchronously unsubscribes this consumer from its subscription.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>consumer already {@code Closing}/{@code Closed}: a future failed with
     *       {@link PulsarClientException.AlreadyClosedException} is returned;</li>
     *   <li>consumer not connected: the returned future fails with
     *       {@link PulsarClientException.NotConnectedException};</li>
     *   <li>otherwise the state moves to {@code Closing} and an unsubscribe command is sent. On
     *       success the consumer is cleaned up and marked {@code Closed}; on failure the state is
     *       restored to {@code Ready} and the future fails with the wrapped cause.</li>
     * </ul>
     *
     * @param force forwarded to the unsubscribe command
     * @return a future completed once the broker has answered the unsubscribe request
     */
    @Override
    public CompletableFuture<Void> unsubscribeAsync(boolean force) {
        HandlerState.State state = this.getState();
        if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }
        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
        if (!this.isConnected()) {
            unsubscribeFuture.completeExceptionally(new PulsarClientException.NotConnectedException(String.format("The client is not connected to the broker when unsubscribing the subscription %s of the topic %s", this.subscription, this.topicName.toString())));
            return unsubscribeFuture;
        }
        this.setState(HandlerState.State.Closing);
        long requestId = this.client.newRequestId();
        ByteBuf unsubscribe = Commands.newUnsubscribe(this.consumerId, requestId, force);
        ClientCnx cnx = this.cnx();
        cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
            this.closeConsumerTasks();
            this.deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            log.info("[{}][{}] Successfully unsubscribed from topic", (Object)this.topic, (Object)this.subscription);
            this.setState(HandlerState.State.Closed);
            unsubscribeFuture.complete(null);
        }).exceptionally(e -> {
            // Dependent stages normally deliver a CompletionException wrapping the real failure,
            // but a throwable without a cause must not blow up this handler: an NPE here would
            // leave unsubscribeFuture incomplete and the consumer stuck in Closing.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error("[{}][{}] Failed to unsubscribe: {}", new Object[]{this.topic, this.subscription, cause.getMessage()});
            this.setState(HandlerState.State.Ready);
            unsubscribeFuture.completeExceptionally(PulsarClientException.wrap(cause, String.format("Failed to unsubscribe the subscription %s of topic %s", this.subscription, this.topicName.toString())));
            return null;
        });
        return unsubscribeFuture;
    }

    /**
     * Computes the smallest receiver queue size this consumer accepts.
     *
     * <p>The baseline is {@code min(1, maxReceiverQueueSize)}. When the batch-receive policy has
     * a positive message limit, the result is raised to at least {@code 2 * limit - 2}.
     *
     * @return the minimum receiver queue size
     */
    @Override
    public int minReceiverQueueSize() {
        final int baseline = Math.min(1, this.maxReceiverQueueSize);
        final int batchLimit = this.batchReceivePolicy.getMaxNumMessages();
        if (batchLimit <= 0) {
            // No per-batch message limit configured: the baseline stands.
            return baseline;
        }
        return Math.max(baseline, 2 * batchLimit - 2);
    }

    /**
     * Blocks until a message is available in the incoming queue and returns it.
     *
     * <p>If the queue is empty on entry, {@code expectMoreIncomingMessages()} is invoked before
     * blocking. The dequeued message is passed through {@code messageProcessed} and
     * {@code beforeConsume}.
     *
     * @return the next message, as returned by {@code beforeConsume}
     * @throws PulsarClientException if the waiting thread is interrupted
     */
    @Override
    protected Message<T> internalReceive() throws PulsarClientException {
        try {
            final boolean nothingBuffered = this.incomingMessages.isEmpty();
            if (nothingBuffered) {
                this.expectMoreIncomingMessages();
            }
            final Message<T> next = this.incomingMessages.take();
            this.messageProcessed(next);
            return this.beforeConsume(next);
        } catch (InterruptedException interrupted) {
            ExceptionHandler.handleInterruptedException(interrupted);
            this.stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(interrupted);
        }
    }

    /**
     * Asynchronous receive: the work is scheduled on {@code internalPinnedExecutor}.
     *
     * <p>If a message is already queued, the future is completed with it right away (after
     * {@code messageProcessed} / {@code beforeConsume}). Otherwise the future is parked in
     * {@code pendingReceives}, and cancelling it removes it from that collection again.
     *
     * @return a cancellable future for the next message
     */
    @Override
    protected CompletableFuture<Message<T>> internalReceiveAsync() {
        final CompletableFutureCancellationHandler canceller = new CompletableFutureCancellationHandler();
        final CompletableFuture<Message<T>> pending = canceller.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            final Message<T> ready = this.incomingMessages.poll();
            if (ready != null) {
                // Fast path: hand over the buffered message immediately.
                this.messageProcessed(ready);
                pending.complete(this.beforeConsume(ready));
                return;
            }
            // Nothing buffered: register the future so a later arrival can complete it.
            this.expectMoreIncomingMessages();
            this.pendingReceives.add(pending);
            canceller.setCancelAction(() -> this.pendingReceives.remove(pending));
        });
        return pending;
    }

    /**
     * Waits up to the given timeout for a message from the incoming queue.
     *
     * <p>If the queue is empty on entry, {@code expectMoreIncomingMessages()} is invoked first.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the next message, or {@code null} when the timeout elapses, or when the wait is
     *         interrupted while the consumer is {@code Closing} or {@code Closed}
     * @throws PulsarClientException if the wait is interrupted in any other state
     */
    @Override
    protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException {
        try {
            if (this.incomingMessages.isEmpty()) {
                this.expectMoreIncomingMessages();
            }
            final Message<T> polled = this.incomingMessages.poll(timeout, unit);
            if (polled != null) {
                this.messageProcessed(polled);
                return this.beforeConsume(polled);
            }
            return null;
        } catch (InterruptedException interrupted) {
            ExceptionHandler.handleInterruptedException(interrupted);
            final HandlerState.State current = this.getState();
            final boolean shuttingDown = current == HandlerState.State.Closing || current == HandlerState.State.Closed;
            if (shuttingDown) {
                // An interrupt during shutdown is not counted as a receive failure.
                return null;
            }
            this.stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(interrupted);
        }
    }

    /**
     * Synchronous batch receive: blocks on the future from {@link #internalBatchReceiveAsync()}.
     *
     * @return the received batch, or {@code null} when the wait fails while the consumer is
     *         {@code Closing} or {@code Closed}
     * @throws PulsarClientException if the wait is interrupted or the future fails in any other state
     */
    @Override
    protected Messages<T> internalBatchReceive() throws PulsarClientException {
        try {
            return this.internalBatchReceiveAsync().get();
        } catch (InterruptedException | ExecutionException failure) {
            ExceptionHandler.handleInterruptedException(failure);
            final HandlerState.State current = this.getState();
            final boolean shuttingDown = current == HandlerState.State.Closing || current == HandlerState.State.Closed;
            if (shuttingDown) {
                // A failure during shutdown is not counted as a batch-receive failure.
                return null;
            }
            this.stats.incrementNumBatchReceiveFailed();
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Asynchronous batch receive: the work is scheduled on {@code internalPinnedExecutor}.
     *
     * <p>When enough messages are already buffered, the future is handed to
     * {@code notifyPendingBatchReceivedCallBack}. Otherwise an {@code OpBatchReceive} is queued
     * in {@code pendingBatchReceives}, the batch-receive timeout task is triggered, and
     * cancelling the future removes the queued operation again.
     *
     * @return a cancellable future for the next batch of messages
     */
    @Override
    protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
        final CompletableFutureCancellationHandler canceller = new CompletableFutureCancellationHandler();
        final CompletableFuture<Messages<T>> pending = canceller.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            if (!this.hasEnoughMessagesForBatchReceive()) {
                // Not enough buffered yet: park the request until more messages arrive or it times out.
                this.expectMoreIncomingMessages();
                final ConsumerBase.OpBatchReceive<T> parked = ConsumerBase.OpBatchReceive.of(pending);
                this.pendingBatchReceives.add(parked);
                this.triggerBatchReceiveTimeoutTask();
                canceller.setCancelAction(() -> this.pendingBatchReceives.remove(parked));
                return;
            }
            this.notifyPendingBatchReceivedCallBack(pending);
        });
        return pending;
    }

    /**
     * Acknowledges a single message id.
     *
     * <p>When the consumer is neither {@code Ready} nor {@code Connecting}, the failure is
     * counted, the matching {@code onAcknowledge} / {@code onAcknowledgeCumulative} hook is
     * invoked with the exception, and a failed future is returned. Otherwise the ack goes
     * through the transactional path when {@code txn} is non-null, or through the
     * acknowledgment grouping tracker when it is null.
     *
     * @param messageId  id to acknowledge
     * @param ackType    individual or cumulative acknowledgment
     * @param properties properties forwarded with the acknowledgment
     * @param txn        transaction to acknowledge within, or {@code null}
     * @return a future tracking the acknowledgment
     */
    @Override
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        final boolean usable = this.getState() == HandlerState.State.Ready || this.getState() == HandlerState.State.Connecting;
        if (usable) {
            if (txn == null) {
                return this.acknowledgmentsGroupingTracker.addAcknowledgment(messageId, ackType, properties);
            }
            final TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
            return this.doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, txnId);
        }
        this.stats.incrementNumAcksFailed();
        final PulsarClientException notReady = new PulsarClientException("Consumer not ready. State: " + this.getState());
        if (ackType == CommandAck.AckType.Individual) {
            this.onAcknowledge(messageId, (Throwable)notReady);
        } else if (ackType == CommandAck.AckType.Cumulative) {
            this.onAcknowledgeCumulative(messageId, (Throwable)notReady);
        }
        return FutureUtil.failedFuture(notReady);
    }

    /**
     * Acknowledges a list of message ids.
     *
     * <p>When the consumer is neither {@code Ready} nor {@code Connecting}, the failure is
     * counted, the matching {@code onAcknowledge} / {@code onAcknowledgeCumulative} hook is
     * invoked with the exception, and a failed future is returned. Otherwise the acks go
     * through the transactional path when {@code txn} is non-null, or through the
     * acknowledgment grouping tracker when it is null.
     *
     * @param messageIdList ids to acknowledge
     * @param ackType       individual or cumulative acknowledgment
     * @param properties    properties forwarded with the acknowledgment
     * @param txn           transaction to acknowledge within, or {@code null}
     * @return a future tracking the acknowledgment
     */
    @Override
    protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        final boolean usable = this.getState() == HandlerState.State.Ready || this.getState() == HandlerState.State.Connecting;
        if (usable) {
            if (txn == null) {
                return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
            }
            final TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
            return this.doTransactionAcknowledgeForResponse(messageIdList, ackType, properties, txnId);
        }
        this.stats.incrementNumAcksFailed();
        final PulsarClientException notReady = new PulsarClientException("Consumer not ready. State: " + this.getState());
        if (ackType == CommandAck.AckType.Individual) {
            this.onAcknowledge(messageIdList, (Throwable)notReady);
        } else if (ackType == CommandAck.AckType.Cumulative) {
            this.onAcknowledgeCumulative(messageIdList, (Throwable)notReady);
        }
        return FutureUtil.failedFuture(notReady);
    }

    /**
     * Republishes {@code message} to the retry letter topic, or to the dead letter topic once
     * the reconsume counter exceeds the policy's max redeliver count, and then acknowledges the
     * original message.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>Fail fast for a null message id or when the consumer is neither {@code Ready} nor
     *       {@code Connecting}.</li>
     *   <li>Clamp a negative {@code delayTime} to zero.</li>
     *   <li>Create the retry letter producer on first use, synchronously, under the write lock
     *       of {@code createProducerLock}.</li>
     *   <li>Build the outgoing properties, bump {@code RECONSUMETIMES}, record
     *       {@code DELAY_TIME}, and publish to the DLQ or retry topic.</li>
     *   <li>If the returned future fails, drop the id from the unacked tracker and request
     *       redelivery of the original message.</li>
     * </ol>
     *
     * <p>Decompiler note (CFR): "WARNING - Removed try catching itself - possible behaviour
     * change."
     *
     * <p>NOTE(review): {@code deadLetterPolicy} is dereferenced unconditionally here, yet the
     * constructor leaves it null when no dead letter policy is configured -- confirm that
     * callers only reach this method with a policy in place.
     *
     * @param message          message to be redelivered later
     * @param ackType          ack type used for the original message after republishing
     * @param customProperties extra properties merged over the message's own; may be null
     * @param delayTime        redelivery delay; negative values are treated as zero
     * @param unit             unit of {@code delayTime}
     * @return a future completed when the republish and the acknowledgment have both finished
     */
    @Override
    protected CompletableFuture<Void> doReconsumeLater(Message<?> message, CommandAck.AckType ackType, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
        MessageId messageId = message.getMessageId();
        if (messageId == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId"));
        }
        if (this.getState() != HandlerState.State.Ready && this.getState() != HandlerState.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + (Object)((Object)this.getState()));
            if (CommandAck.AckType.Individual.equals((Object)ackType)) {
                this.onAcknowledge(messageId, (Throwable)exception);
            } else if (CommandAck.AckType.Cumulative.equals((Object)ackType)) {
                this.onAcknowledgeCumulative(messageId, (Throwable)exception);
            }
            return FutureUtil.failedFuture(exception);
        }
        if (delayTime < 0L) {
            delayTime = 0L;
        }
        // Lazy creation of the retry producer: check, take the write lock, re-check, then create.
        // create() is the blocking variant, so this may block the caller while holding the lock.
        if (this.retryLetterProducer == null) {
            this.createProducerLock.writeLock().lock();
            try {
                if (this.retryLetterProducer == null) {
                    this.retryLetterProducer = this.client.newProducer(Schema.AUTO_PRODUCE_BYTES((Schema)this.schema)).topic(this.deadLetterPolicy.getRetryLetterTopic()).enableBatching(false).enableChunking(true).blockIfQueueFull(false).create();
                    this.stats.setRetryLetterProducerStats(this.retryLetterProducer.getStats());
                }
            }
            catch (Exception e) {
                log.error("Create retry letter producer exception with topic: {}", (Object)this.deadLetterPolicy.getRetryLetterTopic(), (Object)e);
                CompletableFuture<Void> completableFuture = FutureUtil.failedFuture(e);
                return completableFuture;
            }
            finally {
                this.createProducerLock.writeLock().unlock();
            }
        }
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        if (this.retryLetterProducer != null) {
            try {
                MessageImpl<?> retryMessage = this.getMessageImpl(message);
                String originMessageIdStr = message.getMessageId().toString();
                String originTopicNameStr = this.getOriginTopicNameStr(message);
                SortedMap<String, String> propertiesMap = this.getPropertiesMap(message, originMessageIdStr, originTopicNameStr);
                // Caller-supplied properties override the message's own on key collisions.
                if (customProperties != null) {
                    propertiesMap.putAll(customProperties);
                }
                // First reconsume counts as 1; afterwards the stored counter is incremented.
                // A non-numeric stored value throws and is routed to the catch block below.
                int reconsumeTimes = 1;
                if (propertiesMap.containsKey("RECONSUMETIMES")) {
                    reconsumeTimes = Integer.parseInt((String)propertiesMap.get("RECONSUMETIMES"));
                    ++reconsumeTimes;
                }
                propertiesMap.put("RECONSUMETIMES", String.valueOf(reconsumeTimes));
                propertiesMap.put("DELAY_TIME", String.valueOf(unit.toMillis(delayTime)));
                MessageId finalMessageId = messageId;
                if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(this.deadLetterPolicy.getDeadLetterTopic())) {
                    // Redelivery budget exhausted: publish to the dead letter topic instead.
                    // Unlike the retry branch, no delay and no message key are applied here.
                    // NOTE(review): retryMessage is only asserted non-null in the retry branch.
                    this.initDeadLetterProducerIfNeeded();
                    ((CompletableFuture)this.deadLetterProducer.thenAcceptAsync(dlqProducer -> {
                        TypedMessageBuilder typedMessageBuilderNew = dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())).value((Object)retryMessage.getData()).properties((Map)propertiesMap);
                        ((CompletableFuture)typedMessageBuilderNew.sendAsync().thenAccept(msgId -> ((CompletableFuture)this.doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> result.complete(null))).exceptionally(ex -> {
                            result.completeExceptionally((Throwable)ex);
                            return null;
                        }))).exceptionally(ex -> {
                            result.completeExceptionally((Throwable)ex);
                            return null;
                        });
                    }, (Executor)this.internalPinnedExecutor)).exceptionally(ex -> {
                        result.completeExceptionally((Throwable)ex);
                        // Drop the failed producer future.
                        // NOTE(review): presumably so the next call re-initialises it -- confirm.
                        this.deadLetterProducer = null;
                        return null;
                    });
                } else {
                    assert (retryMessage != null);
                    TypedMessageBuilder typedMessageBuilderNew = this.retryLetterProducer.newMessage(Schema.AUTO_PRODUCE_BYTES((Schema)((Schema)message.getReaderSchema().get()))).value((Object)retryMessage.getData()).properties(propertiesMap);
                    if (delayTime > 0L) {
                        typedMessageBuilderNew.deliverAfter(delayTime, unit);
                    }
                    if (message.hasKey()) {
                        typedMessageBuilderNew.key(message.getKey());
                    }
                    // Acknowledge the original only after the retry copy has been sent.
                    ((CompletableFuture)((CompletableFuture)typedMessageBuilderNew.sendAsync().thenCompose(__ -> this.doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))).thenAccept(v -> result.complete(null))).exceptionally(ex -> {
                        result.completeExceptionally((Throwable)ex);
                        return null;
                    });
                }
            }
            catch (Exception e) {
                result.completeExceptionally(e);
            }
        }
        MessageId finalMessageId = messageId;
        // Failure fallback: stop tracking the id as unacked and request redelivery of the original.
        result.exceptionally(ex -> {
            log.error("Send to retry letter topic exception with topic: {}, messageId: {}", new Object[]{this.retryLetterProducer.getTopic(), finalMessageId, ex});
            Set<MessageId> messageIds = Collections.singleton(finalMessageId);
            this.unAckedMessageTracker.remove(finalMessageId);
            this.redeliverUnacknowledgedMessages(messageIds);
            return null;
        });
        return result;
    }

    /**
     * Builds a sorted property map for a republished copy of {@code message}.
     *
     * <p>The message's own properties (if any) are copied first. Four bookkeeping entries are then
     * added with {@code putIfAbsent}, so a value already present on the message is never overwritten.
     *
     * @param message            message whose properties are copied
     * @param originMessageIdStr value stored under both origin-message-id keys
     * @param originTopicNameStr value stored under {@code REAL_TOPIC}
     * @return a new {@link TreeMap} holding the merged properties
     */
    private SortedMap<String, String> getPropertiesMap(Message<?> message, String originMessageIdStr, String originTopicNameStr) {
        Map<String, String> userProperties = message.getProperties();
        TreeMap<String, String> merged = new TreeMap<String, String>();
        if (userProperties != null) {
            merged.putAll(userProperties);
        }
        merged.putIfAbsent("REAL_TOPIC", originTopicNameStr);
        // NOTE(review): this key's spelling looks unusual; it is runtime text and is kept verbatim.
        // Presumably retained for compatibility with existing readers -- confirm before changing.
        merged.putIfAbsent("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr);
        merged.putIfAbsent("ORIGIN_MESSAGE_ID", originMessageIdStr);
        merged.putIfAbsent("REAL_SUBSCRIPTION", this.subscription);
        return merged;
    }

    /**
     * Resolves the topic name to record as the origin of {@code message}.
     *
     * <p>When the message id is a {@link TopicMessageId}, its owner topic is used and everything
     * from the last {@code "-partition-"} occurrence onward is cut off. Otherwise the message's
     * own topic name is returned unchanged.
     *
     * @param message message to inspect
     * @return the owner topic without its partition suffix, or {@code message.getTopicName()}
     */
    private String getOriginTopicNameStr(Message<?> message) {
        MessageId id = message.getMessageId();
        if (!(id instanceof TopicMessageId)) {
            return message.getTopicName();
        }
        String ownerTopic = ((TopicMessageId)id).getOwnerTopic();
        int suffixStart = ownerTopic.lastIndexOf("-partition-");
        return suffixStart < 0 ? ownerTopic : ownerTopic.substring(0, suffixStart);
    }

    /**
     * Extracts the underlying {@link MessageImpl} from a message.
     *
     * @param message message to unwrap
     * @return the message wrapped by a {@link TopicMessageImpl}, the message itself when it already
     *         is a {@link MessageImpl}, or {@code null} for any other implementation
     */
    private MessageImpl<?> getMessageImpl(Message<?> message) {
        if (message instanceof TopicMessageImpl) {
            TopicMessageImpl wrapper = (TopicMessageImpl)message;
            return (MessageImpl)wrapper.getMessage();
        }
        return message instanceof MessageImpl ? (MessageImpl)message : null;
    }

    /**
     * Negatively acknowledges the message identified by {@code messageId}.
     *
     * <p>The id is handed to the negative-acks tracker and then removed from the un-acked message
     * tracker. The batch component is discarded before the removal so that this overload removes
     * the same id form as {@link #negativeAcknowledge(Message)}; previously the raw id was used here.
     * NOTE(review): assumes the un-acked tracker is keyed by batch-less ids -- confirm against the
     * code that adds entries to it.
     *
     * @param messageId id of the message to negatively acknowledge
     */
    public void negativeAcknowledge(MessageId messageId) {
        this.negativeAcksTracker.add(messageId);
        this.unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId));
    }

    /**
     * Negatively acknowledges {@code message}.
     *
     * <p>The message is handed to the negative-acks tracker, and its id (with the batch component
     * discarded) is removed from the un-acked message tracker.
     *
     * @param message message to negatively acknowledge
     */
    @Override
    public void negativeAcknowledge(Message<?> message) {
        this.negativeAcksTracker.add(message);
        this.unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
    }

    /**
     * Handles a newly opened connection by sending a subscribe request for this consumer.
     *
     * <p>If the consumer is already closing or closed, it is finalized as closed and a completed
     * future is returned without subscribing. Otherwise the receiver queue is cleared to work out
     * the start position, a subscribe command is built and sent, and the returned future is
     * completed from the request's success or failure callback.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param cnx the connection that was just opened
     * @return a future completed (normally or exceptionally) once the subscribe attempt is handled
     */
    @Override
    public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
        int currentSize;
        this.previousExceptions.clear();
        HandlerState.State state = this.getState();
        // Consumer was closed in the meantime: finish the close and do not subscribe.
        if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
            this.setState(HandlerState.State.Closed);
            this.closeConsumerTasks();
            this.deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            this.clearReceiverQueue();
            return CompletableFuture.completedFuture(null);
        }
        log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}", new Object[]{this.topic, this.subscription, cnx.ctx().channel(), this.consumerId});
        long requestId = this.client.newRequestId();
        // A seek is in progress: flush and reset the grouped acknowledgments before resubscribing.
        if (this.duringSeek.get()) {
            this.acknowledgmentsGroupingTracker.flushAndClean();
        }
        // The deadline is only set when it is still 0, i.e. it is not moved by later attempts.
        SUBSCRIBE_DEADLINE_UPDATER.compareAndSet(this, 0L, System.currentTimeMillis() + this.client.getConfiguration().getOperationTimeoutMs());
        ConsumerImpl consumerImpl = this;
        synchronized (consumerImpl) {
            // Remember the queue size before it is drained; it is passed on after a successful subscribe.
            currentSize = this.incomingMessages.size();
            this.startMessageId = this.clearReceiverQueue();
            if (this.possibleSendToDeadLetterTopicMessages != null) {
                this.possibleSendToDeadLetterTopicMessages.clear();
            }
        }
        boolean isDurable = this.subscriptionMode == SubscriptionMode.Durable;
        // A start position is only sent for non-durable subscriptions that have a start message id.
        MessageIdData startMessageIdData = !isDurable && this.startMessageId != null ? new MessageIdData().setLedgerId(this.startMessageId.getLedgerId()).setEntryId(this.startMessageId.getEntryId()).setBatchIndex(this.startMessageId.getBatchIndex()) : null;
        SchemaInfo si = this.schema.getSchemaInfo();
        if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
            // No schema info is sent for BYTES / NONE schemas.
            si = null;
        } else if (this.schema instanceof AutoConsumeSchema && Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {
            si = AutoConsumeSchema.SCHEMA_INFO;
        }
        // The rollback duration is only applied while the start position still equals the initial one.
        long startMessageRollbackDuration = this.startMessageRollbackDurationInSec > 0L && this.startMessageId != null && this.startMessageId.equals(this.initialStartMessageId) ? this.startMessageRollbackDurationInSec : 0L;
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ConsumerImpl consumerImpl2 = this;
        // The connection is set and the request is built and sent while holding this consumer's monitor.
        synchronized (consumerImpl2) {
            this.setClientCnx(cnx);
            ByteBuf request = Commands.newSubscribe(this.topic, this.subscription, this.consumerId, requestId, this.getSubType(), this.priorityLevel, this.consumerName, isDurable, startMessageIdData, this.metadata, this.readCompacted, this.conf.isReplicateSubscriptionState(), CommandSubscribe.InitialPosition.valueOf(this.subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, this.createTopicIfDoesNotExist, this.conf.getKeySharedPolicy(), this.conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
            ((CompletableFuture)cnx.sendRequestWithId(request, requestId).thenRun(() -> {
                // Success path of the subscribe request.
                ConsumerImpl consumerImpl = this;
                synchronized (consumerImpl) {
                    if (!this.changeToReadyState()) {
                        // Could not move to the ready state: mark closed, clean up and close the channel.
                        this.setState(HandlerState.State.Closed);
                        this.deregisterFromClientCnx();
                        this.client.cleanupConsumer(this);
                        cnx.channel().close();
                        future.complete(null);
                        return;
                    }
                    this.consumerIsReconnectedToBroker(cnx, currentSize);
                }
                this.resetBackoff();
                // complete() returns true only for the call that actually completes subscribeFuture.
                boolean firstTimeConnect = this.subscribeFuture.complete(this);
                // Permits are skipped on the first connect of a consumer with a parent, or when the queue size is 0.
                if (!(firstTimeConnect && this.hasParentConsumer || this.getCurrentReceiverQueueSize() == 0)) {
                    this.increaseAvailablePermits(cnx, this.getCurrentReceiverQueueSize());
                }
                future.complete(null);
            })).exceptionally(e -> {
                // Failure path of the subscribe request.
                this.deregisterFromClientCnx();
                if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                    cnx.channel().close();
                    future.complete(null);
                    return null;
                }
                log.warn("[{}][{}] Failed to subscribe to topic on {}", new Object[]{this.topic, this.subscription, cnx.channel().remoteAddress()});
                if (e.getCause() instanceof PulsarClientException.TimeoutException) {
                    // On timeout a close-consumer command is sent; its result is not awaited here.
                    long closeRequestId = this.client.newRequestId();
                    ByteBuf cmd = Commands.newCloseConsumer(this.consumerId, closeRequestId, null, null);
                    cnx.sendRequestWithId(cmd, closeRequestId);
                }
                if (e.getCause() instanceof PulsarClientException && PulsarClientException.isRetriableError((Throwable)e.getCause()) && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(this)) {
                    // Retriable error before the subscribe deadline: report the cause through the returned future.
                    future.completeExceptionally(e.getCause());
                } else if (!this.subscribeFuture.isDone()) {
                    // The initial subscribe never completed: fail the consumer and its subscribe future.
                    this.setState(HandlerState.State.Failed);
                    this.closeConsumerTasks();
                    this.subscribeFuture.completeExceptionally(PulsarClientException.wrap((Throwable)e, (String)String.format("Failed to subscribe the topic %s with subscription name %s when connecting to the broker", this.topicName.toString(), this.subscription)));
                    this.client.cleanupConsumer(this);
                } else if (e.getCause() instanceof PulsarClientException.TopicDoesNotExistException) {
                    // Already-subscribed consumer whose topic no longer exists: mark failed and clean up.
                    this.setState(HandlerState.State.Failed);
                    this.closeConsumerTasks();
                    this.client.cleanupConsumer(this);
                    log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", new Object[]{this.topic, this.subscription, cnx.channel().remoteAddress()});
                } else {
                    future.completeExceptionally(e.getCause());
                }
                // Branches that did not complete the future exceptionally complete it normally here.
                if (!future.isDone()) {
                    future.complete(null);
                }
                return null;
            });
        }
        return future;
    }

    /**
     * Logs the successful subscription and resets the available-permits counter to zero.
     *
     * @param cnx              connection the consumer is now subscribed on
     * @param currentQueueSize queue size supplied by the caller; not used by this implementation
     */
    protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) {
        Object[] logArgs = new Object[]{this.topic, this.subscription, cnx.channel().remoteAddress(), this.consumerId};
        log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", logArgs);
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
    }

    /**
     * Drains the incoming-message queue and determines the message id to resume from.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>if a seek was in progress (the flag is cleared here), the seek message id;</li>
     *   <li>for durable subscriptions, the current start message id;</li>
     *   <li>if messages were drained, the id just before the first drained message (previous batch
     *       index for batch ids, previous message id otherwise); the drained messages are released;</li>
     *   <li>if the last dequeued id is not {@code MessageId.earliest}, a batch id built from it;</li>
     *   <li>otherwise the current start message id.</li>
     * </ol>
     *
     * @return the message id to use as the next start position
     */
    private MessageIdAdv clearReceiverQueue() {
        ArrayList drained = new ArrayList(this.incomingMessages.size());
        this.incomingMessages.drainTo(drained);
        this.resetIncomingMessageSize();
        if (this.duringSeek.compareAndSet(true, false)) {
            return this.seekMessageId;
        }
        if (this.subscriptionMode == SubscriptionMode.Durable) {
            return this.startMessageId;
        }
        if (drained.isEmpty()) {
            // Nothing was queued: fall back to the last dequeued id, or to the start id if none.
            if (this.lastDequeuedMessageId.equals(MessageId.earliest)) {
                return this.startMessageId;
            }
            return new BatchMessageIdImpl((MessageIdImpl)this.lastDequeuedMessageId);
        }
        MessageIdAdv firstQueued = (MessageIdAdv)((Message)drained.get(0)).getMessageId();
        MessageIdAdv resumeFrom;
        if (MessageIdAdvUtils.isBatch(firstQueued)) {
            // Step back one index inside the same batch entry.
            resumeFrom = new BatchMessageIdImpl(firstQueued.getLedgerId(), firstQueued.getEntryId(), firstQueued.getPartitionIndex(), firstQueued.getBatchIndex() - 1);
        } else {
            resumeFrom = MessageIdAdvUtils.prevMessageId(firstQueued);
        }
        drained.forEach(Message::release);
        return resumeFrom;
    }

    /**
     * Writes a flow command granting {@code numMessages} permits on the given connection.
     *
     * <p>Nothing is done when {@code cnx} is {@code null} or {@code numMessages} is not positive.
     * With debug logging enabled, a listener logs the outcome of the write; otherwise the write
     * uses the context's void promise.
     *
     * @param cnx         connection to write to, may be {@code null}
     * @param numMessages number of permits to grant
     */
    private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
        if (cnx == null || numMessages <= 0) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Adding {} additional permits", new Object[]{this.topic, this.subscription, numMessages});
        }
        if (!log.isDebugEnabled()) {
            cnx.ctx().writeAndFlush(Commands.newFlow(this.consumerId, numMessages), cnx.ctx().voidPromise());
            return;
        }
        cnx.ctx().writeAndFlush(Commands.newFlow(this.consumerId, numMessages)).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
            if (writeFuture.isSuccess()) {
                log.debug("Consumer {} sent {} permits to broker", (Object)this.consumerId, (Object)numMessages);
            } else {
                log.debug("Consumer {} failed to send {} permits to broker: {}", new Object[]{this.consumerId, numMessages, writeFuture.cause().getMessage()});
            }
        }));
    }

    /**
     * Reacts to a failed connection attempt.
     *
     * <p>Retriable errors that occur before the lookup deadline are only recorded in
     * {@code previousExceptions}. Otherwise the recorded exceptions are attached to
     * {@code exception} and the subscribe future is failed with it; if that call is the one that
     * completes the future, the consumer is moved to {@code Failed} and cleaned up.
     *
     * @param exception the failure reported for the connection attempt
     */
    @Override
    public void connectionFailed(PulsarClientException exception) {
        boolean nonRetriableError = !PulsarClientException.isRetriableError((Throwable)exception);
        boolean timeout = System.currentTimeMillis() > this.lookupDeadline;
        if (!nonRetriableError && !timeout) {
            this.previousExceptions.add(exception);
            return;
        }
        exception.setPreviousExceptions(this.previousExceptions);
        if (!this.subscribeFuture.completeExceptionally(exception)) {
            // The subscribe future was already completed; nothing else to do.
            return;
        }
        this.setState(HandlerState.State.Failed);
        if (nonRetriableError) {
            log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", new Object[]{this.topic, this.consumerId, exception.getMessage()});
        } else {
            log.info("[{}] Consumer creation failed for consumer {} after timeout", (Object)this.topic, (Object)this.consumerId);
        }
        this.closeConsumerTasks();
        this.deregisterFromClientCnx();
        this.client.cleanupConsumer(this);
    }

    /**
     * Closes this consumer asynchronously.
     *
     * <p>Three paths exist: the consumer is already closing/closed (only tasks are stopped and
     * pending receives failed); the consumer is not connected (it is marked closed and cleaned up
     * locally); or a close-consumer command is sent on the current connection and cleanup runs
     * when that request finishes. In the last path the retry-letter and dead-letter producers, when
     * present, are closed as well and their close failures are only logged.
     *
     * @return a future for the close; in the connected path it waits for the consumer close and
     *         the producer closes collected in {@code closeFutures}
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        // Already closing or closed: stop tasks again and complete once pending receives are failed.
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            this.closeConsumerTasks();
            this.failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
            return closeFuture;
        }
        // Not connected: no command is sent, the consumer is closed locally.
        if (!this.isConnected()) {
            log.info("[{}] [{}] Closed Consumer (not connected)", (Object)this.topic, (Object)this.subscription);
            this.setState(HandlerState.State.Closed);
            this.closeConsumerTasks();
            this.deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            this.failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
            return closeFuture;
        }
        this.stats.getStatTimeout().ifPresent(Timeout::cancel);
        this.setState(HandlerState.State.Closing);
        this.closeConsumerTasks();
        long requestId = this.client.newRequestId();
        ClientCnx cnx = this.cnx();
        if (null == cnx) {
            // The connection disappeared after the isConnected() check: clean up without sending.
            this.cleanupAtClose(closeFuture, null);
        } else {
            ByteBuf cmd = Commands.newCloseConsumer(this.consumerId, requestId, null, null);
            cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
                boolean ignoreException;
                ChannelHandlerContext ctx = cnx.ctx();
                // A failure is ignored when the channel context is gone or the channel is inactive.
                // (bl is an unused temporary left by the decompiler.)
                boolean bl = ignoreException = ctx == null || !ctx.channel().isActive();
                if (ignoreException && exception != null) {
                    log.debug("Exception ignored in closing consumer", exception);
                }
                this.cleanupAtClose(closeFuture, ignoreException ? null : exception);
                return null;
            });
        }
        ArrayList<CompletionStage> closeFutures = new ArrayList<CompletionStage>(4);
        closeFutures.add(closeFuture);
        if (this.retryLetterProducer != null) {
            // whenComplete only logs; the stage keeps the producer close's own outcome.
            closeFutures.add(this.retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
                if (ex != null) {
                    log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
                }
            }));
        }
        if (this.deadLetterProducer != null) {
            // deadLetterProducer is itself a future; the producer is closed once it resolves.
            closeFutures.add(((CompletableFuture)this.deadLetterProducer.thenCompose(p -> p.closeAsync())).whenComplete((ignore, ex) -> {
                if (ex != null) {
                    log.warn("Exception ignored in closing deadLetterProducer of consumer", ex);
                }
            }));
        }
        return FutureUtil.waitForAll(closeFutures);
    }

    /**
     * Finalizes a close: marks the consumer closed, stops its tasks, detaches it from the
     * connection and client, fails pending receives and then completes {@code closeFuture}.
     *
     * @param closeFuture future to complete once pending receives have been failed
     * @param exception   failure to report through {@code closeFuture}, or {@code null} for success
     */
    private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable exception) {
        log.info("[{}] [{}] Closed consumer", (Object)this.topic, (Object)this.subscription);
        this.setState(HandlerState.State.Closed);
        this.closeConsumerTasks();
        this.deregisterFromClientCnx();
        this.client.cleanupConsumer(this);
        this.failPendingReceive().whenComplete((ignoredResult, ignoredError) -> {
            if (exception == null) {
                closeFuture.complete(null);
                return;
            }
            closeFuture.completeExceptionally(exception);
        });
    }

    /**
     * Stops the background helpers of this consumer: closes the un-acked, acknowledgment-grouping
     * and negative-acks trackers, clears the possible dead-letter message map, cancels the batch
     * receive and stats timeouts, and, when message pooling is on, releases pooled messages.
     */
    private void closeConsumerTasks() {
        this.unAckedMessageTracker.close();
        if (null != this.possibleSendToDeadLetterTopicMessages) {
            this.possibleSendToDeadLetterTopicMessages.clear();
        }
        this.acknowledgmentsGroupingTracker.close();
        if (null != this.batchReceiveTimeout) {
            this.batchReceiveTimeout.cancel();
        }
        this.negativeAcksTracker.close();
        this.stats.getStatTimeout().ifPresent(statTimeout -> statTimeout.cancel());
        if (!this.poolMessages) {
            return;
        }
        this.releasePooledMessagesAndStopAcceptNew();
    }

    /**
     * Terminates the incoming-message queue with {@code release()} as the per-message handler,
     * then clears the incoming messages.
     */
    private void releasePooledMessagesAndStopAcceptNew() {
        this.incomingMessages.terminate(Message::release);
        this.clearIncomingMessages();
    }

    /**
     * Notifies the configured consumer event listener, on the external pinned executor, that this
     * consumer became active or inactive. Does nothing when no listener is configured.
     *
     * @param isActive {@code true} to report "became active", {@code false} for "became inactive"
     */
    void activeConsumerChanged(boolean isActive) {
        if (this.consumerEventListener != null) {
            this.externalPinnedExecutor.execute(() -> {
                if (!isActive) {
                    this.consumerEventListener.becameInactive((Consumer)this, this.partitionIndex);
                    return;
                }
                this.consumerEventListener.becameActive((Consumer)this, this.partitionIndex);
            });
        }
    }

    /**
     * Tells whether the entry described by {@code messageMetadata} is treated as a batch.
     *
     * @param messageMetadata metadata of the received entry
     * @return {@code false} when the message is undecryptable; otherwise {@code true} if the
     *         metadata has the batch-size field set or its batch size differs from 1
     */
    protected boolean isBatch(MessageMetadata messageMetadata) {
        if (this.isMessageUndecryptable(messageMetadata)) {
            return false;
        }
        return messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1;
    }

    /**
     * Creates the message at position {@code index} of a batch entry.
     *
     * <p>Returns {@code null} when the message is skipped: it lies before the start message id on
     * a persistent topic, its single-message metadata marks it as compacted out, or its bit in
     * {@code ackBitSet} is not set.
     *
     * @param index                 position of the message inside the batch
     * @param numMessages           number of messages in the batch
     * @param brokerEntryMetadata   broker entry metadata attached to the created message
     * @param msgMetadata           metadata of the whole entry
     * @param singleMessageMetadata metadata holder for the single message, may be {@code null}
     * @param payload               batch payload
     * @param messageId             id of the batch entry
     * @param schema                schema used for the created message
     * @param containMetadata       whether the single message must be deserialized from {@code payload}
     * @param ackBitSet             bit set consulted per index, may be {@code null}
     * @param ackSetInMessageId     bit set passed to the created batch message id
     * @param redeliveryCount       redelivery count passed to the created message
     * @param consumerEpoch         consumer epoch passed to the created message
     * @return the created message, or {@code null} if it is skipped
     * @throws IllegalStateException wrapping any {@link IOException} or {@link IllegalStateException}
     *                               raised while building the message
     */
    protected <V> MessageImpl<V> newSingleMessage(int index, int numMessages, BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload, MessageIdImpl messageId, Schema<V> schema, boolean containMetadata, BitSetRecyclable ackBitSet, BitSet ackSetInMessageId, int redeliveryCount, long consumerEpoch) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] processing message num - {} in batch", new Object[]{this.subscription, this.consumerName, index});
        }
        ReferenceCounted singleMessagePayload = null;
        try {
            if (containMetadata) {
                singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages);
            }
            if (this.topicName.isPersistent() && this.isSameEntry(messageId) && this.isPriorBatchIndex(index)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", new Object[]{this.subscription, this.consumerName, this.startMessageId});
                }
                return null;
            }
            if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
                return null;
            }
            if (ackBitSet != null && !ackBitSet.get(index)) {
                return null;
            }
            BatchMessageIdImpl batchId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), this.getPartitionIndex(), index, numMessages, ackSetInMessageId);
            // Use the deserialized single-message payload when there is one, else the batch payload.
            ReferenceCounted body;
            if (singleMessagePayload != null) {
                body = singleMessagePayload;
            } else {
                body = payload;
            }
            MessageImpl<V> created = MessageImpl.create(this.topicName.toString(), batchId, msgMetadata, singleMessageMetadata, (ByteBuf)body, this.createEncryptionContext(msgMetadata), this.cnx(), schema, redeliveryCount, this.poolMessages, consumerEpoch);
            created.setBrokerEntryMetadata(brokerEntryMetadata);
            return created;
        }
        catch (IOException | IllegalStateException e) {
            throw new IllegalStateException(e);
        }
        finally {
            // The deserialized single-message buffer is released on every exit path.
            if (singleMessagePayload != null) {
                singleMessagePayload.release();
            }
        }
    }

    /**
     * Creates a message for a non-batch entry and attaches the broker entry metadata to it.
     *
     * @param messageId           id of the message
     * @param brokerEntryMetadata broker entry metadata set on the created message
     * @param messageMetadata     metadata of the message
     * @param payload             message payload
     * @param schema              schema used for the created message
     * @param redeliveryCount     redelivery count passed to the created message
     * @param consumerEpoch       consumer epoch passed to the created message
     * @return the created message
     */
    protected <V> MessageImpl<V> newMessage(MessageIdImpl messageId, BrokerEntryMetadata brokerEntryMetadata, MessageMetadata messageMetadata, ByteBuf payload, Schema<V> schema, int redeliveryCount, long consumerEpoch) {
        MessageImpl<V> created = MessageImpl.create(
                this.topicName.toString(),
                messageId,
                messageMetadata,
                payload,
                this.createEncryptionContext(messageMetadata),
                this.cnx(),
                schema,
                redeliveryCount,
                this.poolMessages,
                consumerEpoch);
        created.setBrokerEntryMetadata(brokerEntryMetadata);
        return created;
    }

    /**
     * Dispatches a received message on the internal pinned executor.
     *
     * <p>A message with an invalid consumer epoch is dropped and one permit is added back. Otherwise
     * the message goes to a pending receive if one is waiting; if not, it is enqueued and a pending
     * batch receive is notified when the enqueue check reports so.
     *
     * @param message the message to dispatch
     */
    private void executeNotifyCallback(MessageImpl<T> message) {
        this.internalPinnedExecutor.execute(() -> {
            if (!this.isValidConsumerEpoch(message)) {
                this.increaseAvailablePermits(this.cnx());
                return;
            }
            if (this.hasNextPendingReceive()) {
                this.notifyPendingReceivedCallback(message, null);
                return;
            }
            boolean batchReady = this.enqueueMessageAndCheckBatchReceive(message) && this.hasPendingBatchReceive();
            if (batchReady) {
                this.notifyPendingBatchReceivedCallBack();
            }
        });
    }

    /**
     * Hands an entry payload to the configured payload processor.
     *
     * <p>Each non-null message produced by the processor is dispatched via
     * {@code executeNotifyCallback}; null results are counted and that many permits are added back
     * afterwards. If the processor throws, the entry is discarded with
     * {@code BatchDeSerializeError}. The entry context is recycled and the payload wrapper released
     * in every case, and the listener trigger runs last.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param brokerEntryMetadata broker entry metadata of the entry
     * @param messageMetadata     metadata of the entry
     * @param byteBuf             payload buffer wrapped for the processor
     * @param messageId           id of the entry
     * @param schema              schema passed to the processor
     * @param redeliveryCount     redelivery count of the entry
     * @param ackSet              ack set passed to the entry context
     * @param consumerEpoch       consumer epoch passed to the entry context
     */
    private void processPayloadByProcessor(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata messageMetadata, ByteBuf byteBuf, MessageIdImpl messageId, Schema<T> schema, int redeliveryCount, List<Long> ackSet, long consumerEpoch) {
        MessagePayloadImpl wrappedPayload = MessagePayloadImpl.create(byteBuf);
        MessagePayloadContextImpl context = MessagePayloadContextImpl.get(brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet, consumerEpoch);
        AtomicInteger skipped = new AtomicInteger(0);
        try {
            this.conf.getPayloadProcessor().process((MessagePayload)wrappedPayload, (MessagePayloadContext)context, schema, parsed -> {
                if (parsed == null) {
                    skipped.incrementAndGet();
                    return;
                }
                this.executeNotifyCallback((MessageImpl)parsed);
            });
        }
        catch (Throwable failure) {
            log.warn("[{}] [{}] unable to obtain message in batch", new Object[]{this.subscription, this.consumerName, failure});
            this.discardCorruptedMessage(messageId, this.cnx(), CommandAck.ValidationError.BatchDeSerializeError);
        }
        finally {
            context.recycle();
            wrappedPayload.release();
        }
        if (skipped.get() > 0) {
            this.increaseAvailablePermits(this.cnx(), skipped.get());
        }
        this.tryTriggerListener();
    }

    /**
     * Processes one message command received from the broker.
     *
     * <p>Steps, in order: collect the ack set, verify the checksum, parse the metadata, drop
     * already-acknowledged single messages, decrypt, uncompress, and then either delegate to the
     * configured payload processor, build a single (possibly chunked) message, or split a batch
     * into individual messages.
     *
     * @param cmdMessage        the message command (message id, redelivery count, ack sets, epoch)
     * @param headersAndPayload buffer holding the metadata headers followed by the payload
     * @param cnx               connection the command arrived on
     */
    void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
        MessageMetadata msgMetadata;
        BrokerEntryMetadata brokerEntryMetadata;
        // Copy the ack sets out of the command; stays an empty list when there are none.
        List<Long> ackSet = Collections.emptyList();
        if (cmdMessage.getAckSetsCount() > 0) {
            ackSet = new ArrayList(cmdMessage.getAckSetsCount());
            for (int i = 0; i < cmdMessage.getAckSetsCount(); ++i) {
                ackSet.add(cmdMessage.getAckSetAt(i));
            }
        }
        int redeliveryCount = cmdMessage.getRedeliveryCount();
        MessageIdData messageId = cmdMessage.getMessageId();
        // -1 is used when the command carries no consumer epoch.
        long consumerEpoch = -1L;
        if (cmdMessage.hasConsumerEpoch()) {
            consumerEpoch = cmdMessage.getConsumerEpoch();
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message: {}/{}", new Object[]{this.topic, this.subscription, messageId.getLedgerId(), messageId.getEntryId()});
        }
        if (!this.verifyChecksum(headersAndPayload, messageId)) {
            this.discardCorruptedMessage(messageId, cnx, CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        try {
            brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
            msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        }
        catch (Throwable t) {
            // Metadata parse failures are reported with the same ChecksumMismatch error code.
            this.discardCorruptedMessage(messageId, cnx, CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        int numMessages = msgMetadata.getNumMessagesInBatch();
        int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0;
        boolean isChunkedMessage = numChunks > 1;
        MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), this.getPartitionIndex());
        // Non-batch message that the ack tracker reports as a duplicate: skip it and return its permit.
        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch() && this.acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", new Object[]{this.topic, this.subscription, this.consumerName, msgId});
            }
            this.increaseAvailablePermits(cnx, numMessages);
            return;
        }
        ByteBuf decryptedPayload = this.decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload, cnx);
        boolean isMessageUndecryptable = this.isMessageUndecryptable(msgMetadata);
        if (decryptedPayload == null) {
            return;
        }
        // Undecryptable and chunked payloads are not uncompressed here; the buffer is retained so the
        // release() on the next line leaves one reference for the rest of this method.
        ByteBuf uncompressedPayload = isMessageUndecryptable || isChunkedMessage ? decryptedPayload.retain() : this.uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
        decryptedPayload.release();
        if (uncompressedPayload == null) {
            return;
        }
        if (this.conf.getPayloadProcessor() != null) {
            // No release() follows on this path; processPayloadByProcessor releases its payload wrapper.
            this.processPayloadByProcessor(brokerEntryMetadata, msgMetadata, uncompressedPayload, msgId, this.schema, redeliveryCount, ackSet, consumerEpoch);
            return;
        }
        // Single-message path: undecryptable payloads and non-batch entries.
        if (isMessageUndecryptable || numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
            if (isChunkedMessage) {
                // A null result ends handling of this chunk here; otherwise the returned buffer is used as the payload.
                if ((uncompressedPayload = this.processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx)) == null) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}", new Object[]{msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId, msgMetadata.getSequenceId()});
                }
                ChunkedMessageCtx chunkedMsgCtx = this.chunkedMessagesMap.remove(msgMetadata.getUuid());
                // The chunk message id is built from the first and last recorded chunk ids.
                if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
                    msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0], chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
                }
                this.unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
                --this.pendingChunkedMessageCount;
                chunkedMsgCtx.recycle();
            }
            // On persistent topics, entries before the start message id are dropped.
            if (this.topicName.isPersistent() && this.isSameEntry(msgId) && this.isPriorEntryIndex(messageId.getEntryId())) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", new Object[]{this.subscription, this.consumerName, this.startMessageId});
                }
                uncompressedPayload.release();
                return;
            }
            MessageImpl message = this.newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, this.schema, redeliveryCount, consumerEpoch);
            uncompressedPayload.release();
            // Messages at or above the max redelivery count are remembered as dead-letter candidates.
            if (this.deadLetterPolicy != null && this.possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= this.deadLetterPolicy.getMaxRedeliverCount()) {
                this.possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
                if (redeliveryCount > this.deadLetterPolicy.getMaxRedeliverCount()) {
                    // Above the limit the message is not delivered to the application; its permit is returned.
                    this.redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
                    this.increaseAvailablePermits(cnx);
                    return;
                }
            }
            this.executeNotifyCallback(message);
        } else {
            // Batch path: the batch is split into individual messages, then the buffer is released.
            this.receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx, consumerEpoch);
            uncompressedPayload.release();
        }
        this.tryTriggerListener();
    }

    private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId, MessageIdData messageId, ClientCnx cnx) {
        if (msgMetadata.getChunkId() != msgMetadata.getNumChunksFromMsg() - 1) {
            this.increaseAvailablePermits(cnx);
        }
        if (this.expireTimeOfIncompleteChunkedMessageMillis > 0L && this.expireChunkMessageTaskScheduled.compareAndSet(false, true)) {
            ((ScheduledExecutorService)this.client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(() -> this.internalPinnedExecutor.execute(Runnables.catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)), this.expireTimeOfIncompleteChunkedMessageMillis, this.expireTimeOfIncompleteChunkedMessageMillis, TimeUnit.MILLISECONDS);
        }
        ChunkedMessageCtx chunkedMsgCtx = this.chunkedMessagesMap.get(msgMetadata.getUuid());
        if (msgMetadata.getChunkId() == 0) {
            if (chunkedMsgCtx != null) {
                boolean isCorruptedChunkMessageDetected = Arrays.stream(chunkedMsgCtx.chunkedMessageIds).noneMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId() && messageId1.entryId == messageId.getEntryId());
                if (isCorruptedChunkMessageDetected) {
                    Arrays.stream(chunkedMsgCtx.chunkedMessageIds).forEach(messageId1 -> {
                        if (messageId1 != null) {
                            this.doAcknowledge((MessageId)messageId1, CommandAck.AckType.Individual, Collections.emptyMap(), null);
                        }
                    });
                }
                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
                }
                chunkedMsgCtx.recycle();
                this.chunkedMessagesMap.remove(msgMetadata.getUuid());
            }
            ++this.pendingChunkedMessageCount;
            if (this.maxPendingChunkedMessage > 0 && this.pendingChunkedMessageCount > this.maxPendingChunkedMessage) {
                this.removeOldestPendingChunkedMessage();
            }
            int totalChunks = msgMetadata.getNumChunksFromMsg();
            ByteBuf chunkedMsgBuffer = PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(), msgMetadata.getTotalChunkMsgSize());
            chunkedMsgCtx = this.chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(), key -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
            this.pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
        }
        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null || msgMetadata.getChunkId() != chunkedMsgCtx.lastChunkedMessageId + 1) {
            if (chunkedMsgCtx != null && msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
                log.warn("[{}] Receive a duplicated chunk message with messageId [{}], last-chunk-Id [{}], chunkId [{}], sequenceId [{}]", new Object[]{msgMetadata.getProducerName(), msgId, chunkedMsgCtx.lastChunkedMessageId, msgMetadata.getChunkId(), msgMetadata.getSequenceId()});
                compressedPayload.release();
                boolean isDuplicatedChunk = Arrays.stream(chunkedMsgCtx.chunkedMessageIds).noneMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId() && messageId1.entryId == messageId.getEntryId());
                if (isDuplicatedChunk) {
                    this.doAcknowledge(msgId, CommandAck.AckType.Individual, Collections.emptyMap(), null);
                }
                return null;
            }
            log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}, uuid = {}", new Object[]{this.topic, this.subscription, msgId, chunkedMsgCtx != null ? Integer.valueOf(chunkedMsgCtx.lastChunkedMessageId) : null, msgMetadata.getChunkId(), msgMetadata.getUuid()});
            if (chunkedMsgCtx != null) {
                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
                }
                chunkedMsgCtx.recycle();
            }
            this.chunkedMessagesMap.remove(msgMetadata.getUuid());
            compressedPayload.release();
            if (this.expireTimeOfIncompleteChunkedMessageMillis > 0L && System.currentTimeMillis() > msgMetadata.getPublishTime() + this.expireTimeOfIncompleteChunkedMessageMillis) {
                this.doAcknowledge(msgId, CommandAck.AckType.Individual, Collections.emptyMap(), null);
            } else {
                this.trackMessage(msgId);
            }
            return null;
        }
        chunkedMsgCtx.chunkedMessageIds[msgMetadata.getChunkId()] = msgId;
        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(compressedPayload);
        chunkedMsgCtx.lastChunkedMessageId = msgMetadata.getChunkId();
        if (msgMetadata.getChunkId() != msgMetadata.getNumChunksFromMsg() - 1) {
            compressedPayload.release();
            return null;
        }
        compressedPayload.release();
        compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
        ByteBuf uncompressedPayload = this.uncompressPayloadIfNeeded(messageId, msgMetadata, compressedPayload, cnx, false);
        compressedPayload.release();
        return uncompressedPayload;
    }

    /**
     * Hands a message, or a failure, to the next pending receive future if there is one.
     *
     * @param message   the message to deliver; when both this and {@code exception} are null the
     *                  future is failed with an {@link IllegalStateException}
     * @param exception when non-null, the pending future is completed exceptionally with it
     */
    void notifyPendingReceivedCallback(Message<T> message, Exception exception) {
        if (pendingReceives.isEmpty()) {
            return;
        }
        CompletableFuture<Message<T>> pending = nextPendingReceive();
        if (pending == null) {
            return;
        }

        // Resolve the failure to report, if any: the caller's exception wins over the null-message check.
        Exception failure = exception;
        if (failure == null && message == null) {
            failure = new IllegalStateException("received message can't be null");
        }
        if (failure != null) {
            final Exception cause = failure;
            internalPinnedExecutor.execute(() -> pending.completeExceptionally(cause));
            return;
        }

        // With a zero receiver queue size the message is only tracked; otherwise it goes through messageProcessed.
        if (getCurrentReceiverQueueSize() == 0) {
            trackMessage(message);
        } else {
            messageProcessed(message);
        }
        interceptAndComplete(message, pending);
    }

    /**
     * Passes the message through {@code beforeConsume} and completes the pending receive with the result.
     */
    private void interceptAndComplete(Message<T> message, CompletableFuture<Message<T>> receivedFuture) {
        completePendingReceive(receivedFuture, beforeConsume(message));
    }

    /**
     * Splits a batch entry into its individual messages and dispatches each one via
     * {@code executeNotifyCallback}.
     *
     * <p>Messages that {@code newSingleMessage} returns as null, that exceed the dead-letter redelivery
     * limit, or that the acknowledgment tracker reports as duplicates are skipped; one flow permit is
     * returned per skipped message. When a dead-letter policy is set and the redelivery count has reached
     * its maximum, the extracted messages are remembered under the batch's message id as dead-letter
     * candidates.
     *
     * @param brokerEntryMetadata broker entry metadata forwarded to {@code newSingleMessage}
     * @param msgMetadata         metadata of the batch entry (supplies the batch size)
     * @param redeliveryCount     redelivery count of the entry
     * @param ackSet              optional ack bit set as a list of longs; may be null or empty
     * @param uncompressedPayload payload from which the single messages are read
     * @param messageId           ledger/entry id of the batch entry
     * @param cnx                 connection used for discarding and for returning permits
     * @param consumerEpoch       consumer epoch forwarded to {@code newSingleMessage}
     */
    void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx, long consumerEpoch) {
        int batchSize = msgMetadata.getNumMessagesInBatch();
        MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), this.getPartitionIndex());
        List<MessageImpl<T>> possibleToDeadLetter = null;
        if (this.deadLetterPolicy != null && redeliveryCount >= this.deadLetterPolicy.getMaxRedeliverCount()) {
            possibleToDeadLetter = new ArrayList<>();
        }
        BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize);
        BitSetRecyclable ackBitSet = null;
        if (ackSet != null && !ackSet.isEmpty()) {
            ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
        }
        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
        int skippedMessages = 0;
        try {
            for (int i = 0; i < batchSize; ++i) {
                MessageImpl<T> message = this.newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata, singleMessageMetadata, uncompressedPayload, batchMessage, this.schema, true, ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch);
                if (message == null) {
                    ++skippedMessages;
                    continue;
                }
                if (possibleToDeadLetter != null) {
                    possibleToDeadLetter.add(message);
                    // Past the redelivery limit the message is kept as a DLQ candidate only and not dispatched.
                    if (redeliveryCount > this.deadLetterPolicy.getMaxRedeliverCount()) {
                        ++skippedMessages;
                        continue;
                    }
                }
                if (this.acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
                    ++skippedMessages;
                    continue;
                }
                this.executeNotifyCallback(message);
            }
        }
        catch (IllegalStateException e) {
            log.warn("[{}] [{}] unable to obtain message in batch", new Object[]{this.subscription, this.consumerName, e});
            this.discardCorruptedMessage(messageId, cnx, CommandAck.ValidationError.BatchDeSerializeError);
        }
        finally {
            // Recycle on every exit path; previously the bit set was not recycled when extraction failed.
            if (ackBitSet != null) {
                ackBitSet.recycle();
            }
        }
        if (this.deadLetterPolicy != null && this.possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= this.deadLetterPolicy.getMaxRedeliverCount()) {
            this.possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter);
            if (redeliveryCount > this.deadLetterPolicy.getMaxRedeliverCount()) {
                this.redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", new Object[]{this.subscription, this.consumerName, this.incomingMessages.size(), this.incomingMessages.remainingCapacity()});
        }
        if (skippedMessages > 0) {
            this.increaseAvailablePermits(cnx, skippedMessages);
        }
    }

    /**
     * Tells whether {@code idx} lies before the start message's entry id; the start entry itself
     * counts as prior unless {@code resetIncludeHead} is set.
     */
    private boolean isPriorEntryIndex(long idx) {
        if (resetIncludeHead) {
            return idx < startMessageId.getEntryId();
        }
        return idx <= startMessageId.getEntryId();
    }

    /**
     * Tells whether {@code idx} lies before the start message's batch index; the start index itself
     * counts as prior unless {@code resetIncludeHead} is set.
     */
    private boolean isPriorBatchIndex(long idx) {
        if (resetIncludeHead) {
            return idx < (long) startMessageId.getBatchIndex();
        }
        return idx <= (long) startMessageId.getBatchIndex();
    }

    /**
     * Tells whether the given id has the same ledger id and entry id as the start message id.
     * Returns false when no start message id is set.
     */
    private boolean isSameEntry(MessageIdImpl messageId) {
        if (startMessageId == null) {
            return false;
        }
        if (messageId.getLedgerId() != startMessageId.getLedgerId()) {
            return false;
        }
        return messageId.getEntryId() == startMessageId.getEntryId();
    }

    /**
     * Bookkeeping performed when a message is handed to the application.
     *
     * <p>Records the message id as the last dequeued one. Only when the message arrived on the
     * current connection are permits increased (if no listener is in use), receive stats updated and
     * the message tracked. The incoming message size is always decreased.
     */
    @Override
    protected synchronized void messageProcessed(Message<?> msg) {
        ClientCnx activeCnx = cnx();
        ClientCnx originCnx = ((MessageImpl<?>) msg).getCnx();
        lastDequeuedMessageId = msg.getMessageId();

        boolean fromActiveConnection = originCnx == activeCnx;
        if (fromActiveConnection) {
            boolean noListener = listener == null && !parentConsumerHasListener;
            if (noListener) {
                increaseAvailablePermits(activeCnx);
            }
            stats.updateNumMsgsReceived(msg);
            trackMessage(msg);
        }
        decreaseIncomingMessageSize(msg);
    }

    /**
     * Tracks the given message using its id and redelivery count; a null message is ignored.
     */
    protected void trackMessage(Message<?> msg) {
        if (msg == null) {
            return;
        }
        trackMessage(msg.getMessageId(), msg.getRedeliveryCount());
    }

    /**
     * Tracks the given message id with a redelivery count of zero.
     */
    protected void trackMessage(MessageId messageId) {
        final int noRedeliveries = 0;
        trackMessage(messageId, noRedeliveries);
    }

    /**
     * Applies ack-timeout tracking to a message id.
     *
     * <p>Does nothing unless an ack timeout is configured (greater than zero) and the id is a
     * {@code MessageIdImpl}. The id is first passed through {@code MessageIdAdvUtils.discardBatch}.
     * With a parent consumer the id is removed from this consumer's un-acked tracker; otherwise it is
     * passed to {@code trackUnAckedMsgIfNoListener}.
     */
    protected void trackMessage(MessageId messageId, int redeliveryCount) {
        if (conf.getAckTimeoutMillis() <= 0L || !(messageId instanceof MessageIdImpl)) {
            return;
        }
        MessageIdAdv entryId = MessageIdAdvUtils.discardBatch(messageId);
        if (!hasParentConsumer) {
            trackUnAckedMsgIfNoListener(entryId, redeliveryCount);
            return;
        }
        unAckedMessageTracker.remove(entryId);
    }

    /**
     * Returns one permit for the message, but only if it was received on the current connection.
     */
    void increaseAvailablePermits(MessageImpl<?> msg) {
        ClientCnx activeCnx = cnx();
        if (msg.getCnx() != activeCnx) {
            return;
        }
        increaseAvailablePermits(activeCnx);
    }

    /**
     * Returns a single permit on the given connection.
     */
    void increaseAvailablePermits(ClientCnx currentCnx) {
        final int onePermit = 1;
        increaseAvailablePermits(currentCnx, onePermit);
    }

    /**
     * Adds {@code delta} to the available-permit counter and, once the counter reaches half the
     * current receiver queue size while the consumer is not paused, resets it to zero and sends that
     * many flow permits to the broker.
     *
     * @param currentCnx connection on which the flow command is sent
     * @param delta      number of permits to add (may be zero)
     */
    protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
        int accumulated = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
        for (;;) {
            if (accumulated < getCurrentReceiverQueueSize() / 2 || paused) {
                return;
            }
            // Only the caller that wins the CAS sends the flow command for the accumulated permits.
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, accumulated, 0)) {
                sendFlowPermitsToBroker(currentCnx, accumulated);
                return;
            }
            accumulated = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }

    /**
     * Adds {@code delta} permits using the consumer's current connection.
     */
    public void increaseAvailablePermits(int delta) {
        ClientCnx activeCnx = cnx();
        increaseAvailablePermits(activeCnx, delta);
    }

    /**
     * Replaces the current receiver queue size and adjusts the available permits by the difference
     * between the new and the old size.
     *
     * @param newSize new queue size; must be greater than zero
     */
    @Override
    protected void setCurrentReceiverQueueSize(int newSize) {
        Preconditions.checkArgument(newSize > 0, "receiver queue size should larger than 0");
        int previousSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize);
        int permitDelta = newSize - previousSize;
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
                    topic, subscription, previousSize, newSize, permitDelta);
        }
        increaseAvailablePermits(permitDelta);
    }

    /**
     * Marks the consumer as paused; while paused, accumulated permits are not sent to the broker.
     */
    public void pause() {
        paused = true;
    }

    /**
     * Clears the paused flag and re-evaluates the accumulated permits (adding zero) so that a flow
     * command can be sent if the threshold has already been reached. No-op when not paused.
     */
    public void resume() {
        if (!paused) {
            return;
        }
        paused = false;
        increaseAvailablePermits(cnx(), 0);
    }

    /**
     * Returns the connection handler's {@code lastConnectionClosedTimestamp}.
     */
    public long getLastDisconnectedTimestamp() {
        long closedAt = connectionHandler.lastConnectionClosedTimestamp;
        return closedAt;
    }

    /**
     * Decrypts the payload when the message metadata carries encryption keys.
     *
     * <p>Returns a retained reference to {@code payload} when no decryption is required or when the
     * configured crypto failure action is {@code CONSUME}. Returns a newly allocated buffer on
     * successful decryption. Returns null when the message is discarded ({@code DISCARD}) or added to
     * the un-acked tracker ({@code FAIL}).
     *
     * @param messageId       ledger/entry id of the message
     * @param redeliveryCount redelivery count recorded with the un-acked tracker on {@code FAIL}
     * @param msgMetadata     message metadata holding the encryption keys
     * @param payload         possibly encrypted payload
     * @param currentCnx      connection used when the message has to be discarded
     */
    private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata, ByteBuf payload, ClientCnx currentCnx) {
        // No encryption keys in the metadata: pass the payload through.
        if (msgMetadata.getEncryptionKeysCount() == 0) {
            return payload.retain();
        }

        // Encrypted message but no key reader configured: act according to the crypto failure action.
        if (conf.getCryptoKeyReader() == null) {
            switch (conf.getCryptoFailureAction()) {
                case CONSUME: {
                    log.debug("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming encrypted message.", topic, subscription, consumerName);
                    return payload.retain();
                }
                case DISCARD: {
                    log.warn("[{}][{}][{}] Skipping decryption since CryptoKeyReader interface is not implemented and config is set to discard", topic, subscription, consumerName);
                    discardMessage(messageId, currentCnx, CommandAck.ValidationError.DecryptionError);
                    return null;
                }
                case FAIL: {
                    MessageIdImpl failedId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partitionIndex);
                    log.error("[{}][{}][{}][{}] Message delivery failed since CryptoKeyReader interface is not implemented to consume encrypted message", topic, subscription, consumerName, failedId);
                    unAckedMessageTracker.add(failedId, redeliveryCount);
                    return null;
                }
            }
        }

        // Attempt the decryption into a freshly allocated buffer.
        int outputCapacity = msgCrypto.getMaxOutputSize(payload.readableBytes());
        ByteBuf plainBuffer = PulsarByteBufAllocator.DEFAULT.buffer(outputCapacity);
        ByteBuffer plainNio = plainBuffer.nioBuffer(0, outputCapacity);
        boolean decrypted = msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), plainNio, conf.getCryptoKeyReader());
        if (decrypted) {
            plainBuffer.writerIndex(plainNio.limit());
            return plainBuffer;
        }

        // Decryption failed: the output buffer is not needed any more.
        plainBuffer.release();
        switch (conf.getCryptoFailureAction()) {
            case CONSUME: {
                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to consume.", topic, subscription, consumerName, messageId);
                return payload.retain();
            }
            case DISCARD: {
                log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config is set to discard", topic, subscription, consumerName, messageId);
                discardMessage(messageId, currentCnx, CommandAck.ValidationError.DecryptionError);
                return null;
            }
            case FAIL: {
                MessageIdImpl failedId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partitionIndex);
                log.error("[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming message", topic, subscription, consumerName, failedId);
                unAckedMessageTracker.add(failedId, redeliveryCount);
                return null;
            }
        }
        return null;
    }

    /**
     * Decodes the payload with the codec matching the compression type declared in the metadata.
     *
     * <p>When {@code checkMaxMessageSize} is set and the readable payload exceeds
     * {@code ClientCnx.getMaxMessageSize()}, or when decoding throws an {@link IOException}, the
     * message is discarded as corrupted and null is returned.
     *
     * @return the decoded payload, or null if the message was discarded
     */
    private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, ClientCnx currentCnx, boolean checkMaxMessageSize) {
        CompressionType compression = msgMetadata.getCompression();
        CompressionCodec decoder = CompressionCodecProvider.getCompressionCodec(compression);
        int expectedSize = msgMetadata.getUncompressedSize();
        int readable = payload.readableBytes();

        boolean oversized = checkMaxMessageSize && readable > ClientCnx.getMaxMessageSize();
        if (oversized) {
            log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, readable, messageId);
            discardCorruptedMessage(messageId, currentCnx, CommandAck.ValidationError.UncompressedSizeCorruption);
            return null;
        }

        try {
            return decoder.decode(payload, expectedSize);
        } catch (IOException decodeFailure) {
            log.error("[{}][{}] Failed to decompress message with {} at {}: {}", topic, subscription, compression, messageId, decodeFailure.getMessage(), decodeFailure);
            discardCorruptedMessage(messageId, currentCnx, CommandAck.ValidationError.DecompressionError);
            return null;
        }
    }

    /**
     * Verifies the checksum carried in the frame, if there is one.
     *
     * <p>When {@code Commands.hasChecksum} reports a checksum, it is read from the buffer and compared
     * with the CRC32C computed over the buffer as it stands after that read. A mismatch is logged with
     * both values rendered as 32-bit hex.
     *
     * @param headersAndPayload buffer positioned at the optional checksum
     * @param messageId         id used only for the error log
     * @return false on a checksum mismatch, true otherwise (including when no checksum is present)
     */
    private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) {
        if (!Commands.hasChecksum(headersAndPayload)) {
            return true;
        }
        // Order matters: the stored checksum is read before the remaining bytes are checksummed.
        int checksum = Commands.readChecksum(headersAndPayload);
        int computedChecksum = Crc32cIntChecksum.computeChecksum(headersAndPayload);
        if (checksum == computedChecksum) {
            return true;
        }
        // Both values are ints; format them the same way so a negative checksum is not sign-extended to 64 bits.
        log.error("[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", new Object[]{this.topic, this.subscription, messageId.getLedgerId(), messageId.getEntryId(), Integer.toHexString(checksum), Integer.toHexString(computedChecksum)});
        return false;
    }

    /**
     * Logs the corrupted message, sends an individual ack carrying the validation error, returns one
     * permit and increments the receive-failed counter.
     */
    private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentCnx, CommandAck.ValidationError validationError) {
        long ledgerId = messageId.getLedgerId();
        long entryId = messageId.getEntryId();
        log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, ledgerId, entryId);
        ByteBuf ackCommand = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, CommandAck.AckType.Individual, validationError, Collections.emptyMap(), -1L);
        currentCnx.ctx().writeAndFlush(ackCommand, currentCnx.ctx().voidPromise());
        increaseAvailablePermits(currentCnx);
        stats.incrementNumReceiveFailed();
    }

    /**
     * Logs the corrupted message and then discards it via {@code discardMessage}.
     */
    private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, CommandAck.ValidationError validationError) {
        long ledgerId = messageId.getLedgerId();
        long entryId = messageId.getEntryId();
        log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, ledgerId, entryId);
        discardMessage(messageId, currentCnx, validationError);
    }

    /**
     * Sends an individual ack carrying the validation error for the message, returns one permit and
     * increments the receive-failed counter.
     */
    private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, CommandAck.ValidationError validationError) {
        ByteBuf ackCommand = Commands.newAck(
                consumerId,
                messageId.getLedgerId(),
                messageId.getEntryId(),
                null,
                CommandAck.AckType.Individual,
                validationError,
                Collections.emptyMap(),
                -1L);
        currentCnx.ctx().writeAndFlush(ackCommand, currentCnx.ctx().voidPromise());
        increaseAvailablePermits(currentCnx);
        stats.incrementNumReceiveFailed();
    }

    /**
     * Returns the subscription name as the handler name.
     */
    @Override
    String getHandlerName() {
        return subscription;
    }

    /**
     * Returns true when a client connection is present and the handler state is {@code Ready}.
     */
    public boolean isConnected() {
        if (getClientCnx() == null) {
            return false;
        }
        return getState() == HandlerState.State.Ready;
    }

    /**
     * Returns true when the given connection is non-null and the handler state is {@code Ready}.
     */
    public boolean isConnected(ClientCnx cnx) {
        if (cnx == null) {
            return false;
        }
        return getState() == HandlerState.State.Ready;
    }

    /**
     * Returns this consumer's partition index.
     */
    int getPartitionIndex() {
        return partitionIndex;
    }

    /**
     * Returns the current value of the available-permit counter.
     */
    @Override
    public int getAvailablePermits() {
        int permits = AVAILABLE_PERMITS_UPDATER.get(this);
        return permits;
    }

    /**
     * Returns the number of messages currently held in the incoming queue.
     */
    @Override
    public int numMessagesInQueue() {
        int queued = incomingMessages.size();
        return queued;
    }

    /**
     * Asks the broker to redeliver all unacknowledged messages of this consumer.
     *
     * <p>If the peer's protocol version is below v2, the connection is closed instead (unless the
     * handler is still connecting) and nothing else happens. Otherwise, under the incoming-queue lock,
     * the consumer epoch is incremented for Failover/Exclusive subscriptions, and the incoming queue
     * and the un-acked tracker are cleared. If still connected, the redeliver command is sent and the
     * number of messages cleared from the queue is returned as permits.
     */
    public void redeliverUnacknowledgedMessages() {
        synchronized (this) {
            ClientCnx cnx = cnx();
            boolean peerBelowV2 = cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue();
            if (peerBelowV2) {
                if (getState() != HandlerState.State.Connecting) {
                    log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                    cnx.ctx().close();
                } else {
                    log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
                }
                return;
            }

            int clearedFromQueue;
            incomingQueueLock.lock();
            try {
                boolean bumpEpoch = conf.getSubscriptionType() == SubscriptionType.Failover
                        || conf.getSubscriptionType() == SubscriptionType.Exclusive;
                if (bumpEpoch) {
                    CONSUMER_EPOCH.incrementAndGet(this);
                }
                // Remember how many queued messages are dropped so the same number of permits can be returned.
                clearedFromQueue = incomingMessages.size();
                clearIncomingMessages();
                unAckedMessageTracker.clear();
            } finally {
                incomingQueueLock.unlock();
            }

            if (cnx == null || !isConnected(cnx)) {
                log.warn("[{}] Send redeliver messages command but the client is reconnect or close, so don't need to send redeliver command to broker", this);
                return;
            }
            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise());
            if (clearedFromQueue > 0) {
                increaseAvailablePermits(cnx, clearedFromQueue);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, consumerName, clearedFromQueue);
            }
        }
    }

    /**
     * Asks the broker to redeliver the given messages.
     *
     * <p>For subscription types other than Shared and Key_Shared this falls back to redelivering all
     * unacknowledged messages. Otherwise, when connected to a peer speaking protocol v2 or later, the
     * ids are removed from the incoming queue, sent to the broker in partitions of 1000 (ids that
     * {@code getRedeliveryMessageIdData} filters out are omitted), and the number of messages removed
     * from the queue is returned as permits. When not connected, the connection is closed unless it is
     * absent or still being established.
     *
     * @param messageIds ids to redeliver; an empty set is a no-op
     */
    @Override
    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
        if (messageIds.isEmpty()) {
            return;
        }
        if (this.conf.getSubscriptionType() != SubscriptionType.Shared && this.conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
            this.redeliverUnacknowledgedMessages();
            return;
        }
        ClientCnx cnx = this.cnx();
        // Check the captured connection itself, so it cannot be null when dereferenced below.
        if (this.isConnected(cnx) && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
            int messagesFromQueue = this.removeExpiredMessagesFromQueue(messageIds);
            Iterables.partition(messageIds, 1000).forEach(ids -> this.getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
                if (!messageIdData.isEmpty()) {
                    ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(this.consumerId, messageIdData);
                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                }
            }));
            if (messagesFromQueue > 0) {
                this.increaseAvailablePermits(cnx, messagesFromQueue);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", new Object[]{this.subscription, this.topic, this.consumerName, messagesFromQueue});
            }
            return;
        }
        if (cnx == null || this.getState() == HandlerState.State.Connecting) {
            log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", (Object)this);
        } else {
            log.warn("[{}] Reconnecting the client to redeliver the messages.", (Object)this);
            cnx.ctx().close();
        }
    }

    /**
     * Sets the auto-scale hint to whether available permits plus queued messages have reached the
     * current receiver queue size, logging the transition at debug level when the value changes.
     */
    @Override
    protected void updateAutoScaleReceiverQueueHint() {
        boolean hint = getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize();
        boolean previous = scaleReceiverQueueHint.getAndSet(hint);
        if (log.isDebugEnabled() && previous != scaleReceiverQueueHint.get()) {
            log.debug("updateAutoScaleReceiverQueueHint {} -> {}", previous, scaleReceiverQueueHint.get());
        }
    }

    /**
     * Completes a pending batch-receive operation by notifying its future.
     */
    @Override
    protected void completeOpBatchReceive(ConsumerBase.OpBatchReceive<T> op) {
        notifyPendingBatchReceivedCallBack(op.future);
    }

    /**
     * Builds the redelivery payload for the given ids.
     *
     * <p>Each id is first offered to {@code processPossibleToDLQ}; ids for which that completes with
     * {@code true} are left out of the result, all others are converted to {@code MessageIdData}
     * (partition, ledger id, entry id).
     *
     * @param messageIds ids to convert; null or empty yields an empty list
     * @return future completing with the ids that still need to be redelivered
     */
    private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<MessageId> messageIds) {
        if (messageIds == null || messageIds.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        List<CompletableFuture<MessageIdData>> conversions = new ArrayList<>(messageIds.size());
        for (MessageId original : messageIds) {
            MessageIdAdv id = (MessageIdAdv) original;
            conversions.add(processPossibleToDLQ(id).thenApply(sentToDlq -> {
                if (sentToDlq) {
                    // Handled by the dead-letter path: nothing to redeliver for this id.
                    return null;
                }
                return new MessageIdData()
                        .setPartition(id.getPartitionIndex())
                        .setLedgerId(id.getLedgerId())
                        .setEntryId(id.getEntryId());
            }));
        }
        return FutureUtil.waitForAll(conversions).thenApply(ignored -> conversions.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList()));
    }

    /**
     * Sends the dead-letter candidates recorded for the given id to the dead-letter topic.
     *
     * <p>Candidates are looked up by the id with its batch part discarded. When none are recorded (or
     * the recorded list is empty) the future completes with {@code false} immediately. Otherwise each
     * candidate is published through the dead-letter producer; after a successful publish the
     * candidate entry is removed and the original message is acknowledged.
     *
     * @param messageId id of the message being redelivered
     * @return future completing with {@code true} once a candidate has been published and the original
     *         message acknowledged, {@code false} when there is nothing to send or when publishing or
     *         acknowledging fails
     */
    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId) {
        List<MessageImpl<T>> deadLetterMessages = null;
        if (this.possibleSendToDeadLetterTopicMessages != null) {
            deadLetterMessages = this.possibleSendToDeadLetterTopicMessages.get(MessageIdAdvUtils.discardBatch(messageId));
        }
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        // An empty candidate list would leave the loop below without iterations and the future never
        // completed, so treat it the same as "no candidates".
        if (deadLetterMessages == null || deadLetterMessages.isEmpty()) {
            result.complete(false);
            return result;
        }
        this.initDeadLetterProducerIfNeeded();
        List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
        this.deadLetterProducer.thenAcceptAsync(producerDLQ -> {
            for (MessageImpl<T> message : finalDeadLetterMessages) {
                String originMessageIdStr = message.getMessageId().toString();
                String originTopicNameStr = this.getOriginTopicNameStr(message);
                TypedMessageBuilder<byte[]> typedMessageBuilderNew = producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).value(message.getData()).properties(this.getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
                if (message.hasKey()) {
                    typedMessageBuilderNew.key(message.getKey());
                }
                typedMessageBuilderNew.sendAsync().thenAccept(messageIdInDLQ -> {
                    // Remove with the same normalized key that was used for the lookup above.
                    this.possibleSendToDeadLetterTopicMessages.remove(MessageIdAdvUtils.discardBatch(messageId));
                    this.acknowledgeAsync(messageId).whenComplete((v, ex) -> {
                        if (ex != null) {
                            log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.", new Object[]{this.topicName, this.subscription, this.consumerName, messageId, ex});
                            result.complete(false);
                        } else {
                            result.complete(true);
                        }
                    });
                }).exceptionally(ex -> {
                    if (ex instanceof PulsarClientException.ProducerQueueIsFullError) {
                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", new Object[]{this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage()});
                    } else {
                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", new Object[]{this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getDeadLetterTopic(), messageId, ex});
                    }
                    result.complete(false);
                    return null;
                });
            }
        }, (Executor)this.internalPinnedExecutor).exceptionally(ex -> {
            log.error("Dead letter producer exception with topic: {}", (Object)this.deadLetterPolicy.getDeadLetterTopic(), ex);
            // Drop the failed producer future so a later call can create a new one.
            this.deadLetterProducer = null;
            result.complete(false);
            return null;
        });
        return result;
    }

    /**
     * Creates the dead-letter producer future if none exists yet. The creation is guarded by the
     * producer-creation write lock and the null check is repeated once the lock is held.
     */
    private void initDeadLetterProducerIfNeeded() {
        if (deadLetterProducer != null) {
            return;
        }
        createProducerLock.writeLock().lock();
        try {
            // Another caller may have created the producer while this one was waiting for the lock.
            if (deadLetterProducer != null) {
                return;
            }
            deadLetterProducer = ((ProducerBuilderImpl)this.client.newProducer(Schema.AUTO_PRODUCE_BYTES((Schema)this.schema))).initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()).topic(this.deadLetterPolicy.getDeadLetterTopic()).producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription, this.consumerName)).blockIfQueueFull(false).enableBatching(false).enableChunking(true).createAsync();
            deadLetterProducer.thenAccept(dlqProducer -> stats.setDeadLetterProducerStats(dlqProducer.getStats()));
        } finally {
            createProducerLock.writeLock().unlock();
        }
    }

    /**
     * Blocking variant of {@code seekAsync(MessageId)}.
     *
     * @throws PulsarClientException the failure of the async seek, unwrapped
     */
    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            CompletableFuture<Void> pendingSeek = seekAsync(messageId);
            pendingSeek.get();
        } catch (Exception failure) {
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Blocking variant of {@code seekAsync(long)}.
     *
     * @throws PulsarClientException the failure of the async seek, unwrapped
     */
    public void seek(long timestamp) throws PulsarClientException {
        try {
            CompletableFuture<Void> pendingSeek = seekAsync(timestamp);
            pendingSeek.get();
        } catch (Exception failure) {
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Blocking variant of {@code seekAsync(Function)}.
     *
     * @throws PulsarClientException the failure of the async seek, unwrapped
     */
    public void seek(Function<String, Object> function) throws PulsarClientException {
        try {
            CompletableFuture<Void> pendingSeek = seekAsync(function);
            pendingSeek.get();
        } catch (Exception failure) {
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Seeks to the position that {@code function} returns for this consumer's topic.
     *
     * <p>A null position completes immediately without seeking; a {@code MessageId} or a {@code Long}
     * timestamp delegates to the matching overload; any other type fails the returned future.
     *
     * @param function maps the topic name to a seek position; must not be null
     */
    public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
        if (function == null) {
            return FutureUtil.failedFuture(new PulsarClientException("Function must be set"));
        }
        Object position = function.apply(topic);
        if (position == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (position instanceof MessageId) {
            return seekAsync((MessageId) position);
        }
        if (position instanceof Long) {
            return seekAsync((Long) position);
        }
        return FutureUtil.failedFuture(new PulsarClientException("Only support seek by messageId or timestamp"));
    }

    /**
     * Starts a seek with a retry budget equal to the configured operation timeout and a backoff that
     * starts at 100 ms and is capped at twice that timeout.
     *
     * @return future completed when the seek succeeds or fails
     */
    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
        long operationTimeoutMs = client.getConfiguration().getOperationTimeoutMs();
        AtomicLong remainingMs = new AtomicLong(operationTimeoutMs);
        Backoff retryBackoff = new BackoffBuilder()
                .setInitialTime(100L, TimeUnit.MILLISECONDS)
                .setMax(operationTimeoutMs * 2L, TimeUnit.MILLISECONDS)
                .setMandatoryStop(0L, TimeUnit.MILLISECONDS)
                .create();
        CompletableFuture<Void> outcome = new CompletableFuture<>();
        seekAsyncInternal(requestId, seek, seekId, seekBy, retryBackoff, remainingMs, outcome);
        return outcome;
    }

    /**
     * Sends the seek command, retrying with backoff while the consumer is not connected.
     *
     * <p>When connected, the command is sent unless another seek is already in progress, in which
     * case the future fails with {@link IllegalStateException}. On success the acknowledgment tracker
     * is flushed, the last dequeued id is reset to {@code MessageId.earliest} and the incoming queue
     * is cleared. On failure the previous seek id and the in-progress flag are restored. When not
     * connected, the call is rescheduled after the next backoff delay until the remaining time is
     * exhausted, at which point the future fails with a timeout.
     *
     * <p>The {@code seek} buffer is released on the paths where it is never handed to the connection.
     *
     * @param requestId     request id carried by the command
     * @param seek          serialized seek command
     * @param seekId        id stored as the seek message id while the seek is in flight
     * @param seekBy        human-readable description of the seek target, used in logs and errors
     * @param backoff       supplies the retry delays
     * @param remainingTime remaining retry budget in milliseconds; decremented on each retry
     * @param seekFuture    future to complete with the outcome
     */
    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, Backoff backoff, AtomicLong remainingTime, CompletableFuture<Void> seekFuture) {
        ClientCnx cnx = this.cnx();
        if (this.isConnected() && cnx != null) {
            if (!this.duringSeek.compareAndSet(false, true)) {
                String message = String.format("[%s][%s] attempting to seek operation that is already in progress (seek by %s)", this.topic, this.subscription, seekBy);
                log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", new Object[]{this.topic, this.subscription, seekBy});
                // The command is not sent on this path, so drop the reference held for it.
                ReferenceCountUtil.safeRelease(seek);
                seekFuture.completeExceptionally(new IllegalStateException(message));
                return;
            }
            MessageIdAdv originSeekMessageId = this.seekMessageId;
            this.seekMessageId = (MessageIdAdv)seekId;
            log.info("[{}][{}] Seeking subscription to {}", new Object[]{this.topic, this.subscription, seekBy});
            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
                log.info("[{}][{}] Successfully reset subscription to {}", new Object[]{this.topic, this.subscription, seekBy});
                this.acknowledgmentsGroupingTracker.flushAndClean();
                this.lastDequeuedMessageId = MessageId.earliest;
                this.clearIncomingMessages();
                seekFuture.complete(null);
            }).exceptionally(e -> {
                this.seekMessageId = originSeekMessageId;
                this.duringSeek.set(false);
                // Fall back to the throwable itself if it carries no cause, so the handler cannot fail with an NPE.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                log.error("[{}][{}] Failed to reset subscription: {}", new Object[]{this.topic, this.subscription, cause.getMessage()});
                seekFuture.completeExceptionally(PulsarClientException.wrap(cause, String.format("Failed to seek the subscription %s of the topic %s to %s", this.subscription, this.topicName.toString(), seekBy)));
                return null;
            });
        } else {
            long nextDelay = Math.min(backoff.next(), remainingTime.get());
            if (nextDelay <= 0L) {
                // Retry budget exhausted: the command will never be sent, so drop the reference held for it.
                ReferenceCountUtil.safeRelease(seek);
                seekFuture.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(String.format("The subscription %s of the topic %s could not seek withing configured timeout", this.subscription, this.topicName.toString())));
                return;
            }
            ((ScheduledExecutorService)this.client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms", new Object[]{this.topic, this.getHandlerName(), nextDelay});
                remainingTime.addAndGet(-nextDelay);
                this.seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
            }, nextDelay, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Seeks the subscription to the given timestamp.
     *
     * <p>{@code MessageId.earliest} is passed as the seek message id for timestamp-based seeks.
     *
     * @param timestamp timestamp forwarded to the seek command
     * @return a future completed when the seek finishes or fails
     */
    public CompletableFuture<Void> seekAsync(long timestamp) {
        long requestId = this.client.newRequestId();
        ByteBuf seekCommand = Commands.newSeek(this.consumerId, requestId, timestamp);
        String description = String.format("the timestamp %d", timestamp);
        return this.seekAsyncInternal(requestId, seekCommand, MessageId.earliest, description);
    }

    /**
     * Seeks the subscription to the given message id.
     *
     * <p>If the id reports a first-chunk message id, the seek targets that first chunk with an empty
     * ack set. Otherwise, for a batch id, the ack set has bits {@code [batchIndex, batchSize)} set
     * (a negative batch index is treated as 0); for a non-batch id the ack set is empty.
     *
     * @param messageId target position; cast to {@code MessageIdAdv}
     * @return a future completed when the seek finishes or fails
     */
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        String seekBy = String.format("the message %s", messageId.toString());
        long requestId = this.client.newRequestId();
        MessageIdAdv target = (MessageIdAdv)messageId;
        MessageIdAdv firstChunk = target.getFirstChunkMessageId();

        MessageIdAdv seekPosition;
        long[] ackSetWords;
        if (firstChunk != null) {
            seekPosition = firstChunk;
            ackSetWords = new long[0];
        } else {
            seekPosition = target;
            if (MessageIdAdvUtils.isBatch(target)) {
                BitSetRecyclable remaining = BitSetRecyclable.create();
                remaining.set(0, target.getBatchSize());
                remaining.clear(0, Math.max(target.getBatchIndex(), 0));
                ackSetWords = remaining.toLongArray();
                remaining.recycle();
            } else {
                ackSetWords = new long[]{};
            }
        }
        ByteBuf seekCommand = Commands.newSeek(this.consumerId, requestId, seekPosition.getLedgerId(), seekPosition.getEntryId(), ackSetWords);
        return this.seekAsyncInternal(requestId, seekCommand, messageId, seekBy);
    }

    /**
     * Blocking variant of {@link #hasMessageAvailableAsync()}.
     *
     * @return the value produced by the asynchronous check
     * @throws PulsarClientException whatever {@code PulsarClientException.unwrap} yields for any
     *     exception raised while starting or waiting for the asynchronous check
     */
    public boolean hasMessageAvailable() throws PulsarClientException {
        boolean available;
        try {
            available = this.hasMessageAvailableAsync().get();
        }
        catch (Exception failure) {
            throw PulsarClientException.unwrap((Throwable)failure);
        }
        return available;
    }

    /**
     * Asynchronously determines whether a message is available to this consumer.
     *
     * <p>Decision order:
     * <ol>
     *   <li>If the incoming queue is non-empty, the result is immediately {@code true}.</li>
     *   <li>If nothing has been dequeued yet ({@code lastDequeuedMessageId} is
     *       {@code MessageId.earliest}) and the start position is {@code MessageId.latest}, the
     *       last message id and mark-delete position are fetched (preceded by a seek to the last
     *       message id when {@code resetIncludeHead} is set) and compared.</li>
     *   <li>Otherwise the cached {@code lastMessageIdInBroker} is compared against the start
     *       position (or the last dequeued id); if that does not show more messages, the last
     *       message id is refreshed from the broker and compared again.</li>
     * </ol>
     *
     * <p>Successful results are completed through {@code internalPinnedExecutor}.
     *
     * @return a future holding {@code true} when a message is available
     */
    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        CompletableFuture<Boolean> booleanFuture = new CompletableFuture<Boolean>();
        if (this.incomingMessages != null && !this.incomingMessages.isEmpty()) {
            return CompletableFuture.completedFuture(true);
        }
        if (this.lastDequeuedMessageId == MessageId.earliest) {
            if (MessageId.latest.equals(this.startMessageId)) {
                CompletableFuture<GetLastMessageIdResponse> future = this.internalGetLastMessageIdAsync();
                if (this.resetIncludeHead) {
                    // Seek to the last message first, then carry the original response forward.
                    future = future.thenCompose(lastMessageIdResponse -> this.seekAsync(lastMessageIdResponse.lastMessageId).thenApply(ignore -> lastMessageIdResponse));
                }
                future.thenAccept(response -> {
                    MessageIdAdv lastMessageId = (MessageIdAdv)response.lastMessageId;
                    MessageIdAdv markDeletePosition = (MessageIdAdv)response.markDeletePosition;
                    if (markDeletePosition != null && (markDeletePosition.getEntryId() >= 0L || markDeletePosition.getLedgerId() <= lastMessageId.getLedgerId())) {
                        int result = ComparisonChain.start().compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId()).compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId()).result();
                        if (lastMessageId.getEntryId() < 0L) {
                            this.completehasMessageAvailableWithValue(booleanFuture, false);
                        } else {
                            this.completehasMessageAvailableWithValue(booleanFuture, this.resetIncludeHead ? result <= 0 : result < 0);
                        }
                    } else if (lastMessageId == null || lastMessageId.getEntryId() < 0L) {
                        this.completehasMessageAvailableWithValue(booleanFuture, false);
                    } else {
                        this.completehasMessageAvailableWithValue(booleanFuture, this.resetIncludeHead);
                    }
                }).exceptionally(ex -> this.failHasMessageAvailable(booleanFuture, ex));
                return booleanFuture;
            }
            if (this.hasMoreMessages(this.lastMessageIdInBroker, this.startMessageId, this.resetIncludeHead)) {
                this.completehasMessageAvailableWithValue(booleanFuture, true);
                return booleanFuture;
            }
            this.getLastMessageIdAsync().thenAccept(messageId -> {
                this.lastMessageIdInBroker = messageId;
                this.completehasMessageAvailableWithValue(booleanFuture, this.hasMoreMessages(this.lastMessageIdInBroker, this.startMessageId, this.resetIncludeHead));
            }).exceptionally(e -> this.failHasMessageAvailable(booleanFuture, e));
        } else {
            if (this.hasMoreMessages(this.lastMessageIdInBroker, this.lastDequeuedMessageId, false)) {
                this.completehasMessageAvailableWithValue(booleanFuture, true);
                return booleanFuture;
            }
            this.getLastMessageIdAsync().thenAccept(messageId -> {
                this.lastMessageIdInBroker = messageId;
                this.completehasMessageAvailableWithValue(booleanFuture, this.hasMoreMessages(this.lastMessageIdInBroker, this.lastDequeuedMessageId, false));
            }).exceptionally(e -> this.failHasMessageAvailable(booleanFuture, e));
        }
        return booleanFuture;
    }

    /**
     * Logs a failed last-message-id lookup (including the exception) and fails the given future.
     *
     * <p>The future is failed with the cause of {@code error} when there is one, otherwise with
     * {@code error} itself, so the future is always completed.
     *
     * @param future future to fail
     * @param error throwable delivered to the {@code exceptionally} stage
     * @return always {@code null}, so this can be used directly as an {@code exceptionally} handler
     */
    private Void failHasMessageAvailable(CompletableFuture<Boolean> future, Throwable error) {
        log.error("[{}][{}] Failed getLastMessageId command", new Object[]{this.topic, this.subscription, error});
        Throwable cause = error.getCause() != null ? error.getCause() : error;
        future.completeExceptionally(cause);
        return null;
    }

    /**
     * Completes {@code future} with {@code value} from a task submitted to
     * {@code internalPinnedExecutor}.
     *
     * @param future future to complete
     * @param value result to publish
     */
    private void completehasMessageAvailableWithValue(CompletableFuture<Boolean> future, boolean value) {
        Runnable completion = () -> future.complete(value);
        this.internalPinnedExecutor.execute(completion);
    }

    /**
     * Tells whether the broker's last message id lies beyond the given position.
     *
     * @param lastMessageIdInBroker last message id known for the broker; cast to
     *     {@code MessageIdImpl} to read its entry id
     * @param messageId position to compare against
     * @param inclusive when true an equal position also counts; when false the broker id must be
     *     strictly greater
     * @return true when the comparison passes and the broker id's entry id is not -1
     */
    private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) {
        int comparison = lastMessageIdInBroker.compareTo(messageId);
        boolean brokerIsAhead = inclusive ? comparison >= 0 : comparison > 0;
        // The entry id is only inspected once the ordering check has passed.
        return brokerIsAhead && ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1L;
    }

    /**
     * Fetches the last message id via {@link #internalGetLastMessageIdAsync()} and drops the
     * mark-delete position from the response.
     *
     * @return a future holding the last message id
     */
    @Override
    @Deprecated
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        CompletableFuture<GetLastMessageIdResponse> response = this.internalGetLastMessageIdAsync();
        return response.thenApply(r -> r.lastMessageId);
    }

    /**
     * Fetches the last message id and wraps it, together with this consumer's topic, in a
     * single-element list.
     *
     * @return a future holding a one-element list with the topic-scoped last message id
     */
    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
        return this.getLastMessageIdAsync().thenApply(lastId -> {
            TopicMessageId topicScopedId = new TopicMessageIdImpl(this.topic, (MessageIdAdv)lastId);
            return Collections.singletonList(topicScopedId);
        });
    }

    /**
     * Requests the last message id (and mark-delete position) with a fresh retry budget.
     *
     * <p>Fails immediately with {@code AlreadyClosedException} when the consumer state is
     * {@code Closing} or {@code Closed}. Otherwise the retry budget is the client's configured
     * operation timeout, with a backoff starting at 100 ms and capped at twice that timeout.
     *
     * @return a future completed by the retrying overload
     */
    public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            String reason = String.format("The consumer %s was already closed when the subscription %s of the topic %s getting the last message id", this.consumerName, this.subscription, this.topicName.toString());
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException(reason));
        }
        long operationTimeoutMs = this.client.getConfiguration().getOperationTimeoutMs();
        AtomicLong remainingTimeMs = new AtomicLong(operationTimeoutMs);
        Backoff retryBackoff = new BackoffBuilder()
                .setInitialTime(100L, TimeUnit.MILLISECONDS)
                .setMax(operationTimeoutMs * 2L, TimeUnit.MILLISECONDS)
                .setMandatoryStop(0L, TimeUnit.MILLISECONDS)
                .create();
        CompletableFuture<GetLastMessageIdResponse> result = new CompletableFuture<>();
        this.internalGetLastMessageIdAsync(retryBackoff, remainingTimeMs, result);
        return result;
    }

    /**
     * Performs one attempt of the last-message-id lookup, retrying with backoff while disconnected.
     *
     * <p>While the consumer is not connected the attempt is rescheduled after the smaller of the
     * next backoff delay and the remaining time; once that delay is no longer positive the future
     * fails with a timeout exception. When connected, the future fails with
     * {@code NotSupportedException} if the peer's protocol version lacks the command; otherwise the
     * command is sent and the response is converted into a {@code GetLastMessageIdResponse}.
     *
     * @param backoff supplies the delay between connection retries
     * @param remainingTime remaining time budget in milliseconds, decremented on each retry
     * @param future future completed with the lookup result or failure
     */
    private void internalGetLastMessageIdAsync(Backoff backoff, AtomicLong remainingTime, CompletableFuture<GetLastMessageIdResponse> future) {
        ClientCnx cnx = this.cnx();
        if (!this.isConnected() || cnx == null) {
            long retryDelayMs = Math.min(backoff.next(), remainingTime.get());
            if (retryDelayMs <= 0L) {
                future.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(String.format("The subscription %s of the topic %s could not get the last message id withing configured timeout", this.subscription, this.topicName.toString())));
                return;
            }
            log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", new Object[]{this.topic, this.getHandlerName(), retryDelayMs});
            ((ScheduledExecutorService)this.client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                remainingTime.addAndGet(-retryDelayMs);
                this.internalGetLastMessageIdAsync(backoff, remainingTime, future);
            }, retryDelayMs, TimeUnit.MILLISECONDS);
            return;
        }

        if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
            future.completeExceptionally((Throwable)new PulsarClientException.NotSupportedException(String.format("The command `GetLastMessageId` is not supported for the protocol version %d. The consumer is %s, topic %s, subscription %s", cnx.getRemoteEndpointProtocolVersion(), this.consumerName, this.topicName.toString(), this.subscription)));
            return;
        }

        long requestId = this.client.newRequestId();
        ByteBuf request = Commands.newGetLastMessageId(this.consumerId, requestId);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Get topic last message Id", (Object)this.topic, (Object)this.subscription);
        }
        cnx.sendGetLastMessageId(request, requestId).thenAccept(cmd -> {
            MessageIdData lastIdData = cmd.getLastMessageId();
            MessageIdImpl markDeletePosition = null;
            if (cmd.hasConsumerMarkDeletePosition()) {
                markDeletePosition = new MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(), cmd.getConsumerMarkDeletePosition().getEntryId(), -1);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Successfully getLastMessageId {}:{}", new Object[]{this.topic, this.subscription, lastIdData.getLedgerId(), lastIdData.getEntryId()});
            }
            // A positive batch index yields a batch message id; otherwise a plain one.
            MessageIdImpl lastMsgId;
            if (lastIdData.getBatchIndex() <= 0) {
                lastMsgId = new MessageIdImpl(lastIdData.getLedgerId(), lastIdData.getEntryId(), lastIdData.getPartition());
            } else {
                lastMsgId = new BatchMessageIdImpl(lastIdData.getLedgerId(), lastIdData.getEntryId(), lastIdData.getPartition(), lastIdData.getBatchIndex());
            }
            future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition));
        }).exceptionally(e -> {
            log.error("[{}][{}] Failed getLastMessageId command", (Object)this.topic, (Object)this.subscription);
            future.completeExceptionally(PulsarClientException.wrap((Throwable)e.getCause(), (String)String.format("The subscription %s of the topic %s gets the last message id was failed", this.subscription, this.topicName.toString())));
            return null;
        });
    }

    /**
     * Tells whether a message carries encryption keys while this consumer has no crypto key reader
     * and its crypto failure action is {@code CONSUME}.
     *
     * @param msgMetadata metadata of the message being inspected
     * @return true when all three conditions hold
     */
    private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
        if (msgMetadata.getEncryptionKeysCount() <= 0) {
            return false;
        }
        if (this.conf.getCryptoKeyReader() != null) {
            return false;
        }
        return this.conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME;
    }

    /**
     * Builds an encryption context from the message metadata.
     *
     * <p>The context carries the encryption keys (with their metadata key/value pairs), the
     * encryption parameter, the algorithm when present, the compression type, the uncompressed
     * size and, when the metadata declares it, the batch size.
     *
     * @param msgMetadata metadata of the received message
     * @return the populated context, or an empty optional when the metadata has no encryption keys
     */
    private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata) {
        if (msgMetadata.getEncryptionKeysCount() <= 0) {
            return Optional.empty();
        }
        EncryptionContext context = new EncryptionContext();
        Map<String, EncryptionContext.EncryptionKey> keysByName = msgMetadata.getEncryptionKeysList().stream()
                .collect(Collectors.toMap(
                        EncryptionKeys::getKey,
                        e -> new EncryptionContext.EncryptionKey(
                                e.getValue(),
                                e.getMetadatasList().stream().collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)))));
        context.setKeys(keysByName);
        context.setParam(msgMetadata.getEncryptionParam());
        if (msgMetadata.hasEncryptionAlgo()) {
            context.setAlgorithm(msgMetadata.getEncryptionAlgo());
        }
        context.setCompressionType(CompressionCodecProvider.convertFromWireProtocol(msgMetadata.getCompression()));
        context.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
        // The batch size stays absent unless the metadata explicitly carries it.
        Integer batchSize = msgMetadata.hasNumMessagesInBatch() ? Integer.valueOf(msgMetadata.getNumMessagesInBatch()) : null;
        context.setBatchSize(Optional.ofNullable(batchSize));
        return Optional.of(context);
    }

    /**
     * Drains messages from the head of the incoming queue while their (batch-discarded) ids are
     * contained in {@code messageIds}.
     *
     * <p>Nothing is removed when the queue is empty or its head is not in the set. Otherwise
     * messages are polled one by one; each polled message reduces the incoming size and is
     * counted. The first polled message whose id is not in the set has its id added to
     * {@code messageIds} and stops the drain without being released; all earlier ones are
     * released.
     *
     * @param messageIds ids to remove; may be extended with the id of the last polled message
     * @return number of messages polled from the queue
     */
    private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
        Message head = (Message)this.incomingMessages.peek();
        if (head == null) {
            return 0;
        }
        MessageIdAdv headId = MessageIdAdvUtils.discardBatch(head.getMessageId());
        if (!messageIds.contains(headId)) {
            return 0;
        }
        int removed = 0;
        for (Message current = (Message)this.incomingMessages.poll(); current != null; current = (Message)this.incomingMessages.poll()) {
            this.decreaseIncomingMessageSize(current);
            ++removed;
            MessageIdAdv currentId = MessageIdAdvUtils.discardBatch(current.getMessageId());
            if (!messageIds.contains(currentId)) {
                messageIds.add(currentId);
                break;
            }
            current.release();
        }
        return removed;
    }

    /**
     * @return the stats recorder held in the {@code stats} field
     */
    public ConsumerStatsRecorder getStats() {
        return stats;
    }

    /**
     * Marks this consumer as having reached the end of its topic and, when a listener is set,
     * notifies it.
     */
    void setTerminated() {
        log.info("[{}] [{}] [{}] Consumer has reached the end of topic", new Object[]{this.subscription, this.topic, this.consumerName});
        this.hasReachedEndOfTopic = true;
        if (this.listener == null) {
            return;
        }
        this.listener.reachedEndOfTopic((Consumer)this);
    }

    /**
     * @return the {@code hasReachedEndOfTopic} flag, which {@code setTerminated()} sets to true
     */
    public boolean hasReachedEndOfTopic() {
        return hasReachedEndOfTopic;
    }

    /**
     * Hashes the topic, subscription and consumer name.
     *
     * <p>NOTE(review): {@code equals} compares {@code consumerId} only, so the two methods use
     * different fields — confirm equal consumers always share topic, subscription and name.
     *
     * @return hash of topic, subscription and consumer name
     */
    @Override
    public int hashCode() {
        return Objects.hash(topic, subscription, consumerName);
    }

    /**
     * Two consumers are equal when they are the same instance or share the same
     * {@code consumerId}.
     *
     * @param o object to compare with
     * @return true when {@code o} is a {@code ConsumerImpl} with the same consumer id
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        return o instanceof ConsumerImpl && this.consumerId == ((ConsumerImpl)o).consumerId;
    }

    /**
     * @return the connection reported by the connection handler (checked for null by callers in
     *     this class)
     */
    ClientCnx cnx() {
        return connectionHandler.cnx();
    }

    /**
     * Delegates to the connection handler's {@code resetBackoff()}.
     */
    void resetBackoff() {
        connectionHandler.resetBackoff();
    }

    /**
     * Forwards a connection-closed notification to the connection handler.
     *
     * @param cnx the connection that was closed
     * @param initialConnectionDelayMs optional delay forwarded unchanged to the handler
     * @param hostUrl optional host URL forwarded unchanged to the handler
     */
    void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {
        connectionHandler.connectionClosed(cnx, initialConnectionDelayMs, hostUrl);
    }

    /**
     * @return the connection reported by the connection handler; same value as {@code cnx()}
     */
    public ClientCnx getClientCnx() {
        return connectionHandler.cnx();
    }

    /**
     * Switches this consumer to a new connection, or detaches it when {@code clientCnx} is null.
     *
     * <p>A non-null connection is handed to the connection handler and this consumer is registered
     * on it; a warning is logged when ack receipts are enabled but the peer does not support them.
     * In all cases the connection previously recorded for registration is replaced, and this
     * consumer is removed from it when it was a different, non-null connection.
     *
     * @param clientCnx the new connection, or null to deregister
     */
    void setClientCnx(ClientCnx clientCnx) {
        if (clientCnx != null) {
            this.connectionHandler.setClientCnx(clientCnx);
            clientCnx.registerConsumer(this.consumerId, this);
            boolean receiptUnsupported = this.conf.isAckReceiptEnabled() && !Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion());
            if (receiptUnsupported) {
                log.warn("Server don't support ack for receipt! ProtoVersion >=17 support! nowVersion : {}", (Object)clientCnx.getRemoteEndpointProtocolVersion());
            }
        }
        ClientCnx replaced = this.clientCnxUsedForConsumerRegistration.getAndSet(clientCnx);
        if (replaced != null && replaced != clientCnx) {
            replaced.removeConsumer(this.consumerId);
        }
    }

    /**
     * Detaches this consumer from its registered connection by calling
     * {@code setClientCnx(null)}.
     */
    void deregisterFromClientCnx() {
        setClientCnx(null);
    }

    /**
     * Delegates to the connection handler's {@code grabCnx()}.
     */
    void grabCnx() {
        connectionHandler.grabCnx();
    }

    /**
     * @return the value of the {@code topicNameWithoutPartition} field
     */
    @Deprecated
    public String getTopicNameWithoutPartition() {
        return topicNameWithoutPartition;
    }

    /**
     * Removes the oldest pending chunked message that still has a context.
     *
     * <p>UUIDs are polled from the pending queue until one maps to a context or the queue is
     * empty; the result is handed to {@code removeChunkMessage}, which does nothing when no
     * context was found. Acknowledgment is controlled by
     * {@code autoAckOldestChunkedMessageOnQueueFull}.
     */
    private void removeOldestPendingChunkedMessage() {
        String oldestUuid = null;
        ChunkedMessageCtx oldestCtx = null;
        while (oldestCtx == null && !this.pendingChunkedMessageUuidQueue.isEmpty()) {
            oldestUuid = (String)this.pendingChunkedMessageUuidQueue.poll();
            // Blank UUIDs are skipped without a map lookup.
            if (StringUtils.isNotBlank(oldestUuid)) {
                oldestCtx = this.chunkedMessagesMap.get(oldestUuid);
            }
        }
        this.removeChunkMessage(oldestUuid, oldestCtx, this.autoAckOldestChunkedMessageOnQueueFull);
    }

    /**
     * Removes expired incomplete chunked messages from the head of the pending queue.
     *
     * <p>Does nothing when the expiry time is not positive. Otherwise the head UUID is inspected
     * repeatedly: while its context exists and was received longer ago than the expiry time, the
     * UUID is removed from the queue and the context is removed with auto-ack enabled. The scan
     * stops at the first head that has no context or has not expired.
     */
    protected void removeExpireIncompleteChunkedMessages() {
        if (this.expireTimeOfIncompleteChunkedMessageMillis <= 0L) {
            return;
        }
        for (String headUuid = (String)this.pendingChunkedMessageUuidQueue.peek(); headUuid != null; headUuid = (String)this.pendingChunkedMessageUuidQueue.peek()) {
            ChunkedMessageCtx headCtx = StringUtils.isNotBlank(headUuid) ? this.chunkedMessagesMap.get(headUuid) : null;
            boolean expired = headCtx != null && System.currentTimeMillis() > headCtx.receivedTime + this.expireTimeOfIncompleteChunkedMessageMillis;
            if (!expired) {
                return;
            }
            this.pendingChunkedMessageUuidQueue.remove(headUuid);
            this.removeChunkMessage(headUuid, headCtx, true);
        }
    }

    /**
     * Discards a pending chunked message and its context.
     *
     * <p>Does nothing for a null context. Otherwise the UUID is removed from the chunk map; each
     * non-null chunk id is either individually acknowledged (when {@code autoAck}) or passed to
     * {@code trackMessage}; the chunk buffer is released if present, the context is recycled and
     * the pending chunked message count is decremented.
     *
     * @param msgUUID UUID of the chunked message
     * @param chunkedMsgCtx context to discard; may be null
     * @param autoAck whether the received chunks are acknowledged instead of tracked
     */
    private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx, boolean autoAck) {
        if (chunkedMsgCtx == null) {
            return;
        }
        this.chunkedMessagesMap.remove(msgUUID);
        MessageIdImpl[] receivedChunkIds = chunkedMsgCtx.chunkedMessageIds;
        if (receivedChunkIds != null) {
            for (MessageIdImpl chunkId : receivedChunkIds) {
                if (chunkId != null) {
                    if (autoAck) {
                        log.info("Removing chunk message-id {}", (Object)chunkId);
                        this.doAcknowledge(chunkId, CommandAck.AckType.Individual, Collections.emptyMap(), null);
                    } else {
                        this.trackMessage(chunkId);
                    }
                }
            }
        }
        if (chunkedMsgCtx.chunkedMsgBuffer != null) {
            chunkedMsgCtx.chunkedMsgBuffer.release();
        }
        chunkedMsgCtx.recycle();
        this.pendingChunkedMessageCount--;
    }

    /**
     * Acknowledges a single message id within a transaction and waits for the ack receipt(s).
     *
     * <p>Commands built:
     * <ul>
     *   <li>Batch id: one ack whose bit set keeps {@code [0, batchSize)} minus the acknowledged
     *       index (individual) or minus {@code [0, batchIndex]} (cumulative).</li>
     *   <li>Non-batch id with no recorded chunk sequence, or cumulative ack: one plain ack.</li>
     *   <li>Chunked message, individual ack: one multi-message ack when a connection is present
     *       and the peer supports it, otherwise one ack per non-null chunk id.</li>
     * </ul>
     * The message is then removed from the unacked tracker, and the commands are sent on the
     * current connection.
     *
     * @param messageId id to acknowledge; cast to {@code MessageIdAdv}
     * @param ackType individual or cumulative
     * @param validationError validation error forwarded to the ack command
     * @param properties properties forwarded to the ack command
     * @param txnID transaction the acknowledgment belongs to
     * @return a future completed when all ack receipts arrive, or failed with
     *     {@code ConnectException} when there is no connection
     */
    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> properties, TxnID txnID) {
        List<ByteBuf> cmdList;
        long requestId = this.client.newRequestId();
        MessageIdAdv messageIdAdv = (MessageIdAdv)messageId;
        long ledgerId = messageIdAdv.getLedgerId();
        long entryId = messageIdAdv.getEntryId();
        if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
            BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
            bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
            if (ackType == CommandAck.AckType.Cumulative) {
                MessageIdAdvUtils.acknowledge(messageIdAdv, false);
                bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
            } else {
                bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
            }
            cmdList = Collections.singletonList(Commands.newAck(this.consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, messageIdAdv.getBatchSize()));
            bitSetRecyclable.recycle();
        } else {
            MessageIdImpl[] chunkMsgIds = (MessageIdImpl[])this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv);
            if (chunkMsgIds == null || ackType == CommandAck.AckType.Cumulative) {
                cmdList = Collections.singletonList(Commands.newAck(this.consumerId, ledgerId, entryId, null, ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId));
            } else {
                // The connection may be gone; check it before reading the protocol version so
                // the failure surfaces through the returned future below instead of an NPE here.
                ClientCnx currentCnx = this.getClientCnx();
                if (currentCnx != null && Commands.peerSupportsMultiMessageAcknowledgment(currentCnx.getRemoteEndpointProtocolVersion())) {
                    List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                    for (MessageIdImpl cMsgId : chunkMsgIds) {
                        if (cMsgId == null || chunkMsgIds.length <= 1) continue;
                        entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                    }
                    cmdList = Collections.singletonList(this.newMultiTransactionMessageAck(this.consumerId, txnID, entriesToAck, requestId));
                } else {
                    cmdList = new ArrayList<>(chunkMsgIds.length);
                    for (MessageIdImpl cMsgId : chunkMsgIds) {
                        // Skip missing chunk ids, as the multi-ack branch does.
                        if (cMsgId == null) continue;
                        cmdList.add(Commands.newAck(this.consumerId, cMsgId.ledgerId, cMsgId.entryId, null, ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId));
                    }
                }
            }
        }
        if (ackType == CommandAck.AckType.Cumulative) {
            this.unAckedMessageTracker.removeMessagesTill(messageId);
        } else {
            this.unAckedMessageTracker.remove(messageId);
        }
        ClientCnx cnx = this.cnx();
        if (cnx == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.ConnectException("Failed to ack message [" + messageId + "] for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + this.getState()));
        }
        List<CompletableFuture<Void>> ackFutures = new ArrayList<>(cmdList.size());
        for (ByteBuf cmd : cmdList) {
            ackFutures.add(cnx.newAckForReceipt(cmd, requestId));
        }
        return FutureUtil.waitForAll(ackFutures);
    }

    /**
     * Builds and serializes an individual ack command covering several entries in a transaction.
     *
     * @param consumerId consumer id set on the ack
     * @param txnID transaction whose least/most significant bits are set on the ack
     * @param entries (ledger id, entry id, optional ack bit set) triples to acknowledge
     * @param requestID request id set on the ack
     * @return the command serialized by {@code Commands.serializeWithSize}
     */
    private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries, long requestID) {
        BaseCommand command = newMultiMessageAckCommon(entries);
        command.getAck()
                .setConsumerId(consumerId)
                .setAckType(CommandAck.AckType.Individual)
                .setTxnidLeastBits(txnID.getLeastSigBits())
                .setTxnidMostBits(txnID.getMostSigBits())
                .setRequestId(requestID);
        return Commands.serializeWithSize(command);
    }

    /**
     * Fills the thread-local base command with an ACK that lists every given entry.
     *
     * <p>For each entry a message id with its ledger and entry id is added; when the entry has a
     * bit set, its words are appended as the ack set and the bit set is recycled.
     *
     * @param entries (ledger id, entry id, optional ack bit set) triples
     * @return the cleared-and-refilled command obtained from {@code LOCAL_BASE_COMMAND}
     */
    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
        BaseCommand command = LOCAL_BASE_COMMAND.get().clear().setType(BaseCommand.Type.ACK);
        CommandAck ackSection = command.setAck();
        for (Triple<Long, Long, ConcurrentBitSetRecyclable> entry : entries) {
            long ledgerId = entry.getLeft();
            long entryId = entry.getMiddle();
            ConcurrentBitSetRecyclable bitSet = entry.getRight();
            MessageIdData idData = ackSection.addMessageId().setLedgerId(ledgerId).setEntryId(entryId);
            if (bitSet != null) {
                for (long word : bitSet.toLongArray()) {
                    idData.addAckSet(word);
                }
                bitSet.recycle();
            }
        }
        return command;
    }

    /**
     * Acknowledges a list of message ids within a transaction using a single ack command.
     *
     * <p>For each id a {@code MessageIdData} with ledger and entry id is built; batch ids also get
     * an ack set that keeps {@code [0, batchSize)} minus the acknowledged index (individual) or
     * minus {@code [0, batchIndex]} (cumulative). Each id is removed from the unacked tracker as it
     * is processed.
     *
     * @param messageIds ids to acknowledge; each is cast to {@code MessageIdAdv}
     * @param ackType individual or cumulative
     * @param properties properties forwarded to the ack command
     * @param txnID transaction the acknowledgment belongs to
     * @return a future completed when the ack receipt arrives, or failed with
     *     {@code ConnectException} when there is no connection
     */
    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, CommandAck.AckType ackType, Map<String, Long> properties, TxnID txnID) {
        long requestId = this.client.newRequestId();
        List<MessageIdData> messageIdDataList = new ArrayList<>(messageIds.size());
        for (MessageId messageId : messageIds) {
            MessageIdAdv messageIdAdv = (MessageIdAdv)messageId;
            MessageIdData messageIdData = new MessageIdData();
            messageIdData.setLedgerId(messageIdAdv.getLedgerId());
            messageIdData.setEntryId(messageIdAdv.getEntryId());
            if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
                BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
                bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
                if (ackType == CommandAck.AckType.Cumulative) {
                    MessageIdAdvUtils.acknowledge(messageIdAdv, false);
                    bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
                } else {
                    bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
                }
                for (long x : bitSetRecyclable.toLongArray()) {
                    messageIdData.addAckSet(x);
                }
                bitSetRecyclable.recycle();
            }
            messageIdDataList.add(messageIdData);
            if (ackType == CommandAck.AckType.Cumulative) {
                this.unAckedMessageTracker.removeMessagesTill(messageId);
            } else {
                this.unAckedMessageTracker.remove(messageId);
            }
        }
        // Mirror the single-id overload: report a missing connection through the returned future
        // rather than throwing an NPE. Checked before the command buffer is built so nothing is
        // allocated on the failure path.
        ClientCnx cnx = this.cnx();
        if (cnx == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.ConnectException("Failed to ack messages " + messageIds + " for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + this.getState()));
        }
        ByteBuf cmd = Commands.newAck(this.consumerId, messageIdDataList, ackType, null, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
        return cnx.newAckForReceipt(cmd, requestId);
    }

    /**
     * @return the map held in {@code possibleSendToDeadLetterTopicMessages}, returned as-is (not
     *     copied)
     */
    public Map<MessageIdAdv, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
        return possibleSendToDeadLetterTopicMessages;
    }

    /**
     * Tells whether ack receipts are effectively in use: enabled in the configuration, a
     * connection is present, and the peer's protocol version supports them.
     *
     * @return true when all three conditions hold
     */
    boolean isAckReceiptEnabled() {
        ClientCnx connection = this.getClientCnx();
        if (!this.conf.isAckReceiptEnabled() || connection == null) {
            return false;
        }
        return Commands.peerSupportsAckReceipt(connection.getRemoteEndpointProtocolVersion());
    }

    /**
     * @return the value of the {@code priorityLevel} field
     */
    int getPriorityLevel() {
        return priorityLevel;
    }

    /**
     * Recyclable holder for the state of one chunked message while its chunks are being received.
     *
     * <p>Instances are obtained through {@link #get(int, ByteBuf)} and returned to the pool with
     * {@link #recycle()}.
     */
    static class ChunkedMessageCtx {
        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>(){

            @Override
            protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
                return new ChunkedMessageCtx(handle);
            }
        };

        // Number of chunks announced for the message; -1 while unset or recycled.
        protected int totalChunks = -1;
        // Buffer supplied by the caller of get(); cleared (not released) on recycle.
        protected ByteBuf chunkedMsgBuffer;
        // -1 while unset or recycled.
        protected int lastChunkedMessageId = -1;
        // One slot per announced chunk; allocated in get().
        protected MessageIdImpl[] chunkedMessageIds;
        // Wall-clock time (System.currentTimeMillis) at which get() handed out this context.
        protected long receivedTime = 0L;
        private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;

        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /**
         * Takes a context from the pool and initializes it for a new chunked message.
         *
         * @param numChunksFromMsg number of chunks; also the length of {@code chunkedMessageIds}
         * @param chunkedMsgBuffer buffer stored in the context
         * @return the initialized context
         */
        static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
            ChunkedMessageCtx fresh = RECYCLER.get();
            fresh.totalChunks = numChunksFromMsg;
            fresh.chunkedMessageIds = new MessageIdImpl[numChunksFromMsg];
            fresh.chunkedMsgBuffer = chunkedMsgBuffer;
            fresh.receivedTime = System.currentTimeMillis();
            return fresh;
        }

        /**
         * Resets the chunk count, buffer reference and last chunk id, then hands this instance
         * back to the recycler. {@code chunkedMessageIds} and {@code receivedTime} are left as
         * they are until the next {@code get}.
         */
        public void recycle() {
            this.chunkedMsgBuffer = null;
            this.totalChunks = -1;
            this.lastChunkedMessageId = -1;
            this.recyclerHandle.recycle(this);
        }
    }

    /**
     * Immutable pair of the last message id and the consumer's mark-delete position, as produced
     * by the last-message-id lookup. The mark-delete position is null when the response did not
     * carry one.
     */
    private static final class GetLastMessageIdResponse {
        final MessageId lastMessageId;
        final MessageId markDeletePosition;

        GetLastMessageIdResponse(MessageId lastMessageId, MessageId markDeletePosition) {
            this.markDeletePosition = markDeletePosition;
            this.lastMessageId = lastMessageId;
        }
    }
}

