/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDispatcherMultipleConsumers {
    /** Copied from the subscription's {@code KeySharedMeta} in the constructor. */
    private final boolean allowOutOfOrderDelivery;
    /** Maps a sticky-key hash to the consumer that should receive it; implementation is chosen in the constructor. */
    private final StickyKeyConsumerSelector selector;
    /**
     * Set when a dispatch pass sent nothing and at least one consumer had no capacity.
     * While set, the next {@code getMessagesToReplayNow} call returns an empty set and clears the flag.
     */
    private boolean isDispatcherStuckOnReplays = false;
    /** Key-shared mode taken from the subscription's {@code KeySharedMeta}. */
    private final KeySharedMode keySharedMode;
    /**
     * Consumers that joined while earlier entries were still unacknowledged, mapped to the cursor read
     * position at the time they joined (insertion-ordered). {@code null} when out-of-order delivery is allowed.
     */
    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
    /** Consumers recorded as having no capacity by the previous dispatch pass that sent nothing. */
    private final Set<Consumer> stuckConsumers;
    /** Consumers found to have no capacity during the current dispatch pass; cleared at the start of each pass. */
    private final Set<Consumer> nextStuckConsumers;
    /** Per-thread scratch map used to group entries by target consumer; cleared before each use. */
    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>(){

        @Override
        protected Map<Consumer, List<Entry>> initialValue() throws Exception {
            return new HashMap<Consumer, List<Entry>>();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
        super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
        this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
        this.recentlyJoinedConsumers = this.allowOutOfOrderDelivery ? null : new LinkedHashMap();
        this.stuckConsumers = new HashSet<Consumer>();
        this.nextStuckConsumers = new HashSet<Consumer>();
        this.keySharedMode = ksm.getKeySharedMode();
        switch (this.keySharedMode) {
            case AUTO_SPLIT: {
                if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                    break;
                }
                this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                break;
            }
            case STICKY: {
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid key-shared mode: " + (Object)((Object)this.keySharedMode));
            }
        }
    }

    /**
     * Registers a consumer with the superclass and with the sticky-key selector.
     *
     * <p>If the selector rejects the consumer, it is removed again from {@code consumerSet} and
     * {@code consumerList} and the exception is rethrown. When ordering must be preserved and this is
     * not the only consumer and more than one entry is outstanding since the first unacked message,
     * the consumer is recorded in {@code recentlyJoinedConsumers} with the current read position.
     *
     * @param consumer consumer to add
     * @throws BrokerServiceException if the superclass or the selector rejects the consumer
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        try {
            this.selector.addConsumer(consumer);
        } catch (BrokerServiceException selectorFailure) {
            // Undo the superclass registration so both views stay consistent.
            this.consumerSet.removeAll(consumer);
            this.consumerList.remove(consumer);
            throw selectorFailure;
        }

        PositionImpl joinPosition = (PositionImpl) this.cursor.getReadPosition();
        consumer.setReadPositionWhenJoining(joinPosition);

        if (this.allowOutOfOrderDelivery || this.recentlyJoinedConsumers == null) {
            return;
        }
        if (this.consumerList.size() <= 1) {
            return;
        }
        if (this.cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1L) {
            this.recentlyJoinedConsumers.put(consumer, joinPosition);
        }
    }

    /**
     * Removes a consumer from the selector first and then from the superclass.
     *
     * <p>When recently-joined tracking is active, the consumer's entry is dropped, the whole map is
     * cleared if exactly one consumer remains, and another read is triggered if any recently-joined
     * consumer was released or redelivery messages are pending.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException propagated from the superclass
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        this.selector.removeConsumer(consumer);
        super.removeConsumer(consumer);
        if (this.recentlyJoinedConsumers == null) {
            return;
        }
        this.recentlyJoinedConsumers.remove(consumer);
        if (this.consumerList.size() == 1) {
            this.recentlyJoinedConsumers.clear();
        }
        boolean releasedSomeConsumer = this.removeConsumersFromRecentJoinedConsumers();
        if (releasedSomeConsumer || !this.redeliveryMessages.isEmpty()) {
            this.readMoreEntries();
        }
    }

    /**
     * Dispatches a batch of entries to consumers chosen by sticky-key hash.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Empty batch: trigger another read. No consumers: release the entries and rewind the cursor.</li>
     *   <li>When ordering must be preserved and a replay position smaller than {@code minReplayedPosition}
     *       is pending, the batch is discarded (Normal reads are queued for replay) and a new read is
     *       triggered.</li>
     *   <li>Entries are grouped per selected consumer; entries with no consumer are queued for replay.</li>
     *   <li>For each consumer the sendable count is limited by permits, by the unacked-message budget and
     *       by {@link #getRestrictedMaxEntriesForConsumer}; the remainder is queued for replay.</li>
     *   <li>Dispatch rate limiters are charged, and the next read is scheduled.</li>
     * </ol>
     *
     * @param readType whether the entries come from a normal read or a replay read
     * @param entries  entries to dispatch; every entry is either handed to a consumer or released here
     */
    @Override
    protected void sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> entries) {
        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;
        if (entries.isEmpty()) {
            this.readMoreEntries();
            return;
        }
        if (this.consumerSet.isEmpty()) {
            entries.forEach(Entry::release);
            this.cursor.rewind();
            return;
        }
        if (!this.allowOutOfOrderDelivery) {
            Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
                PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
                if (relayPosition.compareTo(this.minReplayedPosition) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this read and retry with readMoreEntries.", new Object[]{this.name, relayPosition, this.minReplayedPosition, readType});
                    }
                    if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
                        // Normal-read entries are not tracked for replay yet; record them before releasing.
                        entries.forEach(entry -> {
                            long stickyKeyHash = this.getStickyKeyHash(entry);
                            this.addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                            entry.release();
                        });
                    } else if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                        entries.forEach(Entry::release);
                    }
                    this.readMoreEntries();
                    return;
                }
            }
        }

        this.nextStuckConsumers.clear();
        Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
        groupedEntries.clear();
        Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();
        for (Entry entry : entries) {
            int stickyKeyHash = this.getStickyKeyHash(entry);
            Consumer c = this.selector.select(stickyKeyHash);
            if (c != null) {
                groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
                consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
                continue;
            }
            // No consumer owns this hash right now: keep the entry for a later replay.
            this.addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
            entry.release();
        }

        // Counts consumer groups still outstanding; reaching 0 in a send listener triggers the next read.
        AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
        int currentThreadKeyNumber = groupedEntries.size();
        if (currentThreadKeyNumber == 0) {
            currentThreadKeyNumber = -1;
        }
        for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
            Consumer consumer = current.getKey();
            List<Entry> entriesWithSameKey = current.getValue();
            int entriesWithSameKeyCount = entriesWithSameKey.size();
            int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
            if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
                // Clamp at 0: if the unacked count exceeds the maximum the difference is negative, and a
                // negative message count would later be used as a list index in the replay loop below.
                int remainingUnackedBudget = Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
                availablePermits = Math.min(availablePermits, remainingUnackedBudget);
            }
            int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
            int messagesForC = this.getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType, consumerStickyKeyHashesMap.get(consumer));
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}", new Object[]{this.name, consumer == null ? "null" : consumer.consumerName(), messagesForC, readType});
            }
            if (messagesForC < entriesWithSameKeyCount) {
                // Entries beyond what this consumer may receive now are queued for replay and nulled
                // out of the list that is handed to the consumer.
                for (int i = messagesForC; i < entriesWithSameKeyCount; ++i) {
                    Entry skipped = entriesWithSameKey.get(i);
                    long stickyKeyHash = this.getStickyKeyHash(skipped);
                    this.addMessageToReplay(skipped.getLedgerId(), skipped.getEntryId(), stickyKeyHash);
                    skipped.release();
                    entriesWithSameKey.set(i, null);
                }
            }
            if (messagesForC > 0) {
                if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                    // These replayed entries are about to be delivered; drop them from the redelivery set.
                    for (int i = 0; i < messagesForC; ++i) {
                        Entry delivered = entriesWithSameKey.get(i);
                        this.redeliveryMessages.remove(delivered.getLedgerId(), delivered.getEntryId());
                    }
                }
                SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
                this.filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, readType == PersistentDispatcherMultipleConsumers.ReadType.Replay);
                consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), this.getRedeliveryTracker()).addListener(future -> {
                    if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
                        this.readMoreEntries();
                    }
                });
                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
                totalMessagesSent += (long) sendMessageInfo.getTotalMessages();
                totalBytesSent += sendMessageInfo.getTotalBytes();
                continue;
            }
            // Nothing sent to this consumer: account for its group on the dispatching thread.
            currentThreadKeyNumber = keyNumbers.decrementAndGet();
        }

        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                ((DispatchRateLimiter) this.dispatchRateLimiter.get()).tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
        }

        this.stuckConsumers.clear();
        if (totalMessagesSent == 0L && (this.recentlyJoinedConsumers == null || this.recentlyJoinedConsumers.isEmpty())) {
            if (!this.nextStuckConsumers.isEmpty()) {
                // Remember who had no capacity, and make the next replay lookup return nothing once.
                this.isDispatcherStuckOnReplays = true;
                this.stuckConsumers.addAll(this.nextStuckConsumers);
            }
            this.readMoreEntries();
        } else if (currentThreadKeyNumber == 0) {
            // The last group was finished on this thread without a send; retry after a short delay.
            this.topic.getBrokerService().executor().schedule(() -> {
                synchronized (this) {
                    this.readMoreEntries();
                }
            }, 100L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Computes how many of the leading {@code entries} may be sent to {@code consumer} right now.
     *
     * @param consumer        target consumer
     * @param entries         entries grouped for this consumer, in the order they will be sent
     * @param maxMessages     upper bound derived by the caller from permits and the unacked budget
     * @param readType        whether the entries come from a normal read or a replay read
     * @param stickyKeyHashes sticky-key hashes present in {@code entries}; may be {@code null}
     * @return a count in {@code [0, maxMessages]}; never negative
     */
    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages, PersistentDispatcherMultipleConsumers.ReadType readType, Set<Integer> stickyKeyHashes) {
        // A non-positive bound means the consumer has no capacity. Negative values are treated like 0
        // so the caller never receives a negative count, which it would use as a list index.
        if (maxMessages <= 0) {
            this.nextStuckConsumers.add(consumer);
            return 0;
        }
        // On a normal read, hold back keys that still have messages waiting for redelivery.
        if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal && stickyKeyHashes != null && this.redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
            return 0;
        }
        if (this.recentlyJoinedConsumers == null) {
            return maxMessages;
        }
        this.removeConsumersFromRecentJoinedConsumers();
        PositionImpl maxReadPosition = this.recentlyJoinedConsumers.get(consumer);
        if (maxReadPosition == null) {
            // Not a recently joined consumer: only the stuck-consumer record can block it.
            if (this.stuckConsumers.contains(consumer)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", (Object)this.name, (Object)consumer);
                }
                return 0;
            }
            return maxMessages;
        }
        if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
            // For replays, the first (oldest-inserted) recently joined position also caps this consumer.
            PositionImpl minReadPositionForRecentJoinedConsumer = this.recentlyJoinedConsumers.values().iterator().next();
            if (minReadPositionForRecentJoinedConsumer != null && minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
                maxReadPosition = minReadPositionForRecentJoinedConsumer;
            }
        }
        // Only entries strictly before the cap may be sent; stop at the first one at or past it.
        for (int i = 0; i < maxMessages; ++i) {
            if (((PositionImpl)entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                return i;
            }
        }
        return maxMessages;
    }

    /**
     * Submits a task to the topic's ordered executor that, while holding this dispatcher's monitor,
     * re-evaluates the recently joined consumers and triggers a read if any of them was released.
     */
    @Override
    public void markDeletePositionMoveForward() {
        this.topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
            synchronized (this) {
                if (this.recentlyJoinedConsumers == null || this.recentlyJoinedConsumers.isEmpty()) {
                    return;
                }
                if (this.removeConsumersFromRecentJoinedConsumers()) {
                    this.readMoreEntries();
                }
            }
        });
    }

    /**
     * Drops, from the front of {@code recentlyJoinedConsumers}, every consumer whose recorded position
     * is at or before the next valid position after the cursor's mark-delete position. Stops at the
     * first consumer that does not qualify.
     *
     * @return {@code true} if at least one consumer was removed
     */
    private boolean removeConsumersFromRecentJoinedConsumers() {
        Iterator<Map.Entry<Consumer, PositionImpl>> remaining = this.recentlyJoinedConsumers.entrySet().iterator();
        PositionImpl markDeleted = (PositionImpl) this.cursor.getMarkDeletedPosition();
        if (markDeleted == null) {
            return false;
        }
        PositionImpl firstUnackedCandidate = ((ManagedLedgerImpl) this.cursor.getManagedLedger()).getNextValidPosition(markDeleted);
        boolean removedAny = false;
        while (remaining.hasNext()) {
            PositionImpl joinedAt = remaining.next().getValue();
            if (joinedAt.compareTo(firstUnackedCandidate) > 0) {
                break;
            }
            remaining.remove();
            removedAny = true;
        }
        return removedAny;
    }

    /**
     * Returns the positions to replay now. If the dispatcher was flagged as stuck on replays, the flag
     * is cleared and an empty set is returned for this one call; otherwise the superclass decides.
     *
     * @param maxMessagesToRead maximum number of positions requested
     * @return positions to replay, possibly empty
     */
    @Override
    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
        if (!this.isDispatcherStuckOnReplays) {
            return super.getMessagesToReplayNow(maxMessagesToRead);
        }
        this.isDispatcherStuckOnReplays = false;
        return Collections.emptySet();
    }

    /**
     * @return the subscription type handled by this dispatcher, always {@code Key_Shared}
     */
    @Override
    public CommandSubscribe.SubType getType() {
        final CommandSubscribe.SubType keyShared = CommandSubscribe.SubType.Key_Shared;
        return keyShared;
    }

    /**
     * Asks the cursor to replay the given positions, passing this dispatcher as the callback,
     * {@code ReadType.Replay} as the context object and {@code true} as the final flag.
     *
     * @param positions positions to replay
     * @return whatever the cursor's {@code asyncReplayEntries} returns
     */
    @Override
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        Object replayContext = PersistentDispatcherMultipleConsumers.ReadType.Replay;
        return this.cursor.asyncReplayEntries(positions, this, replayContext, true);
    }

    /**
     * @return the key-shared mode this dispatcher was constructed with
     */
    public KeySharedMode getKeySharedMode() {
        return keySharedMode;
    }

    /**
     * Exposes the live (not copied) recently-joined map.
     *
     * @return the internal map, or {@code null} when out-of-order delivery is allowed
     */
    public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
        return recentlyJoinedConsumers;
    }

    /**
     * @return the hash ranges per consumer, as reported by the sticky-key selector
     */
    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
        return selector.getConsumerKeyHashRanges();
    }

    /**
     * @return {@code true} if this dispatcher was configured to allow out-of-order delivery
     */
    public boolean isAllowOutOfOrderDelivery() {
        return allowOutOfOrderDelivery;
    }
}

