/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStreamingDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStreamingDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionSnapshotCache;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentSubscription
implements Subscription {
    /** Topic that owns this subscription. */
    protected final PersistentTopic topic;
    /** Managed-ledger cursor that this subscription reads and acknowledges through. */
    protected final ManagedCursor cursor;
    /** Dispatcher currently serving consumers; assigned in addConsumer and null-checked by most users. */
    protected volatile Dispatcher dispatcher;
    /** Name of the owning topic, taken from {@code topic.getName()} in the constructor. */
    protected final String topicName;
    /** Subscription name. */
    protected final String subName;
    /** Pre-built string form ("topic" + "name") returned by {@link #toString()}. */
    protected final String fullName;
    // NOTE(review): FALSE/TRUE look like the symbolic values for isFenced; the call sites in this
    // decompiled source use the literals 0 and 1 directly.
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    /** Atomic accessor for {@link #isFenced}; used to get, set and compare-and-set the fence flag. */
    private static final AtomicIntegerFieldUpdater<PersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentSubscription.class, "isFenced");
    /** Fence flag: 1 while a cursor reset is in progress (addConsumer rejects consumers then), 0 otherwise. */
    private volatile int isFenced = 0;
    /** Expiry monitor created in the constructor for this topic/subscription/cursor. */
    private PersistentMessageExpiryMonitor expiryMonitor;
    // Not written in the visible part of the file; presumably the time of the last expiry run.
    private long lastExpireTimestamp = 0L;
    /** Wall-clock time (ms) of the most recent consumerFlow call. */
    private long lastConsumedFlowTimestamp = 0L;
    /** Wall-clock time (ms) recorded by updateLastMarkDeleteAdvancedTimestamp; never moves backwards. */
    private long lastMarkDeleteAdvancedTimestamp = 0L;
    // Not referenced in the visible part of the file; presumably a backlog threshold for expiry checks.
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    /** Cursor property key whose presence marks a cursor as belonging to a replicated subscription. */
    private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
    // Base cursor properties for replicated subscriptions. Created empty here; presumably populated in a
    // static initializer that is not visible in this part of the file.
    private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<String, Long>();
    /** Base cursor properties for non-replicated subscriptions: always empty. */
    private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
    /** Snapshot cache; non-null exactly when the subscription is replicated (see isReplicated/setReplicated). */
    private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
    /** Mark-delete position recorded the last time a transaction marker was found right after it. */
    private volatile Position lastMarkDeleteForTransactionMarker;
    /** True while an asynchronous transaction-marker read started by acknowledgeMessage is outstanding. */
    private volatile boolean isDeleteTransactionMarkerInProcess = false;
    /** Pending-ack handle: a real implementation or the disabled one, chosen in the constructor. */
    private final PendingAckHandle pendingAckHandle;
    /** Sum of bytesOutCounter of consumers that have been removed from this subscription. */
    private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
    /** Sum of msgOutCounter of consumers that have been removed from this subscription. */
    private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
    /**
     * Callback passed to {@code cursor.asyncMarkDelete} for cumulative acknowledgements. The context
     * object is the mark-delete position that was captured before the acknowledgement was issued.
     */
    private final AsyncCallbacks.MarkDeleteCallback markDeleteCallback = new AsyncCallbacks.MarkDeleteCallback() {

        /** Logs the move (debug only) and lets the dispatcher know if the mark-delete position advanced. */
        @Override
        public void markDeleteComplete(Object ctx) {
            final PositionImpl previousPosition = (PositionImpl) ctx;
            final PositionImpl currentPosition = (PositionImpl) cursor.getMarkDeletedPosition();
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Mark deleted messages to position {} from position {}",
                        topicName, subName, currentPosition, previousPosition);
            }
            notifyTheMarkDeletePositionMoveForwardIfNeeded(previousPosition);
        }

        /** A failed mark-delete is only reported at debug level. */
        @Override
        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("[{}][{}] Failed to mark delete for position {}: {}",
                    topicName, subName, ctx, exception);
        }
    };
    /**
     * Callback passed to {@code cursor.asyncDelete} for individual acknowledgements. The context object
     * is the mark-delete position that was captured before the acknowledgement was issued.
     */
    private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() {

        /** Logs the deletion (debug only) and lets the dispatcher know if the mark-delete position advanced. */
        @Override
        public void deleteComplete(Object position) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
            }
            final PositionImpl previousMarkDelete = (PositionImpl) position;
            notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDelete);
        }

        /** A failed delete is reported as a warning; nothing else is done. */
        @Override
        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
            log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
        }
    };
    // Class logger. It has no initializer on this line, so it must be assigned elsewhere
    // (presumably a static initializer block that is not visible in this part of the file).
    private static final Logger log;

    /**
     * Selects the shared base cursor-property map for a subscription.
     *
     * @param isReplicated whether the subscription is replicated
     * @return the replicated-subscription property map when {@code isReplicated} is true, otherwise the
     *         (empty) non-replicated map; the shared instance itself is returned, not a copy
     */
    static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
        if (isReplicated) {
            return REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
        }
        return NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
    }

    /**
     * Tells whether a cursor carries the replicated-subscription marker property.
     *
     * @param cursor the cursor whose properties are inspected
     * @return true when the cursor's properties contain the {@code pulsar.replicated.subscription} key
     */
    static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
        final boolean hasReplicationMarker = cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
        return hasReplicationMarker;
    }

    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, boolean replicated) {
        this.topic = topic;
        this.cursor = cursor;
        this.topicName = topic.getName();
        this.subName = subscriptionName;
        this.fullName = MoreObjects.toStringHelper((Object)this).add("topic", (Object)this.topicName).add("name", (Object)this.subName).toString();
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, subscriptionName, cursor, this);
        this.setReplicated(replicated);
        this.pendingAckHandle = topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !EventsTopicNames.checkTopicIsEventsNames((TopicName)TopicName.get((String)this.topicName)) && !this.topicName.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()) && !this.topicName.startsWith("__transaction_log_") && !this.topicName.endsWith("__transaction_pending_ack") ? new PendingAckHandleImpl(this) : new PendingAckHandleDisabled();
        IS_FENCED_UPDATER.set(this, 0);
    }

    /**
     * Records the current wall-clock time as the moment the mark-delete position last advanced.
     * The stored value never decreases: the larger of the previous value and "now" is kept.
     */
    public void updateLastMarkDeleteAdvancedTimestamp() {
        final long previous = lastMarkDeleteAdvancedTimestamp;
        final long now = System.currentTimeMillis();
        lastMarkDeleteAdvancedTimestamp = now > previous ? now : previous;
    }

    /**
     * @return the broker interceptor obtained from the owning topic's broker service
     */
    @Override
    public BrokerInterceptor interceptor() {
        final BrokerInterceptor brokerInterceptor = topic.getBrokerService().getInterceptor();
        return brokerInterceptor;
    }

    /**
     * @return the subscription name
     */
    @Override
    public String getName() {
        return subName;
    }

    /**
     * @return the topic that owns this subscription
     */
    @Override
    public Topic getTopic() {
        final Topic owner = topic;
        return owner;
    }

    /**
     * @return true when a replicated-subscription snapshot cache is currently installed
     */
    @Override
    public boolean isReplicated() {
        final ReplicatedSubscriptionSnapshotCache snapshotCache = replicatedSubscriptionSnapshotCache;
        return snapshotCache != null;
    }

    /**
     * Turns replication state on or off by installing or dropping the snapshot cache.
     *
     * @param replicated true to install a fresh snapshot cache sized from the broker configuration,
     *                   false to clear the cache reference
     */
    void setReplicated(boolean replicated) {
        if (!replicated) {
            replicatedSubscriptionSnapshotCache = null;
            return;
        }
        replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(
                subName,
                topic.getBrokerService().pulsar().getConfiguration()
                        .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
    }

    /**
     * Attaches a consumer to this subscription once the pending-ack handle future has completed.
     *
     * <p>While holding the subscription monitor this method refreshes the cursor's last-active time,
     * rejects the request if the subscription is fenced, creates a dispatcher matching the consumer's
     * subscription type when there is no dispatcher or the current one has no connected consumer
     * (closing any dispatcher it replaces), and finally registers the consumer with the dispatcher.
     *
     * @param consumer the consumer to add
     * @return a future that completes normally when the consumer was added, or exceptionally when the
     *         subscription is fenced, the subscription type is unsupported or differs from the active
     *         dispatcher's type, or the dispatcher's addConsumer throws a BrokerServiceException
     */
    @Override
    public CompletableFuture<Void> addConsumer(Consumer consumer) {
        return this.pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {
            PersistentSubscription persistentSubscription = this;
            synchronized (persistentSubscription) {
                this.cursor.updateLastActive();
                // A fenced subscription (flag == 1, set during cursor reset) accepts no consumers.
                if (IS_FENCED_UPDATER.get(this) == 1) {
                    log.warn("Attempting to add consumer {} on a fenced subscription", (Object)consumer);
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.SubscriptionFencedException("Subscription is fenced"));
                }
                // No dispatcher yet, or nobody connected to it: the dispatcher may be (re)created for the
                // incoming consumer's subscription type.
                if (this.dispatcher == null || !this.dispatcher.isConsumerConnected()) {
                    Dispatcher previousDispatcher = null;
                    boolean useStreamingDispatcher = this.topic.getBrokerService().getPulsar().getConfiguration().isStreamingDispatch();
                    // In every case an existing dispatcher of the same type is reused (plain break);
                    // otherwise the old one is remembered so it can be closed below.
                    switch (consumer.subType()) {
                        case Exclusive: {
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Exclusive) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Exclusive, 0, this.topic, this) : new PersistentDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Exclusive, 0, this.topic, this);
                            break;
                        }
                        case Shared: {
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Shared) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherMultipleConsumers(this.topic, this.cursor, this) : new PersistentDispatcherMultipleConsumers(this.topic, this.cursor, this);
                            break;
                        }
                        case Failover: {
                            int partitionIndex = TopicName.getPartitionIndex((String)this.topicName);
                            // Any negative partition index is normalised to -1 before it is handed on.
                            if (partitionIndex < 0) {
                                partitionIndex = -1;
                            }
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Failover) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Failover, partitionIndex, this.topic, this) : new PersistentDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Failover, partitionIndex, this.topic, this);
                            break;
                        }
                        case Key_Shared: {
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Key_Shared) break;
                            previousDispatcher = this.dispatcher;
                            KeySharedMeta ksm = consumer.getKeySharedMeta();
                            // Key_Shared does not consult useStreamingDispatcher: only one dispatcher kind is built here.
                            this.dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(this.topic, this.cursor, this, this.topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                            break;
                        }
                        default: {
                            return FutureUtil.failedFuture((Throwable)new BrokerServiceException.ServerMetadataException("Unsupported subscription type"));
                        }
                    }
                    // Closing the replaced dispatcher is fire-and-forget: its outcome is only logged.
                    if (previousDispatcher != null) {
                        ((CompletableFuture)previousDispatcher.close().thenRun(() -> log.info("[{}][{}] Successfully closed previous dispatcher", (Object)this.topicName, (Object)this.subName))).exceptionally(ex -> {
                            log.error("[{}][{}] Failed to close previous dispatcher", new Object[]{this.topicName, this.subName, ex});
                            return null;
                        });
                    }
                } else if (consumer.subType() != this.dispatcher.getType()) {
                    // Consumers are connected and the requested type differs from the active dispatcher's.
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.SubscriptionBusyException("Subscription is of different type"));
                }
                try {
                    this.dispatcher.addConsumer(consumer);
                    return CompletableFuture.completedFuture(null);
                }
                catch (BrokerServiceException brokerServiceException) {
                    // Surface the dispatcher's rejection through the returned future instead of throwing.
                    return FutureUtil.failedFuture((Throwable)brokerServiceException);
                }
            }
        });
    }

    /**
     * Detaches a consumer from this subscription.
     *
     * <p>The consumer is removed from the dispatcher (if any) and its byte/message counters are folded
     * into the "removed consumers" totals. When the dispatcher is left without consumers the cursor is
     * deactivated; for a non-durable cursor the subscription is additionally closed, its dispatcher is
     * closed once that completes, and a task is submitted to the broker executor that removes the
     * subscription from the topic and, unless {@code isResetCursor} is set, deletes the cursor.
     * Finally the topic usage count is decremented.
     *
     * @param consumer      the consumer being removed
     * @param isResetCursor when true the non-durable cursor is kept instead of being deleted
     * @throws BrokerServiceException declared by this method; presumably raised by the dispatcher's
     *                                removeConsumer call (confirm against Dispatcher)
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
        this.cursor.updateLastActive();
        if (this.dispatcher != null) {
            this.dispatcher.removeConsumer(consumer);
        }
        // Preserve the departing consumer's counters so subscription-level totals do not go backwards.
        ConsumerStatsImpl stats = consumer.getStats();
        this.bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
        this.msgOutFromRemovedConsumer.add(stats.msgOutCounter);
        if (this.dispatcher != null && this.dispatcher.getConsumers().isEmpty()) {
            this.deactivateCursor();
            if (!this.cursor.isDurable()) {
                // Non-durable subscription with no consumers left: close it, then close its dispatcher.
                this.close().thenRun(() -> {
                    synchronized (this) {
                        if (this.dispatcher != null) {
                            this.dispatcher.close()
                                    .thenRun(() -> log.info("[{}][{}] Successfully closed dispatcher for reader", this.topicName, this.subName))
                                    .exceptionally(ex -> {
                                        log.error("[{}][{}] Failed to close dispatcher for reader", this.topicName, this.subName, ex);
                                        return null;
                                    });
                        }
                    }
                }).exceptionally(exception -> {
                    log.error("[{}][{}] Failed to close subscription for reader", this.topicName, this.subName, exception);
                    return null;
                });
                // The blocking cursor deletion runs on the broker executor, not on the calling thread.
                this.topic.getBrokerService().pulsar().getExecutor().submit(() -> {
                    this.topic.removeSubscription(this.subName);
                    if (!isResetCursor) {
                        try {
                            this.topic.getManagedLedger().deleteCursor(this.cursor.getName());
                        } catch (InterruptedException e) {
                            // Restore the interrupt status so the executor thread can observe the interruption.
                            Thread.currentThread().interrupt();
                            log.warn("[{}] [{}] Failed to remove non durable cursor", this.topic.getName(), this.subName, e);
                        } catch (ManagedLedgerException e) {
                            log.warn("[{}] [{}] Failed to remove non durable cursor", this.topic.getName(), this.subName, e);
                        }
                    }
                });
            }
        }
        this.topic.decrementUsageCount();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", this.topic.getName(), this.subName, consumer.consumerName(), this.topic.currentUsageCount());
        }
    }

    /**
     * Marks the backing cursor as inactive by delegating to {@code cursor.setInactive()}.
     */
    public void deactivateCursor() {
        cursor.setInactive();
    }

    /**
     * Records the time of the flow request and forwards the permits to the dispatcher.
     *
     * @param consumer                   the consumer granting permits
     * @param additionalNumberOfMessages number of additional messages the consumer can accept
     */
    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        final long now = System.currentTimeMillis();
        lastConsumedFlowTimestamp = now;
        dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }

    /**
     * Acknowledges messages on this subscription's cursor.
     *
     * <p>A cumulative ack must carry exactly one position and is applied with an asynchronous
     * mark-delete; any other ack type deletes the given positions individually. Afterwards the method
     * updates replicated-subscription state if the mark-delete position changed, may start clean-up of a
     * transaction marker that directly follows the mark-delete position, and notifies consumers when a
     * terminated topic has been fully consumed.
     *
     * @param positions  positions to acknowledge (exactly one for a cumulative ack)
     * @param ackType    cumulative or individual acknowledgement
     * @param properties properties merged into the cursor properties for a cumulative ack
     */
    @Override
    public void acknowledgeMessage(List<Position> positions, CommandAck.AckType ackType, Map<String, Long> properties) {
        // Captured up front: passed as callback context and used below to detect whether the position moved.
        Position previousMarkDeletePosition = this.cursor.getMarkDeletedPosition();
        if (ackType == CommandAck.AckType.Cumulative) {
            // Anything other than exactly one position (including an empty list) is rejected with a warning.
            if (positions.size() != 1) {
                log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids.", (Object)this.topicName, (Object)this.subName);
                return;
            }
            Position position2 = positions.get(0);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Cumulative ack on {}", new Object[]{this.topicName, this.subName, position2});
            }
            this.cursor.asyncMarkDelete(position2, this.mergeCursorProperties(properties), this.markDeleteCallback, (Object)previousMarkDeletePosition);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Individual acks on {}", new Object[]{this.topicName, this.subName, positions});
            }
            this.cursor.asyncDelete(positions, this.deleteCallback, (Object)previousMarkDeletePosition);
            // With transactions enabled, positions the cursor already reports as deleted are also
            // cleared from the pending-ack handle.
            if (this.topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
                positions.forEach(position -> {
                    if (((ManagedCursorImpl)this.cursor).isMessageDeleted(position)) {
                        this.pendingAckHandle.clearIndividualPosition((Position)position);
                    }
                });
            }
            if (this.dispatcher != null) {
                this.dispatcher.getRedeliveryTracker().removeBatch(positions);
            }
        }
        // Mark-delete position already differs from the captured one: record the time and, for a
        // replicated subscription, publish any snapshot the cache yields for the new position.
        if (!this.cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
            ReplicatedSubscriptionsSnapshot snapshot;
            this.updateLastMarkDeleteAdvancedTimestamp();
            ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
            if (snapshotCache != null && (snapshot = snapshotCache.advancedMarkDeletePosition((PositionImpl)this.cursor.getMarkDeletedPosition())) != null) {
                this.topic.getReplicatedSubscriptionController().ifPresent(c -> c.localSubscriptionUpdated(this.subName, snapshot));
            }
        }
        if (this.topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && this.pendingAckHandle.isTransactionAckPresent()) {
            Position currentMarkDeletePosition = this.cursor.getMarkDeletedPosition();
            // Start a marker clean-up only if none is running and the mark-delete position is beyond
            // the one recorded at the last marker clean-up.
            if (!(this.lastMarkDeleteForTransactionMarker != null && ((PositionImpl)this.lastMarkDeleteForTransactionMarker).compareTo((PositionImpl)currentMarkDeletePosition) >= 0 || this.isDeleteTransactionMarkerInProcess)) {
                this.isDeleteTransactionMarkerInProcess = true;
                this.deleteTransactionMarker((PositionImpl)currentMarkDeletePosition, ackType, properties);
            }
        }
        // Terminated ledger with an empty backlog: tell every consumer the end of the topic was reached.
        if (this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog(false) == 0L && this.dispatcher != null) {
            this.dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
        }
    }

    /**
     * Acknowledges the entry that directly follows {@code position} when that entry is a transaction
     * commit or abort marker.
     *
     * <p>The caller sets {@code isDeleteTransactionMarkerInProcess} before invoking this method; every
     * path here clears it again. On the read-success path the flag is cleared before the entry is
     * parsed, so that a parse failure cannot leave it stuck at {@code true} (which would stop
     * {@code acknowledgeMessage} from ever starting another marker clean-up). It is also cleared before
     * the nested {@code acknowledgeMessage} call so that call can chain on to the next marker.
     *
     * @param position   current mark-delete position; {@code null} means there is nothing to do
     * @param ackType    ack type to use when acknowledging the marker
     * @param properties properties forwarded to the nested acknowledgement
     */
    private void deleteTransactionMarker(final PositionImpl position, final CommandAck.AckType ackType, final Map<String, Long> properties) {
        if (position == null) {
            this.isDeleteTransactionMarkerInProcess = false;
            return;
        }
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) this.cursor.getManagedLedger();
        final PositionImpl nextPosition = managedLedger.getNextValidPosition(position);
        // Nothing to read when there is no next entry or it lies beyond the last confirmed entry.
        if (nextPosition == null || nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) > 0) {
            this.isDeleteTransactionMarkerInProcess = false;
            return;
        }
        managedLedger.asyncReadEntry(nextPosition, new AsyncCallbacks.ReadEntryCallback() {

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                try {
                    // Clear the flag first: parsing may throw, and the flag must not stay set forever.
                    PersistentSubscription.this.isDeleteTransactionMarkerInProcess = false;
                    MessageMetadata messageMetadata = Commands.parseMessageMetadata((ByteBuf) entry.getDataBuffer());
                    if (Markers.isTxnCommitMarker(messageMetadata) || Markers.isTxnAbortMarker(messageMetadata)) {
                        PersistentSubscription.this.lastMarkDeleteForTransactionMarker = position;
                        PersistentSubscription.this.acknowledgeMessage(Collections.singletonList(nextPosition), ackType, properties);
                    }
                } finally {
                    // The entry is always released, whether or not it was a marker.
                    entry.release();
                }
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                PersistentSubscription.this.isDeleteTransactionMarkerInProcess = false;
                log.error("Fail to read transaction marker! Position : {}", position, exception);
            }
        }, null);
    }

    /**
     * Individually acknowledges positions within a transaction by delegating to the pending-ack handle.
     *
     * @param txnId     transaction the acknowledgement belongs to
     * @param positions position / integer pairs to acknowledge, passed through unchanged
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionIndividualAcknowledge(TxnID txnId, List<MutablePair<PositionImpl, Integer>> positions) {
        final CompletableFuture<Void> ackResult = pendingAckHandle.individualAcknowledgeMessage(txnId, positions);
        return ackResult;
    }

    /**
     * Cumulatively acknowledges within a transaction by delegating to the pending-ack handle.
     *
     * @param txnId     transaction the acknowledgement belongs to
     * @param positions positions to acknowledge, passed through unchanged
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, List<PositionImpl> positions) {
        final CompletableFuture<Void> ackResult = pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
        return ackResult;
    }

    /**
     * Informs the dispatcher when the cursor's mark-delete position is now strictly ahead of
     * {@code oldPosition}. Does nothing when there is no dispatcher.
     *
     * @param oldPosition mark-delete position observed before the acknowledgement
     */
    private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) {
        final PositionImpl before = (PositionImpl) oldPosition;
        final PositionImpl after = (PositionImpl) cursor.getMarkDeletedPosition();
        if (dispatcher == null) {
            return;
        }
        if (after.compareTo(before) > 0) {
            dispatcher.markDeletePositionMoveForward();
        }
    }

    /**
     * @return the pre-built description of this subscription (topic and name) computed in the constructor
     */
    @Override
    public String toString() {
        return fullName;
    }

    /**
     * @return the name of the topic that owns this subscription
     */
    @Override
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the subscription type reported by the current dispatcher, or {@code null} when no
     *         dispatcher has been created
     */
    @Override
    public CommandSubscribe.SubType getType() {
        if (this.dispatcher == null) {
            return null;
        }
        return this.dispatcher.getType();
    }

    /**
     * Returns the subscription type as display text.
     *
     * @return {@code "None"} when there is no type (no dispatcher), the type's name for Exclusive,
     *         Failover, Shared and Key_Shared, and {@code "Null"} for any other value
     */
    @Override
    public String getTypeString() {
        final CommandSubscribe.SubType subType = getType();
        if (subType == null) {
            return "None";
        }
        if (subType == CommandSubscribe.SubType.Exclusive) {
            return "Exclusive";
        }
        if (subType == CommandSubscribe.SubType.Failover) {
            return "Failover";
        }
        if (subType == CommandSubscribe.SubType.Shared) {
            return "Shared";
        }
        if (subType == CommandSubscribe.SubType.Key_Shared) {
            return "Key_Shared";
        }
        return "Null";
    }

    /**
     * Asynchronously clears the whole backlog of this subscription.
     *
     * @return a future completed with {@code null} once the cursor has cleared its backlog (delayed
     *         messages held by the dispatcher are cleared first), or completed exceptionally with the
     *         managed-ledger exception on failure
     */
    @Override
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> clearResult = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Backlog size before clearing: {}",
                    topicName, subName, cursor.getNumberOfEntriesInBacklog(false));
        }
        final AsyncCallbacks.ClearBacklogCallback callback = new AsyncCallbacks.ClearBacklogCallback() {

            @Override
            public void clearBacklogComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Backlog size after clearing: {}",
                            topicName, subName, cursor.getNumberOfEntriesInBacklog(false));
                }
                if (dispatcher != null) {
                    dispatcher.clearDelayedMessages();
                }
                clearResult.complete(null);
            }

            @Override
            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception);
                clearResult.completeExceptionally(exception);
            }
        };
        cursor.asyncClearBacklog(callback, null);
        return clearResult;
    }

    /**
     * Asynchronously skips a number of entries on the cursor, excluding individually deleted entries
     * from the count.
     *
     * @param numMessagesToSkip number of entries to skip
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         managed-ledger exception on failure
     */
    @Override
    public CompletableFuture<Void> skipMessages(final int numMessagesToSkip) {
        final CompletableFuture<Void> skipResult = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Skipping {} messages, current backlog {}",
                    topicName, subName, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
        }
        final AsyncCallbacks.SkipEntriesCallback callback = new AsyncCallbacks.SkipEntriesCallback() {

            @Override
            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Skipped {} messages, new backlog {}",
                            topicName, subName, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
                }
                skipResult.complete(null);
            }

            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to skip {} messages", topicName, subName, numMessagesToSkip, exception);
                skipResult.completeExceptionally(exception);
            }
        };
        cursor.asyncSkipEntries(numMessagesToSkip, ManagedCursor.IndividualDeletedEntries.Exclude, callback, null);
        return skipResult;
    }

    /**
     * Resets the subscription to the position found for a timestamp.
     *
     * <p>A message finder searches for {@code timestamp}; when it yields a position, the cursor is reset
     * to the position after it. When it yields none, the cursor's first position is used instead, and
     * the reset fails if that is missing too.
     *
     * @param timestamp timestamp to search for
     * @return a future completed when the reset finishes; it fails with a subscription-busy exception
     *         on a concurrent find, an invalid-cursor-position exception when no position can be
     *         determined, or a generic broker service exception for other find failures
     */
    @Override
    public CompletableFuture<Void> resetCursor(final long timestamp) {
        final CompletableFuture<Void> resetResult = new CompletableFuture<>();
        final PersistentMessageFinder finder = new PersistentMessageFinder(topicName, cursor);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp);
        }
        finder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {

            @Override
            public void findEntryComplete(Position position, Object ctx) {
                if (position != null) {
                    // A match was found: continue from the entry right after it.
                    PersistentSubscription.this.resetCursor(position.getNext(), resetResult);
                    return;
                }
                final Position firstPosition = cursor.getFirstPosition();
                if (firstPosition == null) {
                    log.warn("[{}][{}] Unable to find position for timestamp {}. Unable to reset cursor to first position",
                            topicName, subName, timestamp);
                    resetResult.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition("Unable to find position for specified timestamp"));
                    return;
                }
                log.info("[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger",
                        topicName, subName, timestamp, firstPosition);
                PersistentSubscription.this.resetCursor(firstPosition, resetResult);
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
                final boolean concurrentFind = exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException;
                if (!concurrentFind) {
                    resetResult.completeExceptionally(new BrokerServiceException(exception));
                    return;
                }
                resetResult.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(exception.getMessage()));
            }
        });
        return resetResult;
    }

    /**
     * Resets the subscription to an explicit position.
     *
     * @param position position to reset the cursor to
     * @return a future completed when the reset has finished or failed
     */
    @Override
    public CompletableFuture<Void> resetCursor(Position position) {
        final CompletableFuture<Void> resetResult = new CompletableFuture<>();
        resetCursor(position, resetResult);
        return resetResult;
    }

    /**
     * Fences the subscription, disconnects its consumers and then resets the cursor to
     * {@code finalPosition}, reporting the outcome through {@code future}.
     *
     * <p>Only one reset can run at a time: the fence flag is moved 0 -> 1 with a compare-and-set and
     * the call fails with a subscription-busy exception if the flag was already set. The flag is put
     * back to 0 when disconnecting fails, when the cursor reset completes or fails, and when starting
     * the cursor reset throws.
     *
     * @param finalPosition position to reset the cursor to
     * @param future        future completed with {@code null} on success or exceptionally on failure
     */
    private void resetCursor(final Position finalPosition, final CompletableFuture<Void> future) {
        // NOTE(review): the CompletableFuture<Object> type looks like a decompiler artifact — confirm
        // against the declared return type of Dispatcher.disconnectActiveConsumers.
        CompletableFuture<Object> disconnectFuture;
        if (!IS_FENCED_UPDATER.compareAndSet(this, 0, 1)) {
            future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to fence subscription"));
            return;
        }
        PersistentSubscription persistentSubscription = this;
        // Disconnection is started under the subscription monitor, the same one addConsumer/removeConsumer use.
        synchronized (persistentSubscription) {
            disconnectFuture = this.dispatcher != null && this.dispatcher.isConsumerConnected() ? this.dispatcher.disconnectActiveConsumers(true) : CompletableFuture.completedFuture(null);
        }
        disconnectFuture.whenComplete((aVoid, throwable) -> {
            // Runs on both success and failure of the disconnect, before the outcome is examined.
            if (this.dispatcher != null) {
                this.dispatcher.resetCloseFuture();
            }
            if (throwable != null) {
                log.error("[{}][{}] Failed to disconnect consumer from subscription", new Object[]{this.topicName, this.subName, throwable});
                IS_FENCED_UPDATER.set(this, 0);
                future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                return;
            }
            log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", (Object)this.topicName, (Object)this.subName);
            try {
                this.cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback(){

                    public void resetComplete(Object ctx) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] Successfully reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, finalPosition});
                        }
                        if (PersistentSubscription.this.dispatcher != null) {
                            PersistentSubscription.this.dispatcher.cursorIsReset();
                        }
                        // Un-fence before completing the future so continuations see an open subscription.
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        future.complete(null);
                    }

                    public void resetFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("[{}][{}] Failed to reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, finalPosition, exception});
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        // Translate managed-ledger failures into the matching broker-level exception.
                        if (exception instanceof ManagedLedgerException.InvalidCursorPositionException) {
                            future.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition(exception.getMessage()));
                        } else if (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                            future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(exception.getMessage()));
                        } else {
                            future.completeExceptionally(new BrokerServiceException(exception));
                        }
                    }
                });
            }
            catch (Exception e) {
                // asyncResetCursor itself threw: un-fence and fail the future rather than leave it pending.
                log.error("[{}][{}] Error while resetting cursor", new Object[]{this.topicName, this.subName, e});
                IS_FENCED_UPDATER.set(this, 0);
                future.completeExceptionally(new BrokerServiceException(e));
            }
        });
    }

    /**
     * Asynchronously reads the N-th entry from this subscription's cursor, excluding
     * individually deleted entries.
     *
     * @param messagePosition the N value forwarded to {@code asyncGetNthEntry}
     * @return a future completed with the entry that was read, or completed exceptionally
     *         with the {@link ManagedLedgerException} reported by the cursor
     */
    @Override
    public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Getting message at position {}", topicName, subName, messagePosition);
        }
        final CompletableFuture<Entry> peeked = new CompletableFuture<>();
        // Bridge the callback-style cursor API onto the returned future.
        final AsyncCallbacks.ReadEntryCallback onRead = new AsyncCallbacks.ReadEntryCallback() {
            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                peeked.complete(entry);
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                peeked.completeExceptionally(exception);
            }
        };
        cursor.asyncGetNthEntry(messagePosition, ManagedCursor.IndividualDeletedEntries.Exclude, onRead, null);
        return peeked;
    }

    /**
     * Returns the backlog entry count reported by this subscription's cursor.
     *
     * @param getPreciseBacklog forwarded unchanged to the cursor
     * @return the cursor's number of entries in backlog
     */
    @Override
    public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) {
        return cursor.getNumberOfEntriesInBacklog(getPreciseBacklog);
    }

    /**
     * Returns the dispatcher currently held by this subscription, read while holding this
     * subscription's monitor.
     *
     * @return the dispatcher; other methods of this class null-check it, so callers should
     *         be prepared for {@code null}
     */
    @Override
    public synchronized Dispatcher getDispatcher() {
        return dispatcher;
    }

    /**
     * Returns the cursor's count of entries since the first not-acknowledged message.
     *
     * @return the value reported by the cursor
     */
    public long getNumberOfEntriesSinceFirstNotAckedMessage() {
        return cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
    }

    /**
     * Returns the cursor's total number of non-contiguous deleted message ranges.
     *
     * @return the value reported by the cursor
     */
    public int getTotalNonContiguousDeletedMessagesRange() {
        return cursor.getTotalNonContiguousDeletedMessagesRange();
    }

    /**
     * Closes this subscription, provided no consumer is connected to it.
     *
     * <p>The connected-consumer check and the start of the pending-ack close both happen
     * while holding this subscription's monitor. When the pending-ack handle has closed,
     * the fenced flag is set to 1 and the closure is logged.
     *
     * @return a future that completes when the pending-ack handle has closed, or a future
     *         failed with {@code SubscriptionBusyException} when the dispatcher reports a
     *         connected consumer
     */
    @Override
    public CompletableFuture<Void> close() {
        synchronized (this) {
            final boolean hasConnectedConsumer = dispatcher != null && dispatcher.isConsumerConnected();
            if (hasConnectedConsumer) {
                return FutureUtil.failedFuture(
                        new BrokerServiceException.SubscriptionBusyException("Subscription has active consumers"));
            }
            return pendingAckHandle.close().thenAccept(ignored -> {
                IS_FENCED_UPDATER.set(this, 1);
                log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
            });
        }
    }

    /**
     * Closes the dispatcher (if any) and then closes this subscription.
     *
     * <p>The fenced flag is set to 1 before anything is closed. If any step of the chain
     * fails, the flag is set back to 0, the dispatcher (if any) is reset, and the returned
     * future is completed exceptionally with the failure.
     *
     * @return a future completed once both the dispatcher and the subscription are closed
     */
    @Override
    public synchronized CompletableFuture<Void> disconnect() {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        IS_FENCED_UPDATER.set(this, 1);

        final CompletableFuture<Void> dispatcherClosed =
                dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null);
        dispatcherClosed
                .thenCompose(ignored -> close())
                .thenRun(() -> {
                    log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
                    result.complete(null);
                })
                .exceptionally(failure -> {
                    // Roll back: clear the fenced flag and reset the dispatcher.
                    IS_FENCED_UPDATER.set(this, 0);
                    if (dispatcher != null) {
                        dispatcher.reset();
                    }
                    log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, failure);
                    result.completeExceptionally(failure);
                    return null;
                });
        return result;
    }

    /**
     * Deletes this subscription without forcing connected consumers off; equivalent to
     * {@code delete(false)}.
     *
     * @return the future returned by {@code delete(false)}
     */
    @Override
    public CompletableFuture<Void> delete() {
        return delete(false);
    }

    /**
     * Deletes this subscription, disconnecting it first; equivalent to {@code delete(true)}.
     *
     * @return the future returned by {@code delete(true)}
     */
    @Override
    public CompletableFuture<Void> deleteForcefully() {
        return delete(true);
    }

    /**
     * Closes this subscription, unsubscribes it from the topic, and finally closes the
     * dispatcher.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>{@link #disconnect()} when {@code closeIfConsumersConnected} is true, otherwise
     *       {@link #close()};</li>
     *   <li>{@code topic.unsubscribe(subName)};</li>
     *   <li>{@code dispatcher.close()} (skipped when there is no dispatcher), started while
     *       holding this subscription's monitor.</li>
     * </ol>
     * A failure in step 2, or one propagated from step 1, sets the fenced flag back to 0.
     * A failure in step 3 does the same and additionally resets the dispatcher.
     *
     * @param closeIfConsumersConnected whether to go through {@link #disconnect()} instead
     *                                  of {@link #close()}
     * @return a future completed when the subscription is deleted, or completed
     *         exceptionally with the failure of whichever step failed
     */
    private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
        final CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
        log.info("[{}][{}] Unsubscribing", topicName, subName);

        // Step 1: close (or disconnect and close) the subscription.
        final CompletableFuture<Void> subscriptionClosed = new CompletableFuture<>();
        if (closeIfConsumersConnected) {
            disconnect()
                    .thenRun(() -> subscriptionClosed.complete(null))
                    .exceptionally(failure -> {
                        log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, failure);
                        subscriptionClosed.completeExceptionally(failure);
                        return null;
                    });
        } else {
            close()
                    .thenRun(() -> subscriptionClosed.complete(null))
                    .exceptionally(failure -> {
                        log.error("[{}][{}] Error closing subscription", topicName, subName, failure);
                        subscriptionClosed.completeExceptionally(failure);
                        return null;
                    });
        }

        // Steps 2 and 3: unsubscribe from the topic, then close the dispatcher.
        subscriptionClosed
                .thenCompose(ignored -> topic.unsubscribe(subName))
                .thenAccept(ignored -> {
                    synchronized (this) {
                        final CompletableFuture<Void> dispatcherClosed =
                                dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null);
                        dispatcherClosed
                                .thenRun(() -> {
                                    log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
                                    deleteFuture.complete(null);
                                })
                                .exceptionally(failure -> {
                                    IS_FENCED_UPDATER.set(this, 0);
                                    if (dispatcher != null) {
                                        dispatcher.reset();
                                    }
                                    log.error("[{}][{}] Error deleting subscription", topicName, subName, failure);
                                    deleteFuture.completeExceptionally(failure);
                                    return null;
                                });
                    }
                })
                .exceptionally(failure -> {
                    IS_FENCED_UPDATER.set(this, 0);
                    log.error("[{}][{}] Error deleting subscription", topicName, subName, failure);
                    deleteFuture.completeExceptionally(failure);
                    return null;
                });
        return deleteFuture;
    }

    /**
     * Handles an unsubscribe request from {@code consumer}: if the dispatcher allows it,
     * the consumer is closed and the subscription is deleted.
     *
     * @param consumer the consumer asking to unsubscribe
     * @return the future returned by {@link #delete()} when the request is allowed;
     *         otherwise a future failed with {@code ServerMetadataException} (request not
     *         allowed, or no dispatcher present) or with the {@link BrokerServiceException}
     *         raised while closing the consumer
     */
    @Override
    public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        // Read the field once and null-check it, as the other dispatcher accessors in this
        // class do, so a missing dispatcher yields a failed future rather than an NPE.
        final Dispatcher currentDispatcher = this.dispatcher;
        try {
            if (currentDispatcher != null && currentDispatcher.canUnsubscribe(consumer)) {
                consumer.close();
                return delete();
            }
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException(
                    "Unconnected or shared consumer attempting to unsubscribe"));
        } catch (BrokerServiceException e) {
            // Pass the exception as the trailing argument so its stack trace is logged.
            log.warn("Error removing consumer {}", consumer, e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Returns the consumers known to the dispatcher.
     *
     * @return the dispatcher's consumer list, or an empty list when there is no dispatcher
     */
    @Override
    public List<Consumer> getConsumers() {
        // Single read of the field so the null check and the call use the same instance.
        final Dispatcher current = this.dispatcher;
        return current != null ? current.getConsumers() : Collections.emptyList();
    }

    /**
     * Runs TTL-based message expiry for this subscription unless there is nothing worth
     * checking.
     *
     * <p>Expiry is skipped when the (non-precise) backlog is empty, or when a consumer is
     * connected, the backlog is below 1000 entries, and the topic reports that the oldest
     * message has not expired yet.
     *
     * @param messageTTLInSeconds the TTL forwarded to the topic check and the expiry monitor
     * @return {@code false} when expiry was skipped, otherwise the expiry monitor's result
     */
    @Override
    public boolean expireMessages(int messageTTLInSeconds) {
        if (getNumberOfEntriesInBacklog(false) == 0L) {
            return false;
        }
        final boolean nearlyCaughtUpAndConnected = dispatcher != null
                && dispatcher.isConsumerConnected()
                && getNumberOfEntriesInBacklog(false) < 1000L
                && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds);
        if (nearlyCaughtUpAndConnected) {
            return false;
        }
        lastExpireTimestamp = System.currentTimeMillis();
        return expiryMonitor.expireMessages(messageTTLInSeconds);
    }

    /**
     * Records the current time as the last expiry timestamp and asks the expiry monitor to
     * expire messages for the given position.
     *
     * @param position the position forwarded to the expiry monitor
     * @return the expiry monitor's result
     */
    @Override
    public boolean expireMessages(Position position) {
        lastExpireTimestamp = System.currentTimeMillis();
        return expiryMonitor.expireMessages(position);
    }

    /**
     * Returns the message expiry rate reported by the expiry monitor.
     *
     * @return the expiry monitor's message expiry rate
     */
    @Override
    public double getExpiredMessageRate() {
        return expiryMonitor.getMessageExpiryRate();
    }

    /**
     * Returns the expiry monitor used by this subscription.
     *
     * @return the expiry monitor instance
     */
    public PersistentMessageExpiryMonitor getExpiryMonitor() {
        return expiryMonitor;
    }

    /**
     * Returns the cursor's estimated size since its mark-delete position.
     *
     * @return the estimate reported by the cursor
     */
    public long estimateBacklogSize() {
        return cursor.getEstimatedSizeSinceMarkDeletePosition();
    }

    /**
     * Builds a statistics snapshot for this subscription by combining subscription-level
     * timestamps and counters, per-consumer stats from the dispatcher, cursor figures, and
     * expiry-monitor figures.
     *
     * @param getPreciseBacklog       whether to ask the cursor for a precise backlog count;
     *                                {@code null} is treated as {@code false}
     * @param subscriptionBacklogSize whether to also fill {@code backlogSize} from the
     *                                managed ledger's estimate at the mark-delete position
     * @return a newly created stats object
     */
    public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
        // The parameter is a boxed Boolean; avoid an NPE on auto-unboxing.
        final boolean preciseBacklog = Boolean.TRUE.equals(getPreciseBacklog);

        final SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
        subStats.lastExpireTimestamp = this.lastExpireTimestamp;
        subStats.lastConsumedFlowTimestamp = this.lastConsumedFlowTimestamp;
        subStats.lastMarkDeleteAdvancedTimestamp = this.lastMarkDeleteAdvancedTimestamp;
        // The out counters start from what already-removed consumers contributed; connected
        // consumers are added on top below.
        subStats.bytesOutCounter = this.bytesOutFromRemovedConsumers.longValue();
        subStats.msgOutCounter = this.msgOutFromRemovedConsumer.longValue();

        // Read the dispatcher once so every check below sees the same instance.
        final Dispatcher dispatcher = this.dispatcher;
        final CommandSubscribe.SubType subType = this.getType();

        // Only treat the dispatcher as a sticky-key dispatcher when it really is one; the
        // subscription type alone is not enough to make the cast safe.
        final PersistentStickyKeyDispatcherMultipleConsumers stickyKeyDispatcher =
                subType == CommandSubscribe.SubType.Key_Shared
                        && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers
                        ? (PersistentStickyKeyDispatcherMultipleConsumers) dispatcher
                        : null;

        if (dispatcher != null) {
            final Map<String, List<String>> consumerKeyHashRanges =
                    stickyKeyDispatcher != null ? stickyKeyDispatcher.getConsumerKeyHashRanges() : null;
            dispatcher.getConsumers().forEach(consumer -> {
                ConsumerStatsImpl consumerStats = consumer.getStats();
                subStats.consumers.add(consumerStats);
                subStats.msgRateOut += consumerStats.msgRateOut;
                subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                subStats.msgOutCounter += consumerStats.msgOutCounter;
                subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                subStats.chunkedMessageRate =
                        (int) ((double) subStats.chunkedMessageRate + consumerStats.chunkedMessageRate);
                subStats.unackedMessages += consumerStats.unackedMessages;
                subStats.lastConsumedTimestamp =
                        Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
                subStats.lastAckedTimestamp =
                        Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
                if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) {
                    consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName());
                }
            });
        }

        subStats.type = this.getTypeString();
        if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
            if (activeConsumer != null) {
                subStats.activeConsumerName = activeConsumer.consumerName();
            }
        }
        if (Subscription.isIndividualAckMode(subType) && dispatcher instanceof PersistentDispatcherMultipleConsumers) {
            PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher;
            // Overrides the per-consumer sum accumulated above with the dispatcher's total.
            subStats.unackedMessages = d.getTotalUnackedMessages();
            subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
            subStats.msgDelayed = d.getNumberOfDelayedMessages();
        }

        subStats.msgBacklog = this.getNumberOfEntriesInBacklog(preciseBacklog);
        if (subscriptionBacklogSize) {
            subStats.backlogSize = ((ManagedLedgerImpl) this.topic.getManagedLedger())
                    .getEstimatedBacklogSize((PositionImpl) this.cursor.getMarkDeletedPosition());
        }
        subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
        subStats.msgRateExpired = this.expiryMonitor.getMessageExpiryRate();
        subStats.totalMsgExpired = this.expiryMonitor.getTotalMessageExpired();
        subStats.isReplicated = this.isReplicated();
        subStats.isDurable = this.cursor.isDurable();

        if (stickyKeyDispatcher != null) {
            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers =
                    stickyKeyDispatcher.getRecentlyJoinedConsumers();
            if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) {
                recentlyJoinedConsumers.forEach((joined, position) ->
                        subStats.consumersAfterMarkDeletePosition.put(joined.consumerName(), position.toString()));
            }
        }

        subStats.nonContiguousDeletedMessagesRanges = this.cursor.getTotalNonContiguousDeletedMessagesRange();
        subStats.nonContiguousDeletedMessagesRangesSerializedSize =
                this.cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
        return subStats;
    }

    /**
     * Asks the dispatcher to redeliver the unacknowledged messages of {@code consumer}.
     * Does nothing when there is no dispatcher.
     *
     * @param consumer the consumer forwarded to the dispatcher
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer) {
        final Dispatcher target = getDispatcher();
        if (target == null) {
            return;
        }
        target.redeliverUnacknowledgedMessages(consumer);
    }

    /**
     * Asks the dispatcher to redeliver the given positions for {@code consumer}.
     * Does nothing when there is no dispatcher.
     *
     * @param consumer  the consumer forwarded to the dispatcher
     * @param positions the positions forwarded to the dispatcher
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        final Dispatcher target = getDispatcher();
        if (target == null) {
            return;
        }
        target.redeliverUnacknowledgedMessages(consumer, positions);
    }

    /**
     * Removes from {@code positions}, in place, every position that is at or before the
     * cursor's mark-delete position.
     *
     * @param positions the list to trim; it is modified through {@code removeIf}
     */
    private void trimByMarkDeletePosition(List<PositionImpl> positions) {
        positions.removeIf(position -> {
            // Read the mark-delete position once per element so the null check and the
            // comparison are guaranteed to use the same value.
            final Position markDeleted = this.cursor.getMarkDeletedPosition();
            return markDeleted != null && position.compareTo((PositionImpl) markDeleted) <= 0;
        });
    }

    /**
     * Forwards an unacknowledged-message count adjustment to the dispatcher.
     *
     * <p>NOTE(review): the dispatcher is dereferenced without a null check here, unlike
     * most other dispatcher accessors in this class.
     *
     * @param unAckMessages the value forwarded to the dispatcher
     */
    @Override
    public void addUnAckedMessages(int unAckMessages) {
        dispatcher.addUnAckedMessages(unAckMessages);
    }

    /**
     * Returns the dispatcher's number of delayed messages, read while holding this
     * subscription's monitor.
     *
     * @return the dispatcher's delayed-message count, or 0 when there is no dispatcher
     */
    @Override
    public synchronized long getNumberOfEntriesDelayed() {
        return dispatcher == null ? 0L : dispatcher.getNumberOfDelayedMessages();
    }

    /**
     * Tells the owning topic that a batch message has been published by calling
     * {@code markBatchMessagePublished()} on it.
     */
    @Override
    public void markTopicWithBatchMessagePublished() {
        topic.markBatchMessagePublished();
    }

    /**
     * Notifies every consumer of the dispatcher that the end of the topic was reached, but
     * only when the cursor's (non-precise) backlog is empty and a dispatcher exists.
     */
    void topicTerminated() {
        final boolean backlogDrained = cursor.getNumberOfEntriesInBacklog(false) == 0L;
        if (!backlogDrained || dispatcher == null) {
            return;
        }
        dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
    }

    /**
     * Combines user-supplied cursor properties with this subscription's base properties
     * (the replicated or non-replicated set, depending on {@code isReplicated()}).
     *
     * <p>Base properties are applied last, so they win over user properties with the same
     * key.
     *
     * @param userProperties user-supplied properties; {@code null} is treated as empty
     * @return the base property map itself when there are no user properties, otherwise a
     *         new {@link TreeMap} holding the merged entries
     */
    protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
        final Map<String, Long> baseProperties = this.isReplicated()
                ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
                : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
        if (userProperties == null || userProperties.isEmpty()) {
            return baseProperties;
        }
        final Map<String, Long> merged = new TreeMap<>();
        merged.putAll(userProperties);
        // Added second on purpose: base entries override user entries on key collision.
        merged.putAll(baseProperties);
        return merged;
    }

    /**
     * Stores a copy of {@code snapshot} in the replicated-subscription snapshot cache.
     * Does nothing when no cache is set.
     *
     * @param snapshot the snapshot to copy; the cache receives a fresh instance populated
     *                 through {@code copyFrom}
     */
    @Override
    public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
        // Read the field once so the null check and the call use the same cache instance.
        final ReplicatedSubscriptionSnapshotCache cache = replicatedSubscriptionSnapshotCache;
        if (cache == null) {
            return;
        }
        cache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
    }

    /**
     * Ends a transaction on this subscription's pending-ack handle.
     *
     * <p>A COMMIT action calls {@code commitTxn} with an empty properties map. An ABORT
     * action calls {@code abortTxn}, passing the active consumer when the dispatcher is a
     * {@link PersistentDispatcherSingleActiveConsumer} and {@code null} otherwise.
     *
     * @param txnidMostBits  most significant bits of the transaction id
     * @param txnidLeastBits least significant bits of the transaction id
     * @param txnAction      numeric value compared against {@code TxnAction.COMMIT} and
     *                       {@code TxnAction.ABORT}
     * @param lowWaterMark   forwarded unchanged to the pending-ack handle
     * @return the pending-ack handle's future, or a future failed with
     *         {@code NotAllowedException} for any other action value
     */
    @Override
    public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark) {
        final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
        if (TxnAction.COMMIT.getValue() == txnAction) {
            return this.pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark);
        }
        if (TxnAction.ABORT.getValue() == txnAction) {
            // Fetch the dispatcher once: checking one read and casting another could
            // fail if the dispatcher changes in between.
            final Dispatcher currentDispatcher = this.getDispatcher();
            Consumer redeliverConsumer = null;
            if (currentDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
                redeliverConsumer =
                        ((PersistentDispatcherSingleActiveConsumer) currentDispatcher).getActiveConsumer();
            }
            return this.pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark);
        }
        return FutureUtil.failedFuture(
                new BrokerServiceException.NotAllowedException("Unsupported txnAction " + txnAction));
    }

    /**
     * Returns the managed cursor backing this subscription.
     *
     * @return the cursor instance
     */
    @VisibleForTesting
    public ManagedCursor getCursor() {
        return cursor;
    }

    /**
     * Forwards {@code position} to the pending-ack handle's
     * {@code syncBatchPositionAckSetForTransaction}.
     *
     * @param position the position forwarded to the pending-ack handle
     */
    public void syncBatchPositionBitSetForPendingAck(PositionImpl position) {
        pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
    }

    /**
     * Asks the pending-ack handle whether the consumer pending ack at {@code position} can
     * be deleted.
     *
     * @param position the position forwarded to the pending-ack handle
     * @return the pending-ack handle's answer
     */
    public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
        return pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
    }

    /**
     * Returns the statistics exposed by the pending-ack handle.
     *
     * @return the result of the pending-ack handle's {@code getStats()}
     */
    public TransactionPendingAckStats getTransactionPendingAckStats() {
        return pendingAckHandle.getStats();
    }

    /**
     * Delegates the stuck check to the dispatcher.
     *
     * @return the dispatcher's {@code checkAndUnblockIfStuck()} result, or {@code false}
     *         when there is no dispatcher
     */
    public boolean checkAndUnblockIfStuck() {
        return dispatcher != null && dispatcher.checkAndUnblockIfStuck();
    }

    /**
     * Returns the pending-ack handle's statistics for a single transaction.
     *
     * @param txnID the transaction id forwarded to the pending-ack handle
     * @return the pending-ack handle's stats for that transaction
     */
    public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
        return pendingAckHandle.getTransactionInPendingAckStats(txnID);
    }

    /**
     * Returns the managed ledger used by the pending-ack store, when the pending-ack handle
     * is a {@code PendingAckHandleImpl}.
     *
     * @return the handle's {@code getStoreManageLedger()} future, or a future failed with
     *         {@code NotAllowedException} for any other handle implementation
     */
    public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
        if (!(pendingAckHandle instanceof PendingAckHandleImpl)) {
            return FutureUtil.failedFuture(
                    new BrokerServiceException.NotAllowedException("Pending ack handle don't use managedLedger!"));
        }
        final PendingAckHandleImpl ledgerBackedHandle = (PendingAckHandleImpl) pendingAckHandle;
        return ledgerBackedHandle.getStoreManageLedger();
    }

    // Class initialization: registers the replicated-subscription marker property (value 1)
    // in the replicated cursor-properties map, then creates the class logger.
    // NOTE(review): the fields assigned here are declared outside this view; the map is
    // presumably created by its field initializer before this block runs — confirm.
    static {
        REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
        log = LoggerFactory.getLogger(PersistentSubscription.class);
    }
}

