/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryAndMetadata;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.SharedConsumerAssignor;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMultipleConsumers
implements Dispatcher,
AsyncCallbacks.ReadEntriesCallback {
    // Topic this dispatcher belongs to and the managed-ledger cursor it reads entries from.
    protected final PersistentTopic topic;
    protected final ManagedCursor cursor;
    // Value of cursor.getLastIndividualDeletedRange() captured in the constructor.
    protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
    // Created by disconnectAllConsumers(boolean); completed once the consumer list is empty;
    // set back to null by resetCloseFuture().
    private CompletableFuture<Void> closeFuture = null;
    // Positions queued for replay; positions are removed again when a replay read hands them to a consumer.
    protected final MessageRedeliveryController redeliveryMessages;
    // In-memory tracker, or the shared disabled instance, depending on isSubscriptionRedeliveryTrackerEnabled().
    protected final RedeliveryTracker redeliveryTracker;
    // Cleared when the first consumer attaches; swapped for Optional.empty() and closed in close().
    private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();
    // True while a ReadType.Normal read is outstanding on the cursor.
    protected volatile boolean havePendingRead = false;
    // True while a ReadType.Replay read is outstanding on the cursor.
    protected volatile boolean havePendingReplayRead = false;
    // Position recorded by readMoreEntries() when it schedules a replay or a normal read; may be null.
    protected volatile PositionImpl minReplayedPosition = null;
    // Set when the first consumer attaches while a read is still in flight; the next completed
    // normal read is then discarded and the cursor rewound (see readEntriesComplete).
    protected boolean shouldRewindBeforeReadingOrReplaying = false;
    // "<topic name> / <decoded cursor name>", used as the log prefix.
    protected final String name;
    // While true, readMoreEntries() returns immediately.
    protected boolean sendInProgress;
    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
    // Dispatcher-wide permit count: raised by flow commands, lowered when messages are sent
    // or a consumer is removed.
    protected volatile int totalAvailablePermits = 0;
    // Current read batch size; starts at getDispatcherMaxReadBatchSize() and is doubled up to
    // that maximum after each completed read whenever it is below it.
    protected volatile int readBatchSize;
    // Backoff built from getDispatcherReadFailureBackoffInitialTimeInMs(); halved after each completed read.
    protected final Backoff readFailureBackoff;
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
    protected volatile int totalUnackedMessages = 0;
    // readMoreEntries() skips normal reads while this equals 1.
    private volatile int blockedDispatcherOnUnackedMsgs = 0;
    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
    // Rate limiter owned by this dispatcher (log messages call it the subscription rate); closed in close().
    protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
    // Ensures at most one delayed re-read is scheduled at a time by reScheduleRead().
    private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
    // Thread picked from the topic ordered executor; used to dispatch when
    // isDispatcherDispatchMessagesInSubscriptionThread() is enabled.
    protected final ExecutorService dispatchMessagesThread;
    // Assigns entries to consumers when a read contains chunked messages.
    private final SharedConsumerAssignor assignor;
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);

    /**
     * Creates a dispatcher by delegating to the four-argument constructor with
     * {@code allowOutOfOrderDelivery} set to {@code true}.
     *
     * @param topic        topic this dispatcher serves
     * @param cursor       cursor entries are read from
     * @param subscription subscription this dispatcher belongs to
     */
    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
        this(topic, cursor, subscription, true);
    }

    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, boolean allowOutOfOrderDelivery) {
        super(subscription, topic.getBrokerService().pulsar().getConfiguration());
        this.cursor = cursor;
        this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
        this.name = topic.getName() + " / " + Codec.decode((String)cursor.getName());
        this.topic = topic;
        this.dispatchMessagesThread = topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
        this.redeliveryMessages = new MessageRedeliveryController(allowOutOfOrderDelivery);
        this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        this.initializeDispatchRateLimiterIfNeeded();
        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
        this.readFailureBackoff = new Backoff((long)topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Attaches a consumer to this dispatcher.
     *
     * <p>If the dispatcher is already closed the consumer is disconnected instead. When the
     * first consumer attaches, the cursor is rewound (immediately, or after the in-flight read
     * completes) and the redelivery and delayed-delivery state is cleared. The consumer list is
     * re-sorted by priority level only when the new consumer has a lower level than the
     * previously last one.
     *
     * @param consumer consumer to attach
     * @throws BrokerServiceException if the subscription already reached its consumer limit
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
            consumer.disconnect();
            return;
        }
        if (this.consumerList.isEmpty()) {
            boolean readInFlight = this.havePendingRead || this.havePendingReplayRead;
            if (!readInFlight) {
                // Nothing outstanding: safe to rewind right away.
                this.cursor.rewind();
            }
            // With a read in flight the rewind is deferred to readEntriesComplete().
            this.shouldRewindBeforeReadingOrReplaying = readInFlight;
            this.redeliveryMessages.clear();
            this.delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
        }
        if (this.isConsumersExceededOnSubscription()) {
            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.name);
            throw new BrokerServiceException.ConsumerBusyException("Subscription reached max consumers limit");
        }
        this.consumerList.add(consumer);
        int consumerCount = this.consumerList.size();
        if (consumerCount > 1) {
            Consumer previousLast = this.consumerList.get(consumerCount - 2);
            if (consumer.getPriorityLevel() < previousLast.getPriorityLevel()) {
                this.consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel));
            }
        }
        this.consumerSet.add(consumer);
    }

    /**
     * Delegates to the two-argument overload, passing this dispatcher's topic and the current
     * number of attached consumers.
     *
     * @return the result of that overload
     */
    @Override
    protected boolean isConsumersExceededOnSubscription() {
        return this.isConsumersExceededOnSubscription(this.topic, this.consumerList.size());
    }

    /**
     * Detaches a consumer from this dispatcher.
     *
     * <p>The consumer's unacked count is subtracted from the dispatcher total first. If this was
     * the last consumer, pending reads are cancelled, redelivery state is cleared, any pending
     * close future is completed and the permit count is reset. Otherwise the consumer's pending
     * acks are queued for replay, its permits are subtracted and a new read is triggered.
     *
     * @param consumer consumer to detach
     * @throws BrokerServiceException declared by the overridden method
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        this.addUnAckedMessages(-consumer.getUnackedMessages());
        if (this.consumerSet.removeAll(consumer) != 1) {
            log.info("[{}] Trying to remove a non-connected consumer: {}", this.name, consumer);
            return;
        }
        this.consumerList.remove(consumer);
        log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());

        if (this.consumerList.isEmpty()) {
            this.cancelPendingRead();
            this.redeliveryMessages.clear();
            this.redeliveryTracker.clear();
            if (this.closeFuture != null) {
                log.info("[{}] All consumers removed. Subscription is disconnected", this.name);
                this.closeFuture.complete(null);
            }
            this.totalAvailablePermits = 0;
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Consumer are left, reading more entries", this.name);
        }
        // Hand the departing consumer's unacknowledged positions back for redelivery.
        consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
            boolean queued = this.addMessageToReplay(ledgerId, entryId, stickyKeyHash);
            if (queued) {
                this.redeliveryTracker.addIfAbsent((Position) PositionImpl.get((long) ledgerId, (long) entryId));
            }
        });
        this.totalAvailablePermits -= consumer.getAvailablePermits();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. New dispatcher permit count is {}", this.name, consumer.getAvailablePermits(), this.totalAvailablePermits);
        }
        this.readMoreEntries();
    }

    /**
     * Handles a flow-control request by scheduling {@link #internalConsumerFlow} on the broker
     * service executor, wrapped in {@code SafeRun.safeRun}.
     *
     * @param consumer                  consumer that granted the permits
     * @param additionalNumberOfMessages number of permits granted
     */
    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        this.topic.getBrokerService().executor().execute((Runnable)SafeRun.safeRun(() -> this.internalConsumerFlow(consumer, additionalNumberOfMessages)));
    }

    /**
     * Adds the granted permits to the dispatcher total and triggers a read. Requests from
     * consumers that are no longer attached are ignored.
     *
     * @param consumer                  consumer that granted the permits
     * @param additionalNumberOfMessages number of permits granted
     */
    private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        boolean attached = this.consumerSet.contains(consumer);
        if (attached) {
            this.totalAvailablePermits += additionalNumberOfMessages;
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {} after adding {} permits", this.name, consumer, this.totalAvailablePermits, additionalNumberOfMessages);
            }
            this.readMoreEntries();
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Ignoring flow control from disconnected consumer {}", this.name, consumer);
        }
    }

    /**
     * Schedules {@link #readMoreEntries()} on the broker service executor, wrapped in
     * {@code SafeRun.safeRun}, instead of running it on the calling thread.
     */
    public void readMoreEntriesAsync() {
        this.topic.getBrokerService().executor().execute((Runnable)SafeRun.safeRun(this::readMoreEntries));
    }

    /**
     * Issues the next read against the cursor if the dispatcher is able to deliver messages.
     *
     * <p>Nothing happens while a send is in progress, while the delayed-delivery tracker asks to
     * pause, when no permits are available, or when {@link #calculateToRead(int)} reports that no
     * read should be issued. Otherwise messages queued for replay take priority; if there are
     * none and the dispatcher is not blocked on unacked messages, a normal read is scheduled
     * unless one is already pending.
     */
    public synchronized void readMoreEntries() {
        if (this.sendInProgress || this.shouldPauseDeliveryForDelayTracker()) {
            return;
        }

        int firstAvailableConsumerPermits = this.getFirstAvailableConsumerPermits();
        int currentTotalAvailablePermits = Math.max(this.totalAvailablePermits, firstAvailableConsumerPermits);
        if (currentTotalAvailablePermits <= 0 || firstAvailableConsumerPermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", this.name);
            }
            return;
        }

        Pair<Integer, Long> readBudget = this.calculateToRead(currentTotalAvailablePermits);
        int messagesToRead = readBudget.getLeft();
        long bytesToRead = readBudget.getRight();
        if (messagesToRead == -1 || bytesToRead == -1L) {
            return;
        }

        Set<PositionImpl> replayNow = this.getMessagesToReplayNow(messagesToRead);
        if (!replayNow.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", this.name, replayNow.size(), this.consumerList.size());
            }
            this.havePendingReplayRead = true;
            this.minReplayedPosition = replayNow.stream().min(PositionImpl::compareTo).orElse(null);
            Set<? extends Position> alreadyDeleted;
            if (this.topic.isDelayedDeliveryEnabled()) {
                alreadyDeleted = this.asyncReplayEntriesInOrder(replayNow);
            } else {
                alreadyDeleted = this.asyncReplayEntries(replayNow);
            }
            // Positions the cursor reports back are dropped from the replay queue.
            for (Position position : alreadyDeleted) {
                PositionImpl deleted = (PositionImpl) position;
                this.redeliveryMessages.remove(deleted.getLedgerId(), deleted.getEntryId());
            }
            if (replayNow.size() == alreadyDeleted.size()) {
                // Every requested position was reported back, so no replay is outstanding.
                this.havePendingReplayRead = false;
                this.readMoreEntriesAsync();
            }
            return;
        }

        if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", this.name, this.totalUnackedMessages, this.topic.getMaxUnackedMessagesOnSubscription());
            }
            return;
        }

        if (this.havePendingRead) {
            log.debug("[{}] Cannot schedule next read until previous one is done", this.name);
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule read of {} messages for {} consumers", this.name, messagesToRead, this.consumerList.size());
        }
        this.havePendingRead = true;
        // Take one position from the replay queue as the minimum replayed position, then put it back.
        Set<PositionImpl> firstReplayCandidate = this.getMessagesToReplayNow(1);
        this.minReplayedPosition = firstReplayCandidate.stream().findFirst().orElse(null);
        if (this.minReplayedPosition != null) {
            this.redeliveryMessages.add(this.minReplayedPosition.getLedgerId(), this.minReplayedPosition.getEntryId());
        }
        this.cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, (AsyncCallbacks.ReadEntriesCallback) this, (Object) ReadType.Normal, this.topic.getMaxReadPosition());
    }

    /**
     * Schedules a single {@link #readMoreEntries()} call one second from now on the broker
     * service executor. Calls made while such a task is already pending are ignored.
     */
    @Override
    protected void reScheduleRead() {
        boolean acquired = this.isRescheduleReadInProgress.compareAndSet(false, true);
        if (!acquired) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Reschedule message read in {} ms", this.topic.getName(), this.name, 1000);
        }
        this.topic.getBrokerService().executor().schedule(() -> {
            // Clear the flag first so a new reschedule can be requested during the read.
            this.isRescheduleReadInProgress.set(false);
            this.readMoreEntries();
        }, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Works out how many entries and how many bytes the next cursor read may request.
     *
     * <p>The entry count starts at the smaller of the available permits and the current read
     * batch size, is scaled by the average messages per entry when the sampled consumer uses
     * precise flow control, and is forced to 1 when no consumer is writable. When throttling
     * applies (enabled for non-backlog consumers, or the cursor is inactive) the broker, topic
     * and dispatcher rate limiters are consulted in that order; each may abort the read or
     * shrink the budget.
     *
     * @param currentTotalAvailablePermits permits currently available for dispatch
     * @return {@code (messagesToRead, bytesToRead)}, both at least 1, or {@code (-1, -1L)} when
     *         no read should be issued now (a rate limit was reached or a replay read is pending)
     */
    protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
        int messagesToRead = Math.min(currentTotalAvailablePermits, this.readBatchSize);
        long bytesToRead = this.serviceConfig.getDispatcherMaxReadSizeBytes();

        Consumer c = this.getRandomConsumer();
        if (c != null && c.isPreciseDispatcherFlowControl()) {
            int avgMessagesPerEntry = Math.max(1, c.getAvgMessagesPerEntry());
            int entriesForPermits = (int) Math.ceil((double) currentTotalAvailablePermits / (double) avgMessagesPerEntry);
            messagesToRead = Math.min(entriesForPermits, this.readBatchSize);
        }
        if (!this.isConsumerWritable()) {
            messagesToRead = 1;
        }

        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            // Each Optional is fetched once so isPresent() and get() observe the same value.
            Optional<DispatchRateLimiter> brokerLimiter = this.topic.getBrokerDispatchRateLimiter();
            if (brokerLimiter.isPresent()) {
                DispatchRateLimiter brokerRateLimiter = brokerLimiter.get();
                if (this.reachDispatchRateLimit(brokerRateLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", this.name, brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(), 1000);
                    }
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> adjusted = this.updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
                messagesToRead = adjusted.getLeft();
                bytesToRead = adjusted.getRight();
            }

            Optional<DispatchRateLimiter> topicLimiter = this.topic.getDispatchRateLimiter();
            if (topicLimiter.isPresent()) {
                DispatchRateLimiter topicRateLimiter = topicLimiter.get();
                if (this.reachDispatchRateLimit(topicRateLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", this.name, topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(), 1000);
                    }
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> adjusted = this.updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
                messagesToRead = adjusted.getLeft();
                bytesToRead = adjusted.getRight();
            }

            Optional<DispatchRateLimiter> ownLimiter = this.dispatchRateLimiter;
            if (ownLimiter.isPresent()) {
                DispatchRateLimiter subscriptionRateLimiter = ownLimiter.get();
                if (this.reachDispatchRateLimit(subscriptionRateLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", this.name, subscriptionRateLimiter.getDispatchRateOnMsg(), subscriptionRateLimiter.getDispatchRateOnByte(), 1000);
                    }
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> adjusted = this.updateMessagesToRead(subscriptionRateLimiter, messagesToRead, bytesToRead);
                messagesToRead = adjusted.getLeft();
                bytesToRead = adjusted.getRight();
            }
        }

        if (this.havePendingReplayRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping replay while awaiting previous read to complete", this.name);
            }
            return Pair.of(-1, -1L);
        }

        // Never request fewer than one entry or one byte.
        messagesToRead = Math.max(messagesToRead, 1);
        bytesToRead = Math.max(bytesToRead, 1L);
        return Pair.of(messagesToRead, bytesToRead);
    }

    /**
     * Asks the cursor to replay the given positions, with this dispatcher as the callback and
     * {@code ReadType.Replay} as the context.
     *
     * @param positions positions to replay
     * @return the set returned by the cursor; the caller removes these positions from the
     *         redelivery queue
     */
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        return this.cursor.asyncReplayEntries(positions, (AsyncCallbacks.ReadEntriesCallback)this, (Object)ReadType.Replay);
    }

    /**
     * Same as {@link #asyncReplayEntries(Set)} but passes {@code true} as the extra argument of
     * the cursor's four-argument overload (presumably requesting sorted delivery; confirm
     * against the {@code ManagedCursor} API).
     *
     * @param positions positions to replay
     * @return the set returned by the cursor
     */
    protected Set<? extends Position> asyncReplayEntriesInOrder(Set<? extends Position> positions) {
        return this.cursor.asyncReplayEntries(positions, (AsyncCallbacks.ReadEntriesCallback)this, (Object)ReadType.Replay, true);
    }

    /**
     * @return {@code true} when at least one consumer is in the consumer list
     */
    @Override
    public boolean isConsumerConnected() {
        return !this.consumerList.isEmpty();
    }

    /**
     * @return the dispatcher's own consumer list (the live instance, not a copy)
     */
    @Override
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return this.consumerList;
    }

    /**
     * @param consumer consumer asking to unsubscribe
     * @return {@code true} only when exactly one consumer is in the list and the given consumer
     *         is in the consumer set
     */
    @Override
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        return this.consumerList.size() == 1 && this.consumerSet.contains((Object)consumer);
    }

    /**
     * Marks the dispatcher closed, closes the delayed-delivery tracker and the dispatch rate
     * limiter if present, and disconnects all consumers.
     *
     * <p>The tracker reference is swapped for an empty Optional under the dispatcher lock and
     * closed after the lock is released.
     *
     * @return the future returned by {@code disconnectAllConsumers()}
     */
    @Override
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);
        Optional<DelayedDeliveryTracker> trackerToClose;
        synchronized (this) {
            trackerToClose = this.delayedDeliveryTracker;
            this.delayedDeliveryTracker = Optional.empty();
        }
        trackerToClose.ifPresent(DelayedDeliveryTracker::close);
        this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
        return this.disconnectAllConsumers();
    }

    /**
     * Installs a fresh close future and disconnects every attached consumer.
     *
     * <p>With no consumers attached the future is completed immediately. Otherwise each consumer
     * is asked to disconnect and any pending read is cancelled; the future is completed later by
     * {@link #removeConsumer(Consumer)} when the last consumer is removed.
     *
     * @param isResetCursor forwarded to each consumer's {@code disconnect}
     * @return the value of the close-future field when this method returns
     */
    @Override
    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
        this.closeFuture = new CompletableFuture<>();
        if (!this.consumerList.isEmpty()) {
            this.consumerList.forEach(attached -> attached.disconnect(isResetCursor));
            this.cancelPendingRead();
        } else {
            this.closeFuture.complete(null);
        }
        return this.closeFuture;
    }

    /**
     * Cancels the outstanding normal read, if any. The pending flag is cleared only when the
     * cursor reports that the request was actually cancelled.
     */
    @Override
    protected void cancelPendingRead() {
        if (!this.havePendingRead) {
            return;
        }
        boolean cancelled = this.cursor.cancelPendingReadRequest();
        if (cancelled) {
            this.havePendingRead = false;
        }
    }

    /**
     * Delegates to {@link #disconnectAllConsumers(boolean)}.
     *
     * @param isResetCursor forwarded unchanged
     * @return the future returned by {@code disconnectAllConsumers}
     */
    @Override
    public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
        return this.disconnectAllConsumers(isResetCursor);
    }

    /**
     * Drops the current close future by setting the field back to {@code null}, so a later
     * consumer removal no longer completes it.
     */
    @Override
    public synchronized void resetCloseFuture() {
        this.closeFuture = null;
    }

    /**
     * Clears the close future and sets the closed flag back to 0, undoing the state set by
     * {@link #close()}.
     */
    @Override
    public void reset() {
        this.resetCloseFuture();
        IS_CLOSED_UPDATER.set(this, 0);
    }

    /**
     * @return always {@code CommandSubscribe.SubType.Shared}
     */
    @Override
    public CommandSubscribe.SubType getType() {
        return CommandSubscribe.SubType.Shared;
    }

    /**
     * Callback invoked when a cursor read (normal or replay) completes.
     *
     * <p>Clears the matching pending-read flag, grows the read batch size towards the configured
     * maximum, halves the read-failure backoff, and then either discards the entries and
     * rewinds (when a rewind was requested while a normal read was in flight) or dispatches the
     * entries, on the subscription dispatch thread if so configured or inline otherwise.
     *
     * @param entries entries returned by the cursor
     * @param ctx     the {@code ReadType} passed when the read was issued
     */
    public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
        ReadType readType = (ReadType) ctx;
        boolean normalRead = readType == ReadType.Normal;
        if (normalRead) {
            this.havePendingRead = false;
        } else {
            this.havePendingReplayRead = false;
        }

        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            int grownBatchSize = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", this.name, this.readBatchSize, grownBatchSize);
            }
            this.readBatchSize = grownBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        if (this.shouldRewindBeforeReadingOrReplaying && normalRead) {
            // The read was issued before the first consumer attached: drop it and start over.
            entries.forEach(Entry::release);
            this.cursor.rewind();
            this.shouldRewindBeforeReadingOrReplaying = false;
            this.readMoreEntries();
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Distributing {} messages to {} consumers", this.name, entries.size(), this.consumerList.size());
        }
        long size = entries.stream().mapToLong(Entry::getLength).sum();
        this.updatePendingBytesToDispatch(size);

        if (this.serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
            // Block further reads until the dispatch thread has handed the entries over.
            this.sendInProgress = true;
            this.dispatchMessagesThread.execute((Runnable) SafeRun.safeRun(() -> {
                boolean readMore = this.sendMessagesToConsumers(readType, entries);
                this.updatePendingBytesToDispatch(-size);
                if (readMore) {
                    this.readMoreEntries();
                }
            }));
            return;
        }

        boolean readMore = this.sendMessagesToConsumers(readType, entries);
        this.updatePendingBytesToDispatch(-size);
        if (readMore) {
            this.readMoreEntriesAsync();
        }
    }

    /**
     * Runs {@link #trySendMessagesToConsumers} with the send-in-progress flag raised, and lowers
     * the flag again afterwards even if the dispatch throws.
     *
     * @param readType whether the entries come from a normal or a replay read
     * @param entries  entries to dispatch
     * @return the value returned by {@code trySendMessagesToConsumers}
     */
    protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
        this.sendInProgress = true;
        try {
            return this.trySendMessagesToConsumers(readType, entries);
        } finally {
            this.sendInProgress = false;
        }
    }

    /**
     * Distributes the given entries across the attached consumers.
     *
     * <p>Entries are handed out in consecutive slices, one slice per consumer returned by
     * {@code getNextConsumer()}, until the entries run out or no permits remain. Entries that
     * could not be dispatched are queued for replay and released. Reads that contain chunked
     * messages are delegated to {@code sendChunkedMessagesToConsumers}.
     *
     * @param readType whether the entries come from a normal or a replay read
     * @param entries  entries to dispatch
     * @return {@code false} only when no consumer was available and the cursor was rewound;
     *         {@code true} otherwise (the caller in this file then triggers another read), or
     *         the chunked-path result when that path is taken
     */
    protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
        int avgBatchSizePerMsg;
        int entriesToDispatch;
        if (this.needTrimAckedMessages()) {
            this.cursor.trimDeletedEntries(entries);
        }
        // Nothing to dispatch (possibly after trimming).
        if ((entriesToDispatch = entries.size()) == 0) {
            return true;
        }
        // Peek the metadata of every entry to count the contained messages and detect chunks.
        MessageMetadata[] metadataArray = new MessageMetadata[entries.size()];
        int remainingMessages = 0;
        boolean hasChunk = false;
        for (int i = 0; i < metadataArray.length; ++i) {
            MessageMetadata metadata = Commands.peekAndCopyMessageMetadata((ByteBuf)entries.get(i).getDataBuffer(), (String)this.subscription.toString(), (long)-1L);
            if (metadata != null) {
                remainingMessages += metadata.getNumMessagesInBatch();
                // A metadata UUID marks the read as containing a chunked message.
                if (!hasChunk && metadata.hasUuid()) {
                    hasChunk = true;
                }
            }
            metadataArray[i] = metadata;
        }
        if (hasChunk) {
            return this.sendChunkedMessagesToConsumers(readType, entries, metadataArray);
        }
        int start = 0;
        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;
        long totalEntries = 0L;
        // Average number of messages per entry (at least 1), used to convert permits into entries.
        int n = avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;
        while (entriesToDispatch > 0) {
            int availablePermits;
            boolean dispatchMessage;
            int firstAvailableConsumerPermits = this.getFirstAvailableConsumerPermits();
            int currentTotalAvailablePermits = Math.max(this.totalAvailablePermits, firstAvailableConsumerPermits);
            // Stop as soon as no permits are left; the remainder is queued for replay below.
            boolean bl = dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
            if (!dispatchMessage) break;
            Consumer c = this.getNextConsumer();
            if (c == null) {
                // No consumer at all: release what is left and rewind so it is read again later.
                log.info("[{}] rewind because no available consumer found from total {}", (Object)this.name, (Object)this.consumerList.size());
                entries.subList(start, entries.size()).forEach(Entry::release);
                this.cursor.rewind();
                return false;
            }
            // A non-writable consumer is given a budget of a single message.
            int n2 = availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
            if (c.getMaxUnackedMessages() > 0) {
                // Cap by how many more unacked messages this consumer may hold.
                int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
                availablePermits = Math.min(availablePermits, remainUnAckedMessages);
            }
            if (log.isDebugEnabled() && !c.isWritable()) {
                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; availablePermits are {}", new Object[]{this.topic.getName(), this.name, c, c.getAvailablePermits()});
            }
            // Message budget for this consumer, then converted into an entry count (at least 1).
            int messagesForC = Math.min(Math.min(remainingMessages, availablePermits), this.serviceConfig.getDispatcherMaxRoundRobinBatchSize());
            messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);
            int end = Math.min(start + messagesForC, entries.size());
            List<Entry> entriesForThisConsumer = entries.subList(start, end);
            if (readType == ReadType.Replay) {
                // Replayed entries are being delivered now, so drop them from the replay queue.
                entriesForThisConsumer.forEach(entry -> this.redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()));
            }
            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
            totalEntries += (long)this.filterEntriesForConsumer(metadataArray, start, entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, readType == ReadType.Replay, c);
            c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), this.redeliveryTracker);
            int msgSent = sendMessageInfo.getTotalMessages();
            remainingMessages -= msgSent;
            // Both counters advance by messagesForC even when the slice was clamped to the end
            // of the list, so start + entriesToDispatch stays equal to entries.size().
            start += messagesForC;
            entriesToDispatch -= messagesForC;
            // Permits consumed: messages sent minus the batch indexes counted as already acked.
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in PersistentDispatcherMultipleConsumers", new Object[]{this.name, msgSent, batchIndexesAcks.getTotalAckedIndexCount()});
            }
            totalMessagesSent += (long)sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();
        }
        // Charge the rate limiters for everything delivered in this round.
        this.acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
        if (entriesToDispatch > 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", (Object)this.name, (Object)(entries.size() - start));
            }
            // Undelivered entries are queued for replay and their buffers released.
            entries.subList(start, entries.size()).forEach(entry -> {
                long stickyKeyHash = this.getStickyKeyHash((Entry)entry);
                this.addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                entry.release();
            });
        }
        return true;
    }

    /**
     * Charges the broker, topic and dispatcher rate limiters, in that order, for a finished
     * dispatch round. Nothing is charged unless throttling applies, i.e. it is enabled for
     * non-backlog consumers or the cursor is inactive.
     *
     * @param totalEntries      entries delivered; used as the permit count when throttling on
     *                          batch messages is enabled
     * @param totalMessagesSent messages delivered; used as the permit count otherwise
     * @param totalBytesSent    bytes delivered
     */
    private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) {
        boolean throttlingApplies = this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive();
        if (!throttlingApplies) {
            return;
        }
        final long permits = this.dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
        this.topic.getBrokerDispatchRateLimiter().ifPresent(limiter -> limiter.tryDispatchPermit(permits, totalBytesSent));
        this.topic.getDispatchRateLimiter().ifPresent(limiter -> limiter.tryDispatchPermit(permits, totalBytesSent));
        this.dispatchRateLimiter.ifPresent(limiter -> limiter.tryDispatchPermit(permits, totalBytesSent));
    }

    /**
     * Distributes the given entries among consumers through {@code assignor} and sends each
     * consumer its share.
     *
     * <p>Entries a consumer has no permits for are put back into the replay queue. After all
     * sends were issued, the dispatch rate limiters are charged for what was delivered.
     *
     * @param readType whether the entries come from a normal read or a replay read
     * @param entries the entries that were read
     * @param metadataArray metadata of the entries; element {@code i} belongs to {@code entries.get(i)}
     * @return {@code true} if the pending-consumer counter is zero when this method returns
     */
    private boolean sendChunkedMessagesToConsumers(ReadType readType, List<Entry> entries, MessageMetadata[] metadataArray) {
        // Pair every entry with the metadata found at the same index.
        ArrayList<EntryAndMetadata> originalEntryAndMetadataList = new ArrayList<EntryAndMetadata>(metadataArray.length);
        for (int i = 0; i < metadataArray.length; ++i) {
            originalEntryAndMetadataList.add(EntryAndMetadata.create(entries.get(i), metadataArray[i]));
        }
        Map<Consumer, List<EntryAndMetadata>> assignResult = this.assignor.assign(originalEntryAndMetadataList, this.consumerList.size());
        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;
        long totalEntries = 0L;
        // Counts consumers whose send has not completed yet; starts at the number of assigned consumers.
        AtomicInteger numConsumers = new AtomicInteger(assignResult.size());
        for (Map.Entry<Consumer, List<EntryAndMetadata>> current : assignResult.entrySet()) {
            Consumer consumer = current.getKey();
            List<EntryAndMetadata> entryAndMetadataList = current.getValue();
            // Never send more than the consumer currently has permits for.
            int messagesForC = Math.min(consumer.getAvailablePermits(), entryAndMetadataList.size());
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}", new Object[]{this.name, consumer.consumerName(), messagesForC, readType});
            }
            if (messagesForC < entryAndMetadataList.size()) {
                // The tail that exceeds the permits goes back to the replay queue (which also
                // releases the entry) and its slot in the list is cleared.
                for (int i = messagesForC; i < entryAndMetadataList.size(); ++i) {
                    EntryAndMetadata entry = entryAndMetadataList.get(i);
                    this.addMessageToReplay(entry);
                    entryAndMetadataList.set(i, null);
                }
            }
            if (messagesForC == 0) {
                // Nothing to send to this consumer, so it is not awaited.
                numConsumers.decrementAndGet();
                continue;
            }
            if (readType == ReadType.Replay) {
                // Entries about to be sent no longer need to be tracked for redelivery.
                entryAndMetadataList.stream().limit(messagesForC).forEach(e -> this.redeliveryMessages.remove(e.getLedgerId(), e.getEntryId()));
            }
            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
            totalEntries += (long)this.filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, readType == ReadType.Replay, consumer);
            // Once the last outstanding send completes, trigger the next read.
            consumer.sendMessages(entryAndMetadataList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), this.getRedeliveryTracker()).addListener(future -> {
                if (future.isDone() && numConsumers.decrementAndGet() == 0) {
                    this.readMoreEntries();
                }
            });
            // Reduce the dispatcher-wide permit count by the messages sent, excluding batch indexes already acked.
            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
            totalMessagesSent += (long)sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();
        }
        this.acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
        return numConsumers.get() == 0;
    }

    /**
     * Handles a failed entry read: logs the failure according to its type, resets the pending-read
     * state for the affected read type and schedules a retry.
     *
     * @param exception the failure reported for the read
     * @param ctx the {@link ReadType} of the read that failed
     */
    public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        ReadType readType = (ReadType)((Object)ctx);
        // Delay before retrying, taken from the failure backoff; may be overridden below.
        long waitTimeMillis = this.readFailureBackoff.next();
        if (exception instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            // With an empty backlog, every consumer is told that the end of the topic was reached.
            if (this.cursor.getNumberOfEntriesInBacklog(false) == 0L) {
                this.consumerList.forEach(Consumer::reachedEndOfTopic);
            }
        } else if (exception.getCause() instanceof TransactionException.TransactionNotSealedException || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
            // These causes are retried after 1 ms instead of the backoff delay.
            waitTimeMillis = 1L;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, exception.getMessage(), readType, (double)waitTimeMillis / 1000.0});
            }
        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, this.cursor.getReadPosition(), exception.getMessage(), readType, (double)waitTimeMillis / 1000.0});
        } else if (log.isDebugEnabled()) {
            // TooManyRequestsException is only logged at debug level.
            log.debug("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, this.cursor.getReadPosition(), exception.getMessage(), readType, (double)waitTimeMillis / 1000.0});
        }
        // Perform a rewind that was requested earlier, and clear the request.
        if (this.shouldRewindBeforeReadingOrReplaying) {
            this.shouldRewindBeforeReadingOrReplaying = false;
            this.cursor.rewind();
        }
        // Clear the pending flag that belongs to the failed read type.
        if (readType == ReadType.Normal) {
            this.havePendingRead = false;
        } else {
            this.havePendingReplayRead = false;
            if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) {
                // Drop replay positions up to the mark-delete position.
                PositionImpl markDeletePosition = (PositionImpl)this.cursor.getMarkDeletedPosition();
                this.redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
            }
        }
        // Restart from the configured minimum read batch size.
        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();
        // Retry after the delay, holding the dispatcher lock; the retry is skipped when a normal
        // read is already pending, unless the failed read was a replay.
        this.topic.getBrokerService().executor().schedule(() -> {
            PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = this;
            synchronized (persistentDispatcherMultipleConsumers) {
                if (!this.havePendingRead || readType == ReadType.Replay) {
                    log.info("[{}] Retrying read operation", (Object)this.name);
                    this.readMoreEntries();
                } else {
                    log.info("[{}] Skipping read retry: havePendingRead {}", new Object[]{this.name, this.havePendingRead, exception});
                }
            }
        }, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether the upper endpoint of {@code lastIndividualDeletedRangeFromCursorRecovery}
     * lies beyond the cursor's current read position.
     *
     * @return {@code false} when no such range is recorded; otherwise {@code true} only if the
     *         range's upper endpoint compares greater than the read position
     */
    private boolean needTrimAckedMessages() {
        return lastIndividualDeletedRangeFromCursorRecovery != null
                && ((PositionImpl) lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint())
                        .compareTo((PositionImpl) cursor.getReadPosition()) > 0;
    }

    /**
     * @return {@code true} when {@link #getFirstAvailableConsumerPermits()} reports a positive
     *         permit count
     */
    protected boolean isAtleastOneConsumerAvailable() {
        final int permits = getFirstAvailableConsumerPermits();
        return permits > 0;
    }

    /**
     * Finds the first consumer in {@code consumerList} that is non-null, not blocked and has a
     * positive number of available permits.
     *
     * @return that consumer's available permits, or {@code 0} when the list is empty, the
     *         dispatcher's closed flag is set, or no consumer qualifies
     */
    protected int getFirstAvailableConsumerPermits() {
        if (consumerList.isEmpty()) {
            return 0;
        }
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            return 0;
        }
        for (Consumer candidate : consumerList) {
            if (candidate != null && !candidate.isBlocked()) {
                // Read the permits once so the value checked is the value returned.
                final int permits = candidate.getAvailablePermits();
                if (permits > 0) {
                    return permits;
                }
            }
        }
        return 0;
    }

    /**
     * Checks whether any consumer in {@code consumerList} reports itself as writable.
     *
     * @return {@code true} as soon as one writable consumer is found; {@code false} otherwise
     *         (a debug message is logged in that case when debug logging is enabled)
     */
    private boolean isConsumerWritable() {
        for (Consumer candidate : consumerList) {
            if (candidate.isWritable()) {
                return true;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer is not writable", topic.getName(), name);
        }
        return false;
    }

    /**
     * @param consumer the consumer to check; may be {@code null}
     * @return {@code true} when the consumer is non-null, not blocked and has at least one
     *         available permit
     */
    @Override
    public boolean isConsumerAvailable(Consumer consumer) {
        if (consumer == null || consumer.isBlocked()) {
            return false;
        }
        return consumer.getAvailablePermits() > 0;
    }

    /**
     * Queues every pending ack of the given consumer for replay and then triggers a read.
     *
     * @param consumer the consumer whose pending acks are queued for replay
     * @param consumerEpoch not used by this implementation
     */
    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
        consumer.getPendingAcks().forEach(
                (ledger, entry, ignoredBatchSize, keyHash) -> addMessageToReplay(ledger, entry, keyHash));
        if (log.isDebugEnabled()) {
            // NOTE(review): the last placeholder is labelled "consumer" but receives the replay set.
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, redeliveryMessages);
        }
        readMoreEntries();
    }

    /**
     * Queues the given positions for replay and then triggers a read. Positions that were
     * accepted into the replay queue are also registered with the redelivery tracker.
     *
     * @param consumer the consumer the redelivery is requested for (only used for logging)
     * @param positions the positions to redeliver
     */
    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        for (PositionImpl position : positions) {
            final boolean queued = addMessageToReplay(position.getLedgerId(), position.getEntryId());
            if (queued) {
                redeliveryTracker.addIfAbsent((Position) position);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
        }
        readMoreEntries();
    }

    /**
     * Adds {@code numberOfMessages} to the dispatcher's unacked-message count and blocks or
     * unblocks the dispatcher depending on how the new count compares with the topic's
     * per-subscription limit. The change is also reported to the broker service.
     *
     * @param numberOfMessages amount added to the unacked-message count
     *        (NOTE(review): presumably negative when messages are acked; confirm against callers)
     */
    @Override
    public void addUnAckedMessages(int numberOfMessages) {
        int unAckedMessages;
        int maxUnackedMessages = this.topic.getMaxUnackedMessagesOnSubscription();
        // A non-positive limit means no blocking: lift an existing block and resume reading.
        if (maxUnackedMessages <= 0 && this.blockedDispatcherOnUnackedMsgs == 1 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", (Object)this.name);
            this.readMoreEntriesAsync();
        }
        // Apply the delta; block when the limit is reached and the dispatcher was not blocked yet.
        if ((unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages)) >= maxUnackedMessages && maxUnackedMessages > 0 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 0, 1)) {
            log.info("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}", new Object[]{this.name, TOTAL_UNACKED_MESSAGES_UPDATER.get(this), maxUnackedMessages});
        } else if (this.topic.getBrokerService().isBrokerDispatchingBlocked() && this.blockedDispatcherOnUnackedMsgs == 1) {
            // Broker-wide blocking is active: unblock through the broker service once the count
            // falls below half of the broker's per-dispatcher limit.
            if (this.totalUnackedMessages < this.topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
                this.topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList((Object[])new PersistentDispatcherMultipleConsumers[]{this}));
            }
        } else if (this.blockedDispatcherOnUnackedMsgs == 1 && unAckedMessages < maxUnackedMessages / 2 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            // Blocked by the subscription limit: unblock once the count falls below half of it.
            log.info("[{}] Dispatcher is unblocked", (Object)this.name);
            this.readMoreEntriesAsync();
        }
        this.topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
    }

    /**
     * @return {@code true} when the {@code blockedDispatcherOnUnackedMsgs} flag equals {@code 1}
     */
    public boolean isBlockedDispatcherOnUnackedMsgs() {
        final int flag = blockedDispatcherOnUnackedMsgs;
        return flag == 1;
    }

    /**
     * Sets the {@code blockedDispatcherOnUnackedMsgs} flag to {@code 1}, the value
     * {@link #isBlockedDispatcherOnUnackedMsgs()} reports as blocked.
     */
    public void blockDispatcherOnUnackedMsgs() {
        blockedDispatcherOnUnackedMsgs = 1;
    }

    /**
     * Resets the {@code blockedDispatcherOnUnackedMsgs} flag to {@code 0}, so that
     * {@link #isBlockedDispatcherOnUnackedMsgs()} reports the dispatcher as not blocked.
     */
    public void unBlockDispatcherOnUnackedMsgs() {
        blockedDispatcherOnUnackedMsgs = 0;
    }

    /**
     * @return the current value of the dispatcher's {@code totalUnackedMessages} counter
     */
    public int getTotalUnackedMessages() {
        return totalUnackedMessages;
    }

    /**
     * @return the dispatcher's {@code name}, the same value used as a prefix in its log messages
     */
    public String getName() {
        return name;
    }

    /**
     * @return the redelivery tracker held by this dispatcher
     */
    @Override
    public RedeliveryTracker getRedeliveryTracker() {
        return redeliveryTracker;
    }

    /**
     * @return the dispatcher's own dispatch rate limiter; empty when none has been created
     */
    @Override
    public Optional<DispatchRateLimiter> getRateLimiter() {
        return dispatchRateLimiter;
    }

    /**
     * Creates the dispatch rate limiter if that is needed; otherwise asks an existing limiter to
     * update its dispatch rate.
     */
    @Override
    public void updateRateLimiter() {
        if (initializeDispatchRateLimiterIfNeeded()) {
            // A limiter was just created; no separate update call is made for it.
            return;
        }
        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
    }

    /**
     * Creates a subscription-type dispatch rate limiter when none exists yet and the topic's
     * dispatch rate for this subscription is enabled.
     *
     * @return {@code true} if a limiter was created by this call, {@code false} otherwise
     */
    @Override
    public boolean initializeDispatchRateLimiterIfNeeded() {
        if (dispatchRateLimiter.isPresent()) {
            return false;
        }
        if (!DispatchRateLimiter.isDispatchRateEnabled((DispatchRate) topic.getSubscriptionDispatchRate(getSubscriptionName()))) {
            return false;
        }
        dispatchRateLimiter = Optional.of(
                new DispatchRateLimiter(topic, getSubscriptionName(), DispatchRateLimiter.Type.SUBSCRIPTION));
        return true;
    }

    /**
     * Registers a message with the delayed delivery tracker.
     *
     * <p>The tracker is created lazily, the first time a message carrying a deliver-at time is
     * seen. Once a tracker exists, every message is passed to it; messages without a deliver-at
     * time are passed with {@code -1}.
     *
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @param msgMetadata metadata of the message
     * @return {@code false} when delayed delivery is disabled on the topic, or when no tracker
     *         exists and the message has no deliver-at time; otherwise the tracker's
     *         {@code addMessage} result
     */
    @Override
    public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
        if (!topic.isDelayedDeliveryEnabled()) {
            return false;
        }
        synchronized (this) {
            if (!delayedDeliveryTracker.isPresent()) {
                if (!msgMetadata.hasDeliverAtTime()) {
                    // No tracker yet and nothing to delay: do not create one.
                    return false;
                }
                delayedDeliveryTracker = Optional.of(
                        topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
            }
            delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
            final long deliverAtTime;
            if (msgMetadata.hasDeliverAtTime()) {
                deliverAtTime = msgMetadata.getDeliverAtTime();
            } else {
                deliverAtTime = -1L;
            }
            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, deliverAtTime);
        }
    }

    /**
     * Picks the positions that should be replayed right now.
     *
     * <p>Messages already queued for redelivery take precedence. Only when that queue is empty
     * are messages that the delayed delivery tracker reports as available fetched; those are
     * added to the redelivery queue before being returned.
     *
     * @param maxMessagesToRead upper bound passed on to the queue or tracker
     * @return the positions to replay, or an empty set when there is nothing to replay
     */
    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
        if (!redeliveryMessages.isEmpty()) {
            return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
        }
        if (!delayedDeliveryTracker.isPresent() || !delayedDeliveryTracker.get().hasMessageAvailable()) {
            return Collections.emptySet();
        }
        delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
        final Set<PositionImpl> dueNow = delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
        for (PositionImpl due : dueNow) {
            redeliveryMessages.add(due.getLedgerId(), due.getEntryId());
        }
        return dueNow;
    }

    /**
     * @return {@code false} when no delayed delivery tracker exists; otherwise the tracker's
     *         {@code shouldPauseAllDeliveries()} answer
     */
    protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
        if (!delayedDeliveryTracker.isPresent()) {
            return false;
        }
        return delayedDeliveryTracker.get().shouldPauseAllDeliveries();
    }

    /**
     * @return the number of delayed messages reported by the delayed delivery tracker, or
     *         {@code 0} when no tracker exists
     */
    @Override
    public synchronized long getNumberOfDelayedMessages() {
        if (!delayedDeliveryTracker.isPresent()) {
            return 0L;
        }
        return delayedDeliveryTracker.get().getNumberOfDelayedMessages();
    }

    /**
     * Clears the delayed delivery tracker when one exists; does nothing otherwise.
     */
    @Override
    public void clearDelayedMessages() {
        delayedDeliveryTracker.ifPresent(tracker -> tracker.clear());
    }

    /**
     * Forgets {@code lastIndividualDeletedRangeFromCursorRecovery}. The field is only written
     * when it currently holds a value.
     */
    @Override
    public void cursorIsReset() {
        if (lastIndividualDeletedRangeFromCursorRecovery == null) {
            return;
        }
        lastIndividualDeletedRangeFromCursorRecovery = null;
    }

    /**
     * Offers the entry's position to the replay queue and then releases the entry.
     *
     * @param entry the entry to queue for replay; it is released before this method returns
     */
    private void addMessageToReplay(Entry entry) {
        final long ledgerId = entry.getLedgerId();
        final long entryId = entry.getEntryId();
        addMessageToReplay(ledgerId, entryId);
        entry.release();
    }

    /**
     * Adds the position, together with its sticky key hash, to the replay queue unless the
     * mark-delete position already covers it.
     *
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @param stickyKeyHash sticky key hash stored alongside the position
     * @return {@code true} if the position was added, {@code false} if it was skipped
     */
    protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
        if (!checkIfMessageIsUnacked(ledgerId, entryId)) {
            return false;
        }
        redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
        return true;
    }

    /**
     * Adds the position to the replay queue unless the mark-delete position already covers it.
     *
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @return {@code true} if the position was added, {@code false} if it was skipped
     */
    protected boolean addMessageToReplay(long ledgerId, long entryId) {
        if (!checkIfMessageIsUnacked(ledgerId, entryId)) {
            return false;
        }
        redeliveryMessages.add(ledgerId, entryId);
        return true;
    }

    /**
     * Compares a position with the cursor's mark-delete position.
     *
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @return {@code true} when the cursor has no mark-delete position, or when the given
     *         position lies strictly after it (by ledger id first, then by entry id)
     */
    private boolean checkIfMessageIsUnacked(long ledgerId, long entryId) {
        final Position markDeleted = cursor.getMarkDeletedPosition();
        if (markDeleted == null) {
            return true;
        }
        final long markedLedgerId = markDeleted.getLedgerId();
        if (ledgerId != markedLedgerId) {
            return ledgerId > markedLedgerId;
        }
        // Same ledger: the entry id decides.
        return entryId > markDeleted.getEntryId();
    }

    /**
     * Detects a dispatcher that has permits and backlog but no read in flight, and triggers a
     * read in that case.
     *
     * @return {@code true} if a read was issued to unblock the dispatcher; {@code false} when the
     *         cursor reports a changed read position or the stuck conditions are not all met
     */
    @Override
    public boolean checkAndUnblockIfStuck() {
        if (cursor.checkAndUpdateReadPositionChanged()) {
            return false;
        }
        final boolean idleWithPermits = totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead;
        // The backlog is only queried when the cheaper checks already passed.
        if (!idleWithPermits || cursor.getNumberOfEntriesInBacklog(false) <= 0L) {
            return false;
        }
        log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
        readMoreEntries();
        return true;
    }

    /**
     * @return the persistent topic this dispatcher is attached to
     */
    public PersistentTopic getTopic() {
        return topic;
    }

    /**
     * @return the buffer memory usage reported by the delayed delivery tracker when it exists and
     *         is an {@link InMemoryDelayedDeliveryTracker}; {@code 0} in every other case
     */
    public long getDelayedTrackerMemoryUsage() {
        if (delayedDeliveryTracker.isPresent()
                && delayedDeliveryTracker.get() instanceof InMemoryDelayedDeliveryTracker) {
            return ((InMemoryDelayedDeliveryTracker) delayedDeliveryTracker.get()).getBufferMemoryUsage();
        }
        return 0L;
    }

    /**
     * Computes the sticky key hash of an entry from the sticky key peeked out of its data buffer.
     *
     * @param entry the entry whose data buffer is inspected
     * @return the hash produced by {@code StickyKeyConsumerSelector.makeStickyKeyHash}
     */
    protected int getStickyKeyHash(Entry entry) {
        final int hash = StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
        return hash;
    }

    protected static enum ReadType {
        Normal,
        Replay;

    }
}

