/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatcher for a persistent subscription that delivers entries read from the
 * subscription's {@link ManagedCursor} to the single currently active consumer.
 *
 * <p>At most one cursor read is tracked at a time via {@link #havePendingRead}.
 * Read completions, read failures, flow requests and redelivery requests are
 * re-submitted to the topic's ordered executor (keyed by topic name) and the
 * corresponding {@code internal*} handlers are {@code synchronized} on this
 * instance.
 */
public class PersistentDispatcherSingleActiveConsumer
        extends AbstractDispatcherSingleActiveConsumer
        implements Dispatcher, AsyncCallbacks.ReadEntriesCallback {
    /** Topic this dispatcher belongs to. */
    protected final PersistentTopic topic;
    /** Display name used in log messages: {@code "<topic> / <decoded cursor name>"}. */
    protected final String name;
    /** Subscription-level rate limiter; empty until one is created. */
    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
    /** {@code true} while a cursor read has been issued and has not yet completed, failed or been cancelled. */
    protected volatile boolean havePendingRead = false;
    /** Current upper bound on entries requested per read; shrinks on failure and doubles on success. */
    protected volatile int readBatchSize;
    /** Backoff used to delay read retries after a failed read. */
    protected final Backoff readFailureBackoff;
    /** Non-null while a delayed "rewind and read" task for a failover consumer change is scheduled or running. */
    private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
    private final RedeliveryTracker redeliveryTracker;
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
    /** Delay, in milliseconds, before re-attempting a read that was skipped because a dispatch rate limit was exhausted. */
    private static final int RATE_LIMIT_RETRY_DELAY_MS = 1000;

    /**
     * Creates the dispatcher.
     *
     * @param cursor           managed cursor entries are read from
     * @param subscriptionType subscription type handled by this dispatcher
     * @param partitionIndex   partition index passed through to the base class
     * @param topic            owning topic
     * @param subscription     owning subscription
     */
    public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, CommandSubscribe.SubType subscriptionType, int partitionIndex, PersistentTopic topic, Subscription subscription) {
        super(subscriptionType, partitionIndex, topic.getName(), subscription, topic.getBrokerService().pulsar().getConfiguration(), cursor);
        this.topic = topic;
        String cursorName = cursor.getName() != null ? Codec.decode(cursor.getName()) : "";
        this.name = topic.getName() + " / " + cursorName;
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        this.readFailureBackoff = new Backoff(
                this.serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS,
                this.serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS,
                this.serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(), TimeUnit.MILLISECONDS);
        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
    }

    /**
     * Cancels any pending read and then rewinds the cursor and starts reading
     * for the current active consumer.
     *
     * <p>For {@code Failover} subscriptions with a positive configured failover
     * delay the rewind-and-read is scheduled after that delay; otherwise it
     * runs immediately. Nothing happens if the pending read could not be
     * cancelled, or if a delayed task is already outstanding.
     */
    @Override
    protected void scheduleReadOnActiveConsumer() {
        this.cancelPendingRead();
        if (this.havePendingRead) {
            // The in-flight read could not be cancelled.
            return;
        }

        boolean delayedFailover = this.subscriptionType == CommandSubscribe.SubType.Failover
                && this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis() > 0;
        if (!delayedFailover) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Rewind cursor and read more entries without delay", this.name);
            }
            this.rewindAndReadForActiveConsumer();
            return;
        }

        if (this.readOnActiveConsumerTask != null) {
            return;
        }
        this.readOnActiveConsumerTask = this.topic.getBrokerService().executor().schedule(() -> {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Rewind cursor and read more entries after {} ms delay", this.name, this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
                }
                this.rewindAndReadForActiveConsumer();
            } finally {
                // Always clear the marker: while it is non-null, flow and redelivery
                // requests are ignored, so leaving it set after a failure would
                // stall the dispatcher.
                this.readOnActiveConsumerTask = null;
            }
        }, (long) this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /** Rewinds the cursor, notifies the current active consumer of the change and issues a read for it. */
    private void rewindAndReadForActiveConsumer() {
        this.cursor.rewind();
        Consumer activeConsumer = (Consumer) ACTIVE_CONSUMER_UPDATER.get(this);
        this.notifyActiveConsumerChanged(activeConsumer);
        this.readMoreEntries(activeConsumer);
    }

    /** Delegates to the inherited check using this topic's broker service, topic name and current consumer count. */
    @Override
    protected boolean isConsumersExceededOnSubscription() {
        return this.isConsumersExceededOnSubscription(this.topic.getBrokerService(), this.topic.getName(), this.consumers.size());
    }

    /**
     * Asks the cursor to cancel the pending read, if any. {@link #havePendingRead}
     * is cleared only when the cursor reports that the request was cancelled.
     */
    @Override
    protected void cancelPendingRead() {
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
    }

    /**
     * Cursor read callback; hands processing off to the topic's ordered executor.
     *
     * @param entries entries that were read
     * @param obj     read context; the {@link Consumer} the read was issued for
     */
    @Override
    public void readEntriesComplete(List<Entry> entries, Object obj) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, (SafeRunnable)SafeRun.safeRun(() -> this.internalReadEntriesComplete(entries, obj)));
    }

    /**
     * Processes a completed read: grows the read batch size, relaxes the
     * failure backoff, and either dispatches the entries to the active consumer
     * or, if the active consumer changed since the read was issued, releases
     * them, rewinds the cursor and reads again for the new consumer.
     *
     * @param entries entries that were read
     * @param obj     read context; the {@link Consumer} the read was issued for
     */
    public synchronized void internalReadEntriesComplete(List<Entry> entries, Object obj) {
        Consumer readConsumer = (Consumer) obj;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got messages: {}", this.name, readConsumer, entries.size());
        }
        this.havePendingRead = false;
        this.isFirstRead = false;

        int maxReadBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        if (this.readBatchSize < maxReadBatchSize) {
            int newReadBatchSize = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Increasing read batch size from {} to {}", this.name, readConsumer, this.readBatchSize, newReadBatchSize);
            }
            this.readBatchSize = newReadBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        Consumer currentConsumer = (Consumer) ACTIVE_CONSUMER_UPDATER.get(this);
        if (this.isKeyHashRangeFiltered) {
            // Drop (and release) every entry whose sticky key is not routed to the active consumer.
            for (Iterator<Entry> it = entries.iterator(); it.hasNext(); ) {
                Entry entry = it.next();
                byte[] key = this.peekStickyKey(entry.getDataBuffer());
                Consumer selected = this.stickyKeyConsumerSelector.select(key);
                if (selected == null || currentConsumer != selected) {
                    entry.release();
                    it.remove();
                }
            }
        }

        if (currentConsumer == null || readConsumer != currentConsumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] rewind because no available consumer found", this.name);
            }
            entries.forEach(Entry::release);
            this.cursor.rewind();
            if (currentConsumer != null) {
                this.notifyActiveConsumerChanged(currentConsumer);
                this.readMoreEntries(currentConsumer);
            }
            return;
        }

        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
        EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
        this.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, false);
        this.dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo);
    }

    /**
     * Sends the entries to the consumer and, once the send succeeds, charges
     * the topic and subscription rate limiters (when throttling applies) and
     * schedules the next read on the topic's ordered executor.
     *
     * @param currentConsumer  consumer to send to
     * @param entries          entries to send
     * @param batchSizes       per-entry batch sizes
     * @param batchIndexesAcks per-entry batch index acks
     * @param sendMessageInfo  totals (messages, bytes, chunked messages) for this send
     */
    protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, SendMessageInfo sendMessageInfo) {
        currentConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), this.redeliveryTracker).addListener(future -> {
            if (!future.isSuccess()) {
                return;
            }
            if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
                this.topic.getDispatchRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()));
                this.dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()));
            }
            this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, (SafeRunnable)SafeRun.safeRun(() -> {
                synchronized (this) {
                    Consumer newConsumer = this.getActiveConsumer();
                    if (newConsumer != null && !this.havePendingRead) {
                        this.readMoreEntries(newConsumer);
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}", this.name, newConsumer, newConsumer != null, this.havePendingRead);
                    }
                }
            }));
        });
    }

    /**
     * Handles a flow (permits) request by scheduling {@code internalConsumerFlow}
     * on the topic's ordered executor.
     *
     * @param consumer                  consumer that sent the flow request
     * @param additionalNumberOfMessages not used by this implementation
     */
    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, (SafeRunnable)SafeRun.safeRun(() -> this.internalConsumerFlow(consumer)));
    }

    /**
     * Triggers a new read for {@code consumer} unless a read is already pending,
     * the consumer is not the active one, or a delayed rewind task is outstanding.
     */
    private synchronized void internalConsumerFlow(Consumer consumer) {
        if (this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", this.name, consumer);
            }
            return;
        }
        if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", this.name, consumer);
            }
            return;
        }
        if (this.readOnActiveConsumerTask != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded", this.name, consumer);
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Trigger new read after receiving flow control message", this.name, consumer);
        }
        this.readMoreEntries(consumer);
    }

    /**
     * Requests redelivery of all unacknowledged messages for {@code consumer};
     * the work is performed on the topic's ordered executor.
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, (SafeRunnable)SafeRun.safeRun(() -> this.internalRedeliverUnacknowledgedMessages(consumer)));
    }

    /**
     * Rewinds the cursor and re-reads for the active consumer. The request is
     * ignored when it does not come from the active consumer, when a delayed
     * rewind task is outstanding, or when the pending read cannot be cancelled.
     */
    private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer) {
        if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", this.name, consumer);
            return;
        }
        if (this.readOnActiveConsumerTask != null) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: consumer is waiting for cursor to be rewinded", this.name, consumer);
            return;
        }

        this.cancelPendingRead();
        if (this.havePendingRead) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", this.name, consumer);
            return;
        }
        this.cursor.rewind();
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", this.name, consumer);
        }
        this.readMoreEntries(consumer);
    }

    /**
     * Registers the given positions with the redelivery tracker and then
     * performs a full redelivery for {@code consumer} (individual positions are
     * not redelivered selectively by this dispatcher).
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        positions.forEach(this.redeliveryTracker::addIfAbsent);
        this.redeliverUnacknowledgedMessages(consumer);
    }

    /**
     * Issues an asynchronous cursor read for {@code consumer} if it has
     * available permits and {@link #calculateToRead(Consumer)} allows reading.
     * Sets {@link #havePendingRead} before issuing the read.
     *
     * @param consumer consumer to read for; {@code null} is ignored
     */
    @Override
    protected void readMoreEntries(Consumer consumer) {
        if (null == consumer) {
            return;
        }
        if (consumer.getAvailablePermits() <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Consumer buffer is full, pause reading", this.name, consumer);
            }
            return;
        }

        Pair<Integer, Long> calculateResult = this.calculateToRead(consumer);
        int messagesToRead = calculateResult.getLeft();
        long bytesToRead = calculateResult.getRight();
        if (-1 == messagesToRead || bytesToRead == -1L) {
            // Rate limited: calculateToRead has already scheduled a retry.
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Schedule read of {} messages", this.name, consumer, messagesToRead);
        }
        this.havePendingRead = true;
        if (consumer.readCompacted()) {
            this.topic.getCompactedTopic().asyncReadEntriesOrWait(this.cursor, messagesToRead, this.isFirstRead, this, consumer);
        } else {
            this.cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, this.topic.getMaxReadPosition());
        }
    }

    /**
     * Computes how many entries and bytes the next read may request.
     *
     * <p>The entry count starts from the consumer's available permits (treated
     * as 1 when the consumer is not writable), capped by {@link #readBatchSize},
     * and is then reduced by the topic and subscription dispatch rate limiters
     * when throttling applies. If a limiter has no message permit left, a retry
     * is scheduled and {@code (-1, -1)} is returned.
     *
     * @param consumer consumer the read is for
     * @return pair of (entries to read, bytes to read), each at least 1, or
     *         {@code (-1, -1)} when the read must be skipped for now
     */
    protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
        int availablePermits = consumer.getAvailablePermits();
        if (!consumer.isWritable()) {
            availablePermits = 1;
        }
        int messagesToRead = Math.min(availablePermits, this.readBatchSize);
        long bytesToRead = this.serviceConfig.getDispatcherMaxReadSizeBytes();

        if (consumer.isPreciseDispatcherFlowControl()) {
            // Clamp the divisor: a non-positive average would otherwise divide by
            // zero (yielding a full-batch read regardless of permits) or go negative.
            int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry());
            messagesToRead = Math.min((int) Math.ceil((double) availablePermits / (double) avgMessagesPerEntry), this.readBatchSize);
        }

        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            DispatchRateLimiter topicRateLimiter = this.topic.getDispatchRateLimiter().orElse(null);
            if (topicRateLimiter != null && topicRateLimiter.isDispatchRateLimitingEnabled()) {
                if (!topicRateLimiter.hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", this.name, topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(), RATE_LIMIT_RETRY_DELAY_MS);
                    }
                    this.scheduleReadRetryAfterRateLimit("[{}] Skipping read retry for topic: Current Consumer {}, havePendingRead {}");
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> calculateResult = PersistentDispatcherSingleActiveConsumer.computeReadLimits(messagesToRead, (int)topicRateLimiter.getAvailableDispatchRateLimitOnMsg(), bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
                messagesToRead = calculateResult.getLeft();
                bytesToRead = calculateResult.getRight();
            }

            DispatchRateLimiter subscriptionRateLimiter = this.dispatchRateLimiter.orElse(null);
            if (subscriptionRateLimiter != null && subscriptionRateLimiter.isDispatchRateLimitingEnabled()) {
                if (!subscriptionRateLimiter.hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", this.name, subscriptionRateLimiter.getDispatchRateOnMsg(), subscriptionRateLimiter.getDispatchRateOnByte(), RATE_LIMIT_RETRY_DELAY_MS);
                    }
                    this.scheduleReadRetryAfterRateLimit("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}");
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> calculateResult = PersistentDispatcherSingleActiveConsumer.computeReadLimits(messagesToRead, (int)subscriptionRateLimiter.getAvailableDispatchRateLimitOnMsg(), bytesToRead, subscriptionRateLimiter.getAvailableDispatchRateLimitOnByte());
                messagesToRead = calculateResult.getLeft();
                bytesToRead = calculateResult.getRight();
            }
        }
        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1L));
    }

    /**
     * Schedules, after {@link #RATE_LIMIT_RETRY_DELAY_MS}, a read for whichever
     * consumer is active at that time, provided no read is pending by then.
     *
     * @param skipMessageFormat debug log format used when the retry is skipped;
     *                          receives topic name, current consumer and the pending-read flag
     */
    private void scheduleReadRetryAfterRateLimit(String skipMessageFormat) {
        this.topic.getBrokerService().executor().schedule(() -> {
            Consumer currentConsumer = (Consumer) ACTIVE_CONSUMER_UPDATER.get(this);
            if (currentConsumer != null && !this.havePendingRead) {
                this.readMoreEntries(currentConsumer);
            } else if (log.isDebugEnabled()) {
                log.debug(skipMessageFormat, this.topic.getName(), currentConsumer, this.havePendingRead);
            }
        }, (long) RATE_LIMIT_RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Cursor read-failure callback; hands processing off to the topic's ordered executor.
     *
     * @param exception failure reported by the managed ledger
     * @param ctx       read context; the {@link Consumer} the read was issued for
     */
    @Override
    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, (SafeRunnable)SafeRun.safeRun(() -> this.internalReadEntriesFailed(exception, ctx)));
    }

    /**
     * Handles a failed read: clears the pending-read flag, drops the read batch
     * size to the configured minimum and schedules a retry after a backoff
     * (1 ms when the cause is a not-yet-sealed transaction). When there are no
     * more entries and the backlog is empty, all consumers are told the end of
     * the topic was reached.
     */
    private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
        this.havePendingRead = false;
        Consumer c = (Consumer) ctx;
        long waitTimeMillis = this.readFailureBackoff.next();

        if (exception instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (this.cursor.getNumberOfEntriesInBacklog(false) == 0L) {
                this.consumers.forEach(Consumer::reachedEndOfTopic);
            }
        } else if (exception.getCause() instanceof TransactionException.TransactionNotSealedException) {
            waitTimeMillis = 1L;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", this.name, exception.getMessage(), (double) waitTimeMillis / 1000.0);
            }
        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", this.name, c, this.cursor.getReadPosition(), exception.getMessage(), (double) waitTimeMillis / 1000.0);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", this.name, c, this.cursor.getReadPosition(), exception.getMessage(), (double) waitTimeMillis / 1000.0);
        }

        Preconditions.checkNotNull(c);
        // Restart from the smallest batch; it grows again as reads succeed.
        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();

        this.topic.getBrokerService().executor().schedule(() -> this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, (SafeRunnable)SafeRun.safeRun(() -> {
            synchronized (this) {
                Consumer currentConsumer = (Consumer) ACTIVE_CONSUMER_UPDATER.get(this);
                if (currentConsumer != null && !this.havePendingRead) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}-{}] Retrying read operation", this.name, c);
                    }
                    if (currentConsumer != c) {
                        this.notifyActiveConsumerChanged(currentConsumer);
                    }
                    this.readMoreEntries(currentConsumer);
                } else {
                    log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", this.name, c, currentConsumer, this.havePendingRead);
                }
            }
        })), waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /** No-op in this dispatcher. */
    @Override
    public void addUnAckedMessages(int unAckMessages) {
    }

    /** Returns the redelivery tracker assigned at construction. */
    @Override
    public RedeliveryTracker getRedeliveryTracker() {
        return this.redeliveryTracker;
    }

    /** Returns the subscription-level dispatch rate limiter, if one has been created. */
    @Override
    public Optional<DispatchRateLimiter> getRateLimiter() {
        return this.dispatchRateLimiter;
    }

    /**
     * Applies a dispatch rate to the subscription limiter, creating the limiter
     * first when none exists and a rate is given.
     *
     * @param dispatchRate rate to apply; when {@code null} an existing limiter's
     *                     no-argument {@code updateDispatchRate()} is invoked instead
     */
    @Override
    public void updateRateLimiter(DispatchRate dispatchRate) {
        if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.SUBSCRIPTION));
        }
        this.dispatchRateLimiter.ifPresent(limiter -> {
            if (dispatchRate != null) {
                limiter.updateDispatchRate(dispatchRate);
            } else {
                limiter.updateDispatchRate();
            }
        });
    }

    /**
     * Creates the subscription dispatch rate limiter if none exists yet and
     * {@code DispatchRateLimiter.isDispatchRateNeeded} reports that one is required.
     *
     * @param policies namespace policies to consult, if available
     */
    @Override
    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
        if (this.dispatchRateLimiter.isPresent()) {
            return;
        }
        if (DispatchRateLimiter.isDispatchRateNeeded(this.topic.getBrokerService(), policies, this.topic.getName(), DispatchRateLimiter.Type.SUBSCRIPTION)) {
            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.SUBSCRIPTION));
        }
    }

    /**
     * Marks the dispatcher closed, closes the subscription rate limiter if
     * present, and disconnects all consumers.
     *
     * @return future returned by {@code disconnectAllConsumers()}
     */
    @Override
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);
        this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
        return this.disconnectAllConsumers();
    }

    /**
     * Issues a read if the dispatcher looks stuck: there is an active consumer
     * with permits, the cursor's read position has not changed since the last
     * check, no read is pending and the backlog is non-empty.
     *
     * @return {@code true} if a read was issued to unblock the dispatcher
     */
    @Override
    public boolean checkAndUnblockIfStuck() {
        Consumer consumer = (Consumer) ACTIVE_CONSUMER_UPDATER.get(this);
        if (consumer == null || this.cursor.checkAndUpdateReadPositionChanged()) {
            return false;
        }
        int totalAvailablePermits = consumer.getAvailablePermits();
        boolean stuck = totalAvailablePermits > 0
                && !this.havePendingRead
                && this.cursor.getNumberOfEntriesInBacklog(false) > 0L;
        if (!stuck) {
            return false;
        }
        log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", this.topic.getName(), this.name);
        this.readMoreEntries(consumer);
        return true;
    }
}

