/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import org.apache.pulsar.shade.it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import org.apache.pulsar.shade.it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.apache.pulsar.shade.it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import org.apache.pulsar.shade.it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import org.apache.pulsar.shade.org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);
    private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;
    private final ConsumerBase<?> consumer;
    private final Timer timer;
    private final long nackDelayMs;
    private final RedeliveryBackoff negativeAckRedeliveryBackoff;
    private final int negativeAckPrecisionBitCnt;
    private Timeout timeout;
    private static final long MIN_NACK_DELAY_MS = 100L;
    private static final int DUMMY_PARTITION_INDEX = -2;

    public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
        this.consumer = consumer;
        this.timer = consumer.getClient().timer();
        this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()), 100L);
        this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
        this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void triggerRedelivery(Timeout t) {
        HashSet<MessageId> messagesToRedeliver = new HashSet<MessageId>();
        NegativeAcksTracker negativeAcksTracker = this;
        synchronized (negativeAcksTracker) {
            long nextTriggerTimestamp;
            long delayMs;
            long timestamp;
            if (this.nackedMessages.isEmpty()) {
                this.timeout = null;
                return;
            }
            long currentTimestamp = System.currentTimeMillis();
            LongBidirectionalIterator longBidirectionalIterator = this.nackedMessages.keySet().iterator();
            while (longBidirectionalIterator.hasNext() && (timestamp = ((Long)longBidirectionalIterator.next()).longValue()) <= currentTimestamp) {
                Long2ObjectMap ledgerMap = (Long2ObjectMap)this.nackedMessages.get(timestamp);
                for (Long2ObjectMap.Entry entry : ledgerMap.long2ObjectEntrySet()) {
                    long ledgerId = entry.getLongKey();
                    Roaring64Bitmap entrySet = (Roaring64Bitmap)entry.getValue();
                    entrySet.forEach(entryId -> {
                        MessageIdImpl msgId = new MessageIdImpl(ledgerId, entryId, -2);
                        UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
                        messagesToRedeliver.add(msgId);
                    });
                }
            }
            LongBidirectionalIterator iterator = this.nackedMessages.keySet().iterator();
            while (iterator.hasNext() && (timestamp = iterator.nextLong()) <= currentTimestamp) {
                iterator.remove();
            }
            this.timeout = !this.nackedMessages.isEmpty() ? ((delayMs = Math.max((nextTriggerTimestamp = this.nackedMessages.firstLongKey()) - currentTimestamp, 0L)) > 0L ? this.timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS) : this.timer.newTimeout(this::triggerRedelivery, 0L, TimeUnit.MILLISECONDS)) : null;
        }
        if (!messagesToRedeliver.isEmpty()) {
            this.consumer.onNegativeAcksSend(messagesToRedeliver);
            log.info("[{}] {} messages will be re-delivered", this.consumer, (Object)messagesToRedeliver.size());
            this.consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
        }
    }

    public synchronized void add(MessageId messageId) {
        this.add(messageId, 0);
    }

    public synchronized void add(Message<?> message) {
        this.add(message.getMessageId(), message.getRedeliveryCount());
    }

    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & -1L << bits;
    }

    private synchronized void add(MessageId messageId, int redeliveryCount) {
        if (this.nackedMessages == null) {
            this.nackedMessages = new Long2ObjectAVLTreeMap<Long2ObjectMap<Roaring64Bitmap>>();
        }
        long backoffMs = this.negativeAckRedeliveryBackoff != null ? TimeUnit.MILLISECONDS.toMillis(this.negativeAckRedeliveryBackoff.next(redeliveryCount)) : this.nackDelayMs;
        MessageIdAdv messageIdAdv = (MessageIdAdv)messageId;
        long timestamp = NegativeAcksTracker.trimLowerBit(System.currentTimeMillis() + backoffMs, this.negativeAckPrecisionBitCnt);
        this.nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap()).computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap()).add(messageIdAdv.getEntryId());
        if (this.timeout == null) {
            this.timeout = this.timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
        }
    }

    public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) {
        if (messageId instanceof ChunkMessageIdImpl) {
            return (MessageIdAdv)messageId;
        }
        MessageIdAdv msgId = (MessageIdAdv)messageId;
        return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), -2);
    }

    @VisibleForTesting
    synchronized long getNackedMessagesCount() {
        if (this.nackedMessages == null) {
            return 0L;
        }
        return this.nackedMessages.values().stream().mapToLong(ledgerMap -> ledgerMap.values().stream().mapToLong(Roaring64Bitmap::getLongCardinality).sum()).sum();
    }

    @Override
    public synchronized void close() {
        if (this.timeout != null && !this.timeout.isCancelled()) {
            this.timeout.cancel();
            this.timeout = null;
        }
        if (this.nackedMessages != null) {
            this.nackedMessages.clear();
            this.nackedMessages = null;
        }
    }
}

