/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentReplicator
extends AbstractReplicator
implements Replicator,
AsyncCallbacks.ReadEntriesCallback,
AsyncCallbacks.DeleteCallback {
    // Local topic whose entries this replicator reads and forwards.
    private final PersistentTopic topic;
    // Cursor used to read entries, delete handled ones, and rewind after failures.
    protected final ManagedCursor cursor;
    // Empty until initializeDispatchRateLimiterIfNeeded() decides a replicator rate limiter is required.
    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
    // Upper bound on entries requested per cursor read; doubled (up to the configured maximum) after a
    // completed read and reset to the configured minimum after a failed read.
    private int readBatchSize;
    // Size limit handed to the cursor on every read; taken from the broker configuration in the constructor.
    private final int readMaxSizeBytes;
    // 90% of producerQueueSize (see constructor); send callbacks only resume reading once the number of
    // pending messages drops below this value.
    private final int producerQueueThreshold;
    // Atomic accessor for pendingMessages.
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> PENDING_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "pendingMessages");
    // Messages handed to the producer whose send callback has not run yet.
    private volatile int pendingMessages = 0;
    // NOTE(review): FALSE/TRUE look like the int encoding of the havePendingRead flag; the decompiled code
    // uses the literals 0 and 1 instead of these names - confirm against the original source.
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    // Atomic accessor for havePendingRead.
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> HAVE_PENDING_READ_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "havePendingRead");
    // 1 while a cursor read issued by readMoreEntries() is outstanding, 0 otherwise.
    private volatile int havePendingRead = 0;
    // Records the payload size of every message accepted for replication.
    private final Rate msgOut = new Rate();
    // Records messages discarded in readEntriesComplete() because they were expired.
    private final Rate msgExpired = new Rate();
    // TTL used for the per-message expiry check while replicating; changed through updateMessageTTL().
    private int messageTTLInSeconds = 0;
    // Delay between retries after a failed read; halved after every completed read.
    // NOTE(review): arguments presumably mean 1s initial delay and 1min maximum - confirm against Backoff.
    private final Backoff readFailureBackoff = new Backoff(1L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    // Performs the actual expiry work for expireMessages(...).
    private final PersistentMessageExpiryMonitor expiryMonitor;
    // Backlog size (in entries) that goes with the expiry pre-check in expireMessages(int).
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    // Mutable stats object refreshed by updateRates() and getStats(); the same instance is returned each time.
    private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
    // True while readEntriesComplete() waits for a schema lookup; readMoreEntries() is a no-op meanwhile.
    private volatile boolean fetchSchemaInProgress = false;
    private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);

    public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, replicationClient);
        this.topic = topic;
        this.cursor = cursor;
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, Codec.decode(cursor.getName()), cursor, null);
        HAVE_PENDING_READ_UPDATER.set(this, 0);
        PENDING_MESSAGES_UPDATER.set(this, 0);
        this.readBatchSize = Math.min(this.producerQueueSize, topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
        this.readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
        this.producerQueueThreshold = (int)((double)this.producerQueueSize * 0.9);
        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
        this.startProducer();
    }

    /**
     * Invoked with the freshly created replication producer.
     *
     * <p>Rewinds the cursor, cancels any pending read and clears the pending-read flag, then tries to move
     * the replicator from {@code Starting} to {@code Started}. On success reading begins; otherwise the
     * replicator is marked {@code Stopping} and the producer is closed again.
     *
     * @param producer the producer to replicate through
     */
    @Override
    protected void readEntries(Producer<byte[]> producer) {
        this.cursor.rewind();
        this.cursor.cancelPendingReadRequest();
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.producer = (ProducerImpl)producer;

        boolean nowStarted = STATE_UPDATER.compareAndSet(this, AbstractReplicator.State.Starting, AbstractReplicator.State.Started);
        if (!nowStarted) {
            // The replicator left the Starting state while the producer was being created.
            log.info("[{}][{} -> {}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, STATE_UPDATER.get(this));
            STATE_UPDATER.set(this, AbstractReplicator.State.Stopping);
            this.closeProducerAsync();
            return;
        }

        log.info("[{}][{} -> {}] Created replicator producer", this.topicName, this.localCluster, this.remoteCluster);
        this.backOff.reset();
        this.cursor.setActive();
        this.readMoreEntries();
    }

    /**
     * @return the cursor's mark-deleted position, reported as this replicator's read position
     */
    @Override
    protected Position getReplicatorReadPosition() {
        Position markDeleted = this.cursor.getMarkDeletedPosition();
        return markDeleted;
    }

    /**
     * @return the cursor's backlog entry count, obtained via {@code getNumberOfEntriesInBacklog(false)}
     */
    @Override
    protected long getNumberOfEntriesInBacklog() {
        long backlogEntries = this.cursor.getNumberOfEntriesInBacklog(false);
        return backlogEntries;
    }

    /**
     * Marks the cursor inactive; does nothing when there is no cursor.
     */
    @Override
    protected void disableReplicatorRead() {
        if (this.cursor == null) {
            return;
        }
        this.cursor.setInactive();
    }

    /**
     * Computes how many messages may be read next.
     *
     * @return {@code 0} when the producer queue has no free slot, {@code -1} when the dispatch rate
     *         limiter currently has no message permit (the caller retries later), otherwise the number of
     *         free producer-queue slots, capped by the limiter's available message quota when that quota
     *         is positive
     */
    private int getAvailablePermits() {
        int availablePermits = this.producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);
        if (availablePermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Producer queue is full, availablePermits: {}, pause reading", this.topicName, this.localCluster, this.remoteCluster, availablePermits);
            }
            return 0;
        }

        Optional<DispatchRateLimiter> limiter = this.dispatchRateLimiter;
        if (!limiter.isPresent() || !limiter.get().isDispatchRateLimitingEnabled()) {
            return availablePermits;
        }

        DispatchRateLimiter rateLimiter = limiter.get();
        if (!rateLimiter.hasMessageDispatchPermit()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}, schedule after a {}", this.topicName, this.localCluster, this.remoteCluster, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), 1000);
            }
            return -1;
        }

        long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg();
        if (availablePermitsOnMsg > 0L) {
            // Compare as longs and narrow afterwards: casting the quota to int first would wrap for
            // values above Integer.MAX_VALUE and could yield a negative permit count.
            availablePermits = (int) Math.min((long) availablePermits, availablePermitsOnMsg);
        }
        return availablePermits;
    }

    /**
     * Issues the next cursor read when permits allow it.
     *
     * <p>Nothing is read while a schema fetch is in progress. When {@link #getAvailablePermits()} returns
     * {@code -1} the call is retried after one second; when it returns {@code 0} nothing happens. Otherwise
     * a read is issued unless another one is already pending. A non-writable producer limits the read to a
     * single message.
     */
    protected void readMoreEntries() {
        if (this.fetchSchemaInProgress) {
            log.info("[{}][{} -> {}] Skip the reading due to new detected schema", this.topicName, this.localCluster, this.remoteCluster);
            return;
        }

        int permits = this.getAvailablePermits();
        if (permits == -1) {
            // No rate-limiter permit right now: try again in one second.
            this.topic.getBrokerService().executor().schedule(this::readMoreEntries, 1000L, TimeUnit.MILLISECONDS);
            return;
        }
        if (permits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] No Permits for reading. availablePermits: {}", this.topicName, this.localCluster, this.remoteCluster, permits);
            }
            return;
        }

        int messagesToRead;
        if (this.isWritable()) {
            messagesToRead = Math.min(permits, this.readBatchSize);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Throttling replication traffic because producer is not writable", this.topicName, this.localCluster, this.remoteCluster);
            }
            messagesToRead = 1;
        }
        // Always request at least one entry.
        messagesToRead = Math.max(messagesToRead, 1);

        if (!HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", this.topicName, this.localCluster, this.remoteCluster, messagesToRead);
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Schedule read of {} messages", this.topicName, this.localCluster, this.remoteCluster, messagesToRead);
        }
        this.cursor.asyncReadEntriesOrWait(messagesToRead, this.readMaxSizeBytes, this, null, PositionImpl.latest);
    }

    /**
     * Cursor callback: handles a batch of entries that was read for replication.
     *
     * <p>For each entry, in order:
     * <ul>
     *   <li>entries that cannot be deserialized, are already replicated, are not addressed to the remote
     *       cluster, or are expired are deleted from the cursor and released;</li>
     *   <li>once the replicator is found not to be in the {@code Started} state, this entry and every later
     *       one in the batch is released without being deleted;</li>
     *   <li>if the message's schema is not immediately available, replication pauses: this entry and the
     *       rest of the batch are released, and when the schema lookup finishes the cursor is rewound and
     *       reading resumes;</li>
     *   <li>otherwise the message is handed to the producer.</li>
     * </ul>
     *
     * <p>The pending-message counter is incremented only for messages actually handed to the producer,
     * because only those get a send callback that decrements it again. Counting a message that is then
     * dropped for a schema fetch would leak a producer permit each time.
     *
     * <p>Afterwards the pending-read flag is cleared and, unless something was sent and the producer is no
     * longer writable, the next read is scheduled.
     *
     * @param entries entries returned by the cursor read
     * @param ctx     read context (unused)
     */
    @Override
    public void readEntriesComplete(List<Entry> entries, Object ctx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Read entries complete of {} messages", this.topicName, this.localCluster, this.remoteCluster, entries.size());
        }

        // A completed read lets the batch size grow again, up to the configured maximum.
        int maxReadBatchSize = this.topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
        if (this.readBatchSize < maxReadBatchSize) {
            int newReadBatchSize = Math.min(this.readBatchSize * 2, maxReadBatchSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", this.topicName, this.localCluster, this.remoteCluster, this.readBatchSize, newReadBatchSize);
            }
            this.readBatchSize = newReadBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        boolean atLeastOneMessageSentForReplication = false;
        boolean isEnableReplicatedSubscriptions = this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
        try {
            boolean isLocalMessageSkippedOnce = false;
            boolean skipRemainingMessages = false;
            for (int i = 0; i < entries.size(); ++i) {
                Entry entry = entries.get(i);
                if (skipRemainingMessages) {
                    // A schema fetch paused replication; the cursor is rewound once it completes.
                    entry.release();
                    continue;
                }

                int length = entry.getLength();
                ByteBuf headersAndPayload = entry.getDataBuffer();
                MessageImpl<byte[]> msg;
                try {
                    msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
                } catch (Throwable t) {
                    log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), length, t.getMessage(), t);
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    continue;
                }

                if (isEnableReplicatedSubscriptions) {
                    this.checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
                }

                if (msg.isReplicated()) {
                    // Already a replicated message: delete it instead of sending it on.
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }

                if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(this.remoteCluster)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), msg.getReplicateTo());
                    }
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }

                if (msg.isExpired(this.messageTTLInSeconds)) {
                    this.msgExpired.recordEvent(0L);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), msg.getReplicateTo());
                    }
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }

                if (STATE_UPDATER.get(this) != AbstractReplicator.State.Started || isLocalMessageSkippedOnce) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition());
                    }
                    // Once one message was dropped, drop the rest of the batch too.
                    isLocalMessageSkippedOnce = true;
                    entry.release();
                    msg.recycle();
                    continue;
                }

                this.dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1L, entry.getLength()));
                this.msgOut.recordEvent(headersAndPayload.readableBytes());
                msg.setReplicatedFrom(this.localCluster);
                headersAndPayload.retain();

                CompletableFuture<SchemaInfo> schemaFuture = this.getSchemaInfo(msg);
                if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
                    // Schema not available yet: give back everything taken for this message. The
                    // pending-message counter has deliberately not been touched for it.
                    entry.release();
                    headersAndPayload.release();
                    msg.recycle();
                    this.fetchSchemaInProgress = true;
                    skipRemainingMessages = true;
                    this.cursor.cancelPendingReadRequest();
                    log.info("[{}][{} -> {}] Pause the data replication due to new detected schema", this.topicName, this.localCluster, this.remoteCluster);
                    schemaFuture.whenComplete((__, e) -> {
                        if (e != null) {
                            log.warn("[{}][{} -> {}] Failed to get schema from local cluster, will try in the next loop", this.topicName, this.localCluster, this.remoteCluster, e);
                        }
                        log.info("[{}][{} -> {}] Resume the data replication after the schema fetching done", this.topicName, this.localCluster, this.remoteCluster);
                        this.cursor.rewind();
                        this.fetchSchemaInProgress = false;
                        this.readMoreEntries();
                    });
                } else {
                    msg.setSchemaInfoForReplicator(schemaFuture.get());
                    // Count the message as pending only now that it really goes to the producer; the
                    // matching decrement happens in ProducerSendCallback.sendComplete().
                    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                    this.producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
                    atLeastOneMessageSentForReplication = true;
                }
            }
        } catch (Exception e) {
            log.error("[{}][{} -> {}] Unexpected exception: {}", this.topicName, this.localCluster, this.remoteCluster, e.getMessage(), e);
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);

        if (atLeastOneMessageSentForReplication && !this.isWritable()) {
            // Reading resumes from the send callbacks once the producer queue has drained.
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", this.topicName, this.localCluster, this.remoteCluster, atLeastOneMessageSentForReplication, this.isWritable());
            }
        } else {
            this.readMoreEntries();
        }
    }

    /**
     * Looks up the schema a message was produced with.
     *
     * @param msg message whose schema version is inspected
     * @return an already completed future holding {@code null} when the message carries no schema version
     *         (null or empty), otherwise the future returned by the schema provider for this topic
     * @throws ExecutionException if obtaining the schema provider from the client's loading cache fails
     */
    private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
        byte[] schemaVersion = msg.getSchemaVersion();
        boolean hasSchemaVersion = schemaVersion != null && schemaVersion.length != 0;
        if (hasSchemaVersion) {
            return this.client.getSchemaProviderLoadingCache().get(this.topicName).getSchemaByVersion(msg.getSchemaVersion());
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Marks the cursor active when the producer is connected and inactive otherwise.
     * Does nothing when there is no cursor.
     */
    public void updateCursorState() {
        if (this.cursor == null) {
            return;
        }
        // Read the field once: it is reassigned elsewhere, so a second read after the null check
        // could observe null. isConnected() uses the same snapshot approach.
        ProducerImpl currentProducer = this.producer;
        if (currentProducer != null && currentProducer.isConnected()) {
            this.cursor.setActive();
        } else {
            this.cursor.setInactive();
        }
    }

    /**
     * Cursor callback: a read issued by {@link #readMoreEntries()} failed.
     *
     * <p>If the replicator is no longer {@code Started} the failure is only logged. Otherwise the read
     * batch size drops to the configured minimum and the next back-off delay is taken. A closed cursor
     * causes the producer to be closed and no retry; any other failure clears the pending-read flag and
     * schedules a retry after the back-off delay.
     *
     * @param exception the read failure
     * @param ctx       read context, only used in log messages
     */
    @Override
    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        if (STATE_UPDATER.get(this) != AbstractReplicator.State.Started) {
            log.info("[{}][{} -> {}] Replicator was stopped while reading entries. Stop reading. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, STATE_UPDATER.get(this));
            return;
        }

        this.readBatchSize = this.topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
        long waitTimeMillis = this.readFailureBackoff.next();

        if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
            log.error("[{}][{} -> {}] Error reading entries because replicator is already deleted and cursor is already closed {}, ({})", this.topicName, this.localCluster, this.remoteCluster, ctx, exception.getMessage(), exception);
            // The cursor is closed, so reading cannot continue: stop the producer as well.
            this.closeProducerAsync();
            return;
        }

        double waitTimeSeconds = (double)waitTimeMillis / 1000.0;
        if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", this.topicName, this.localCluster, this.remoteCluster, ctx, waitTimeSeconds, exception.getMessage(), exception);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", this.topicName, this.localCluster, this.remoteCluster, ctx, waitTimeSeconds, exception.getMessage(), exception);
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously clears the cursor's backlog.
     *
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Void> clearBacklog() {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Backlog size before clearing: {}", this.topicName, this.localCluster, this.remoteCluster, this.cursor.getNumberOfEntriesInBacklog(false));
        }

        final CompletableFuture<Void> result = new CompletableFuture<>();
        AsyncCallbacks.ClearBacklogCallback callback = new AsyncCallbacks.ClearBacklogCallback(){

            @Override
            public void clearBacklogComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Backlog size after clearing: {}", topicName, localCluster, remoteCluster, cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }

            @Override
            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{} -> {}] Failed to clear backlog", topicName, localCluster, remoteCluster, exception);
                result.completeExceptionally(exception);
            }
        };
        this.cursor.asyncClearBacklog(callback, null);
        return result;
    }

    /**
     * Asynchronously skips entries on the cursor, excluding individually deleted entries.
     *
     * @param numMessagesToSkip number of entries to skip
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Void> skipMessages(final int numMessagesToSkip) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", this.topicName, this.localCluster, this.remoteCluster, numMessagesToSkip, this.cursor.getNumberOfEntriesInBacklog(false));
        }

        final CompletableFuture<Void> result = new CompletableFuture<>();
        AsyncCallbacks.SkipEntriesCallback callback = new AsyncCallbacks.SkipEntriesCallback(){

            @Override
            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", topicName, localCluster, remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }

            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{} -> {}] Failed to skip {} messages", topicName, localCluster, remoteCluster, numMessagesToSkip, exception);
                result.completeExceptionally(exception);
            }
        };
        this.cursor.asyncSkipEntries(numMessagesToSkip, ManagedCursor.IndividualDeletedEntries.Exclude, callback, null);
        return result;
    }

    /**
     * Asynchronously fetches the N-th entry from the cursor, excluding individually deleted entries.
     *
     * @param messagePosition ordinal of the entry to fetch, passed to the cursor unchanged
     * @return a future completed with the entry, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Getting message at position {}", this.topicName, this.localCluster, this.remoteCluster, messagePosition);
        }

        final CompletableFuture<Entry> result = new CompletableFuture<>();
        AsyncCallbacks.ReadEntryCallback callback = new AsyncCallbacks.ReadEntryCallback(){

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                result.completeExceptionally(exception);
            }

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                result.complete(entry);
            }
        };
        this.cursor.asyncGetNthEntry(messagePosition, ManagedCursor.IndividualDeletedEntries.Exclude, callback, null);
        return result;
    }

    /**
     * Cursor callback: an entry was deleted. Only logs at debug level.
     *
     * @param ctx context given to the delete call; callers in this class pass the entry position
     */
    @Override
    public void deleteComplete(Object ctx) {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("[{}][{} -> {}] Deleted message at {}", this.topicName, this.localCluster, this.remoteCluster, ctx);
    }

    /**
     * Cursor callback: deleting an entry failed. The failure is logged and otherwise ignored.
     *
     * @param exception the delete failure
     * @param ctx       context given to the delete call; callers in this class pass the entry position
     */
    @Override
    public void deleteFailed(ManagedLedgerException exception, Object ctx) {
        String reason = exception.getMessage();
        log.error("[{}][{} -> {}] Failed to delete message at {}: {}", this.topicName, this.localCluster, this.remoteCluster, ctx, reason, exception);
    }

    /**
     * Recalculates the outbound and expired-message rates and copies them into the stats object.
     * The expired rate is the sum of this replicator's own rate and the expiry monitor's rate.
     */
    @Override
    public void updateRates() {
        this.msgOut.calculateRate();
        this.msgExpired.calculateRate();

        double outRate = this.msgOut.getRate();
        double outThroughput = this.msgOut.getValueRate();
        double expiredRate = this.msgExpired.getRate() + this.expiryMonitor.getMessageExpiryRate();

        this.stats.msgRateOut = outRate;
        this.stats.msgThroughputOut = outThroughput;
        this.stats.msgRateExpired = expiredRate;
    }

    /**
     * Refreshes backlog, connection and delay information in the stats object and returns it.
     *
     * @return the replicator's stats object (the same instance on every call)
     */
    @Override
    public ReplicatorStatsImpl getStats() {
        // Take one snapshot of the producer: the field is reassigned elsewhere, so reading it again
        // after a null check could observe null.
        ProducerImpl producer = this.producer;

        this.stats.replicationBacklog = this.cursor != null ? this.cursor.getNumberOfEntriesInBacklog(false) : 0L;
        this.stats.connected = producer != null && producer.isConnected();
        this.stats.replicationDelayInSeconds = this.getReplicationDelayInSeconds();
        if (producer != null) {
            this.stats.outboundConnection = producer.getConnectionId();
            this.stats.outboundConnectedSince = producer.getConnectedSince();
        } else {
            this.stats.outboundConnection = null;
            this.stats.outboundConnectedSince = null;
        }
        return this.stats;
    }

    /**
     * Sets the TTL applied to messages while they are being replicated.
     *
     * @param messageTTLInSeconds new TTL in seconds
     */
    public void updateMessageTTL(int messageTTLInSeconds) {
        this.messageTTLInSeconds = messageTTLInSeconds;
    }

    /**
     * @return the producer's reported delay converted from milliseconds to whole seconds, or {@code 0}
     *         when there is no producer
     */
    private long getReplicationDelayInSeconds() {
        // Snapshot the field so a concurrent reassignment to null cannot cause a NullPointerException
        // between the check and the call.
        ProducerImpl currentProducer = this.producer;
        if (currentProducer == null) {
            return 0L;
        }
        return TimeUnit.MILLISECONDS.toSeconds(currentProducer.getDelayInMillis());
    }

    /**
     * Expires messages older than the given TTL, skipping the work for small backlogs.
     *
     * <p>Nothing is done when the backlog is empty, or when it holds fewer than
     * {@link #MINIMUM_BACKLOG_FOR_EXPIRY_CHECK} entries and the topic reports that the oldest message
     * is not expired yet.
     *
     * @param messageTTLInSeconds TTL in seconds
     * @return {@code false} when the check was skipped, otherwise the expiry monitor's result
     */
    public boolean expireMessages(int messageTTLInSeconds) {
        // Read the backlog once so both comparisons see the same value.
        long backlog = this.cursor.getNumberOfEntriesInBacklog(false);
        if (backlog == 0L) {
            return false;
        }
        if (backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !this.topic.isOldestMessageExpired(this.cursor, messageTTLInSeconds)) {
            return false;
        }
        return this.expiryMonitor.expireMessages(messageTTLInSeconds);
    }

    /**
     * Delegates position-based expiry to the expiry monitor.
     *
     * @param position position passed to the expiry monitor
     * @return the expiry monitor's result
     */
    public boolean expireMessages(Position position) {
        boolean monitorResult = this.expiryMonitor.expireMessages(position);
        return monitorResult;
    }

    /**
     * @return the replicator's dispatch rate limiter; empty when none has been created
     */
    @Override
    public Optional<DispatchRateLimiter> getRateLimiter() {
        Optional<DispatchRateLimiter> limiter = this.dispatchRateLimiter;
        return limiter;
    }

    /**
     * Creates the replicator dispatch rate limiter if none exists yet and
     * {@code DispatchRateLimiter.isDispatchRateNeeded} reports that one is required for this topic.
     *
     * @param policies namespace policies forwarded to the check; may be empty
     */
    @Override
    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
        if (this.dispatchRateLimiter.isPresent()) {
            return;
        }
        boolean limiterNeeded = DispatchRateLimiter.isDispatchRateNeeded(this.topic.getBrokerService(), policies, this.topic.getName(), DispatchRateLimiter.Type.REPLICATOR);
        if (limiterNeeded) {
            DispatchRateLimiter created = new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.REPLICATOR);
            this.dispatchRateLimiter = Optional.of(created);
        }
    }

    /**
     * Forwards a marker message to the topic when it was replicated from the remote cluster and its
     * marker type is 10, 11 or 13. Any other message is ignored.
     *
     * <p>NOTE(review): the numeric marker types presumably correspond to named replicated-subscription
     * marker constants that the decompiler inlined - confirm against the original source.
     *
     * @param position position of the entry holding the message
     * @param msg      deserialized message
     * @param payload  the entry's data buffer, passed to the topic
     */
    private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
        if (!msg.getMessageBuilder().hasMarkerType()) {
            return;
        }
        int markerType = msg.getMessageBuilder().getMarkerType();

        boolean fromRemoteCluster = msg.getMessageBuilder().hasReplicatedFrom() && this.remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom());
        if (!fromRemoteCluster) {
            return;
        }

        if (markerType == 10 || markerType == 11 || markerType == 13) {
            this.topic.receivedReplicatedSubscriptionMarker(position, markerType, payload);
        }
    }

    /**
     * Disconnects without failing on an existing backlog; same as {@code disconnect(false)}.
     *
     * @return the future returned by {@link #disconnect(boolean)}
     */
    @Override
    public CompletableFuture<Void> disconnect() {
        final boolean failIfHasBacklog = false;
        return this.disconnect(failIfHasBacklog);
    }

    /**
     * Disconnects through the base class and, on success, closes the dispatch rate limiter if present.
     *
     * @param failIfHasBacklog forwarded to the base-class disconnect
     * @return a future completed with {@code null} on success; on failure it completes exceptionally with
     *         the cause (unwrapped when the failure is a {@link CompletionException})
     */
    @Override
    public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
        final CompletableFuture<Void> result = new CompletableFuture<>();

        super.disconnect(failIfHasBacklog).thenRun(() -> {
            this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
            result.complete(null);
        }).exceptionally(ex -> {
            Throwable cause = ex;
            if (ex instanceof CompletionException) {
                cause = ex.getCause();
            }
            // A busy topic is not logged as an error.
            boolean topicBusy = cause instanceof BrokerServiceException.TopicBusyException;
            if (!topicBusy) {
                log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", this.topicName, this.localCluster, this.remoteCluster, ex.getMessage());
            }
            result.completeExceptionally(cause);
            return null;
        });

        return result;
    }

    /**
     * @return {@code true} when a producer exists and reports itself as connected
     */
    @Override
    public boolean isConnected() {
        // Work on a snapshot so the null check and the call see the same producer.
        ProducerImpl current = this.producer;
        if (current == null) {
            return false;
        }
        return current.isConnected();
    }

    private static final class ProducerSendCallback
    implements SendCallback {
        private PersistentReplicator replicator;
        private Entry entry;
        private MessageImpl msg;
        private final Recycler.Handle<ProducerSendCallback> recyclerHandle;
        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>(){

            @Override
            protected ProducerSendCallback newObject(Recycler.Handle<ProducerSendCallback> handle) {
                return new ProducerSendCallback(handle);
            }
        };

        @Override
        public void sendComplete(Exception exception) {
            if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
                log.error("[{}][{} -> {}] Error producing on remote broker", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster, exception});
                this.replicator.cursor.rewind();
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Message persisted on remote broker", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster});
                }
                this.replicator.cursor.asyncDelete(this.entry.getPosition(), (AsyncCallbacks.DeleteCallback)this.replicator, (Object)this.entry.getPosition());
            }
            this.entry.release();
            int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(this.replicator);
            if (pending < this.replicator.producerQueueThreshold && HAVE_PENDING_READ_UPDATER.get(this.replicator) == 0) {
                if (pending == 0 || this.replicator.producer.isWritable()) {
                    this.replicator.readMoreEntries();
                } else if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Not resuming reads. pending: {} is-writable: {}", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster, pending, this.replicator.producer.isWritable()});
                }
            }
            this.recycle();
        }

        private ProducerSendCallback(Recycler.Handle<ProducerSendCallback> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        static ProducerSendCallback create(PersistentReplicator replicator, Entry entry, MessageImpl msg) {
            ProducerSendCallback sendCallback = RECYCLER.get();
            sendCallback.replicator = replicator;
            sendCallback.entry = entry;
            sendCallback.msg = msg;
            return sendCallback;
        }

        private void recycle() {
            this.replicator = null;
            this.entry = null;
            if (this.msg != null) {
                this.msg.recycle();
                this.msg = null;
            }
            this.recyclerHandle.recycle(this);
        }

        @Override
        public void addCallback(MessageImpl<?> msg, SendCallback scb) {
        }

        @Override
        public SendCallback getNextSendCallback() {
            return null;
        }

        @Override
        public MessageImpl<?> getNextMessage() {
            return null;
        }

        @Override
        public CompletableFuture<MessageId> getFuture() {
            return CompletableFuture.completedFuture(null);
        }
    }
}

