/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.pbcast;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.protocols.TCP;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.pbcast.NakAckHeader2;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AsciiString;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Digest;
import org.jgroups.util.LongTuple;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.SuppressLog;
import org.jgroups.util.Table;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description="Reliable transmission multipoint FIFO protocol")
public class NAKACK2
extends Protocol
implements DiagnosticsHandler.ProbeHandler {
    protected static final int NUM_REBROADCAST_MSGS = 3;
    @Property(description="Retransmit retransmit responses (messages) using multicast rather than unicast")
    protected boolean use_mcast_xmit = true;
    @Property(description="Use a multicast to request retransmission of missing messages")
    protected boolean use_mcast_xmit_req = false;
    @Property(description="Ask a random member for retransmission of a missing message. Default is false")
    protected boolean xmit_from_random_member = false;
    @Property(description="Should messages delivered to application be discarded")
    protected boolean discard_delivered_msgs = true;
    @Property(description="Timeout to rebroadcast messages. Default is 2000 msec")
    protected long max_rebroadcast_timeout = 2000L;
    @Property(description="discards warnings about promiscuous traffic")
    protected boolean log_discard_msgs = true;
    @Property(description="If false, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
    protected boolean log_not_found_msgs = true;
    @Property(description="Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted")
    protected long xmit_interval = 1000L;
    @Property(description="Number of rows of the matrix in the retransmission table (only for experts)", writable=false)
    protected int xmit_table_num_rows = 100;
    @Property(description="Number of elements of a row of the matrix in the retransmission table; gets rounded to the next power of 2 (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row", writable=false)
    protected int xmit_table_msgs_per_row = 1024;
    @Property(description="Resize factor of the matrix in the retransmission table (only for experts)", writable=false)
    protected double xmit_table_resize_factor = 1.2;
    @Property(description="Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)", writable=false)
    protected long xmit_table_max_compaction_time = 10000L;
    @Property(description="Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue.")
    protected int become_server_queue_size = 50;
    @Property(description="Time during which identical warnings about messages from a non member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.")
    protected long suppress_time_non_member_warnings = 60000L;
    @Property(description="Max number of messages to ask for in a retransmit request. 0 disables this and uses the max bundle size in the transport")
    protected int max_xmit_req_size;
    @Property(description="If enabled, multicasts the highest sent seqno every xmit_interval ms. This is skipped if a regular message has been multicast, and the task aquiesces if the highest sent seqno hasn't changed for resend_last_seqno_max_times times. Used to speed up retransmission of dropped last messages (JGRP-1904)")
    protected boolean resend_last_seqno = true;
    @Property(description="Max number of times the last seqno is resent before acquiescing if last seqno isn't incremented")
    protected int resend_last_seqno_max_times = 1;
    @ManagedAttribute(description="True if sending a message can block at the transport level")
    protected boolean sends_can_block = true;
    @ManagedAttribute(description="Number of messages sent")
    protected int num_messages_sent;
    @ManagedAttribute(description="Number of messages received")
    protected int num_messages_received;
    protected static final Message DUMMY_OOB_MSG = new Message().setFlag(Message.Flag.OOB);
    protected final Predicate<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs = msg -> !(msg == null || msg == DUMMY_OOB_MSG || msg.isFlagSet(Message.Flag.OOB) && !msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED) || msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK) && this.local_addr != null && this.local_addr.equals(msg.getSrc()));
    protected static final Predicate<Message> dont_loopback_filter = msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
    protected static final BiConsumer<MessageBatch, Message> BATCH_ACCUMULATOR = MessageBatch::add;
    @ManagedAttribute(description="Number of retransmit requests received")
    protected final LongAdder xmit_reqs_received = new LongAdder();
    @ManagedAttribute(description="Number of retransmit requests sent")
    protected final LongAdder xmit_reqs_sent = new LongAdder();
    @ManagedAttribute(description="Number of retransmit responses received")
    protected final LongAdder xmit_rsps_received = new LongAdder();
    @ManagedAttribute(description="Number of retransmit responses sent")
    protected final LongAdder xmit_rsps_sent = new LongAdder();
    @ManagedAttribute(description="tracing is enabled or disabled for the given log", writable=true)
    protected boolean is_trace = this.log.isTraceEnabled();
    protected volatile boolean is_server;
    protected Address local_addr;
    protected volatile List<Address> members = new ArrayList<Address>();
    protected volatile View view;
    private final AtomicLong seqno = new AtomicLong(0L);
    protected final ConcurrentMap<Address, Table<Message>> xmit_table = Util.createConcurrentMap();
    protected Future<?> xmit_task;
    protected final Map<Address, Long> xmit_task_map = new ConcurrentHashMap<Address, Long>();
    protected volatile boolean leaving = false;
    protected volatile boolean running = false;
    protected TimeScheduler timer = null;
    protected LastSeqnoResender last_seqno_resender;
    protected final Lock rebroadcast_lock = new ReentrantLock();
    protected final Condition rebroadcast_done = this.rebroadcast_lock.newCondition();
    protected volatile boolean rebroadcasting = false;
    protected final Lock rebroadcast_digest_lock = new ReentrantLock();
    protected Digest rebroadcast_digest = null;
    protected final BoundedList<String> stability_msgs = new BoundedList(10);
    protected final BoundedList<String> digest_history = new BoundedList(10);
    protected BoundedList<Message> become_server_queue;
    protected SuppressLog<Address> suppress_log_non_member;

    @ManagedAttribute(description="Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        return this.xmit_task != null && !this.xmit_task.isDone();
    }

    @ManagedAttribute(description="Number of messages from non-members")
    public int getNonMemberMessages() {
        return this.suppress_log_non_member != null ? this.suppress_log_non_member.getCache().size() : 0;
    }

    @ManagedOperation(description="Clears the cache for messages from non-members")
    public void clearNonMemberCache() {
        if (this.suppress_log_non_member != null) {
            this.suppress_log_non_member.getCache().clear();
        }
    }

    @ManagedAttribute
    public void setResendLastSeqno(boolean flag) {
        if (this.resend_last_seqno != flag) {
            this.resend_last_seqno = flag;
        }
        if (this.resend_last_seqno) {
            if (this.last_seqno_resender == null) {
                this.last_seqno_resender = new LastSeqnoResender();
            }
        } else if (this.last_seqno_resender != null) {
            this.last_seqno_resender = null;
        }
    }

    @ManagedAttribute(description="Whether or not the task to resend the last seqno is running (depends on resend_last_seqno)")
    public boolean resendTaskRunning() {
        return this.last_seqno_resender != null;
    }

    public long getXmitRequestsReceived() {
        return this.xmit_reqs_received.sum();
    }

    public long getXmitRequestsSent() {
        return this.xmit_reqs_sent.sum();
    }

    public long getXmitResponsesReceived() {
        return this.xmit_rsps_received.sum();
    }

    public long getXmitResponsesSent() {
        return this.xmit_rsps_sent.sum();
    }

    public boolean isUseMcastXmit() {
        return this.use_mcast_xmit;
    }

    public boolean isXmitFromRandomMember() {
        return this.xmit_from_random_member;
    }

    public boolean isDiscardDeliveredMsgs() {
        return this.discard_delivered_msgs;
    }

    public boolean getLogDiscardMessages() {
        return this.log_discard_msgs;
    }

    public NAKACK2 setUseMcastXmit(boolean use_mcast_xmit) {
        this.use_mcast_xmit = use_mcast_xmit;
        return this;
    }

    public NAKACK2 setUseMcastXmitReq(boolean flag) {
        this.use_mcast_xmit_req = flag;
        return this;
    }

    public NAKACK2 setLogDiscardMessages(boolean flag) {
        this.log_discard_msgs = flag;
        return this;
    }

    public NAKACK2 setLogNotFoundMessages(boolean flag) {
        this.log_not_found_msgs = flag;
        return this;
    }

    public NAKACK2 setXmitFromRandomMember(boolean xmit_from_random_member) {
        this.xmit_from_random_member = xmit_from_random_member;
        return this;
    }

    public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
        this.discard_delivered_msgs = discard_delivered_msgs;
    }

    @Override
    public <T extends Protocol> T setLevel(String level) {
        Object retval = super.setLevel(level);
        this.is_trace = this.log.isTraceEnabled();
        return retval;
    }

    @ManagedAttribute(description="Actual size of the become_server_queue")
    public int getBecomeServerQueueSizeActual() {
        return this.become_server_queue != null ? this.become_server_queue.size() : -1;
    }

    public Table<Message> getWindow(Address sender) {
        return (Table)this.xmit_table.get(sender);
    }

    public void setTimer(TimeScheduler timer) {
        this.timer = timer;
    }

    @ManagedAttribute(description="Total number of undelivered messages in all retransmit buffers")
    public int getXmitTableUndeliveredMsgs() {
        int num = 0;
        for (Table buf : this.xmit_table.values()) {
            num += buf.size();
        }
        return num;
    }

    @ManagedAttribute(description="Total number of missing (= not received) messages in all retransmit buffers")
    public int getXmitTableMissingMessages() {
        int num = 0;
        for (Table buf : this.xmit_table.values()) {
            num += buf.getNumMissing();
        }
        return num;
    }

    @ManagedAttribute(description="Capacity of the retransmit buffer. Computed as xmit_table_num_rows * xmit_table_msgs_per_row")
    public long getXmitTableCapacity() {
        Table table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        return table != null ? (long)table.capacity() : 0L;
    }

    @ManagedAttribute(description="Prints the number of rows currently allocated in the matrix. This value will not be lower than xmit_table_now_rows")
    public int getXmitTableNumCurrentRows() {
        Table table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        return table != null ? table.getNumRows() : 0;
    }

    @ManagedAttribute(description="Returns the number of bytes of all messages in all retransmit buffers. To compute the size, Message.getLength() is used")
    public long getSizeOfAllMessages() {
        long retval = 0L;
        for (Table buf : this.xmit_table.values()) {
            retval += NAKACK2.sizeOfAllMessages(buf, false);
        }
        return retval;
    }

    @ManagedAttribute(description="Returns the number of bytes of all messages in all retransmit buffers. To compute the size, Message.size() is used")
    public long getSizeOfAllMessagesInclHeaders() {
        long retval = 0L;
        for (Table buf : this.xmit_table.values()) {
            retval += NAKACK2.sizeOfAllMessages(buf, true);
        }
        return retval;
    }

    @ManagedAttribute(description="Number of retransmit table compactions")
    public int getXmitTableNumCompactions() {
        Table table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        return table != null ? table.getNumCompactions() : 0;
    }

    @ManagedAttribute(description="Number of retransmit table moves")
    public int getXmitTableNumMoves() {
        Table table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        return table != null ? table.getNumMoves() : 0;
    }

    @ManagedAttribute(description="Number of retransmit table resizes")
    public int getXmitTableNumResizes() {
        Table table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        return table != null ? table.getNumResizes() : 0;
    }

    @ManagedAttribute(description="Number of retransmit table purges")
    public int getXmitTableNumPurges() {
        Table table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        return table != null ? table.getNumPurges() : 0;
    }

    @ManagedOperation(description="Prints the contents of the receiver windows for all members")
    public String printMessages() {
        StringBuilder ret = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry entry : this.xmit_table.entrySet()) {
            Address addr = (Address)entry.getKey();
            Table buf = (Table)entry.getValue();
            ret.append(addr).append(": ").append(buf.toString()).append('\n');
        }
        return ret.toString();
    }

    @ManagedAttribute
    public long getCurrentSeqno() {
        return this.seqno.get();
    }

    @ManagedOperation(description="Prints the stability messages received")
    public String printStabilityMessages() {
        return Util.printListWithDelimiter(this.stability_msgs, "\n");
    }

    @ManagedOperation(description="Keeps information about the last N times a digest was set or merged")
    public String printDigestHistory() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (String tmp : this.digest_history) {
            sb.append(tmp).append("\n");
        }
        return sb.toString();
    }

    @ManagedOperation(description="Compacts the retransmit buffer")
    public void compact() {
        Table table;
        Table table2 = table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        if (table != null) {
            table.compact();
        }
    }

    @ManagedOperation(description="Prints the number of rows currently allocated in the matrix for all members. This value will not be lower than xmit_table_now_rows")
    public String dumpXmitTablesNumCurrentRows() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry entry : this.xmit_table.entrySet()) {
            sb.append(String.format("%s: %d\n", entry.getKey(), ((Table)entry.getValue()).getNumRows()));
        }
        return sb.toString();
    }

    @Override
    @ManagedOperation(description="Resets all statistics")
    public void resetStats() {
        Table table;
        this.num_messages_received = 0;
        this.num_messages_sent = 0;
        this.xmit_reqs_received.reset();
        this.xmit_reqs_sent.reset();
        this.xmit_rsps_received.reset();
        this.xmit_rsps_sent.reset();
        this.stability_msgs.clear();
        this.digest_history.clear();
        Table table2 = table = this.local_addr != null ? (Table)this.xmit_table.get(this.local_addr) : null;
        if (table != null) {
            table.resetStats();
        }
    }

    @Override
    public void init() throws Exception {
        if (this.xmit_from_random_member && this.discard_delivered_msgs) {
            this.discard_delivered_msgs = false;
            this.log.debug("%s: xmit_from_random_member set to true: changed discard_delivered_msgs to false", this.local_addr);
        }
        TP transport = this.getTransport();
        this.sends_can_block = transport instanceof TCP;
        transport.registerProbeHandler(this);
        if (!transport.supportsMulticasting()) {
            if (this.use_mcast_xmit) {
                this.log.warn(Util.getMessage("NoMulticastTransport"), "use_mcast_xmit", transport.getName(), "use_mcast_xmit");
                this.use_mcast_xmit = false;
            }
            if (this.use_mcast_xmit_req) {
                this.log.warn(Util.getMessage("NoMulticastTransport"), "use_mcast_xmit_req", transport.getName(), "use_mcast_xmit_req");
                this.use_mcast_xmit_req = false;
            }
        }
        if (this.become_server_queue_size > 0) {
            this.become_server_queue = new BoundedList(this.become_server_queue_size);
        }
        if (this.suppress_time_non_member_warnings > 0L) {
            this.suppress_log_non_member = new SuppressLog(this.log, "MsgDroppedNak", "SuppressMsg");
        }
        int estimated_max_msgs_in_xmit_req = (transport.getMaxBundleSize() - 50) * 8;
        int old_max_xmit_size = this.max_xmit_req_size;
        this.max_xmit_req_size = this.max_xmit_req_size <= 0 ? estimated_max_msgs_in_xmit_req : Math.min(this.max_xmit_req_size, estimated_max_msgs_in_xmit_req);
        if (old_max_xmit_size != this.max_xmit_req_size) {
            this.log.trace("%s: set max_xmit_req_size from %d to %d", this.local_addr, old_max_xmit_size, this.max_xmit_req_size);
        }
        if (this.resend_last_seqno) {
            this.setResendLastSeqno(this.resend_last_seqno);
        }
    }

    public String printStabilityHistory() {
        StringBuilder sb = new StringBuilder();
        int i = 1;
        for (String digest : this.stability_msgs) {
            sb.append(i++).append(": ").append(digest).append("\n");
        }
        return sb.toString();
    }

    @Override
    public List<Integer> providedUpServices() {
        return Arrays.asList(39, 41, 42, 53);
    }

    @Override
    public void start() throws Exception {
        this.timer = this.getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer is null");
        }
        this.running = true;
        this.leaving = false;
        this.startRetransmitTask();
    }

    @Override
    public void stop() {
        this.running = false;
        this.is_server = false;
        if (this.become_server_queue != null) {
            this.become_server_queue.clear();
        }
        this.stopRetransmitTask();
        this.xmit_task_map.clear();
        this.reset();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 30: {
                this.stable((Digest)evt.getArg());
                return null;
            }
            case 39: {
                return this.getDigest((Address)evt.getArg());
            }
            case 41: {
                this.setDigest((Digest)evt.getArg());
                return null;
            }
            case 42: {
                this.overwriteDigest((Digest)evt.getArg());
                return null;
            }
            case 53: {
                this.mergeDigest((Digest)evt.getArg());
                return null;
            }
            case 15: {
                View tmp_view = (View)evt.getArg();
                this.members = tmp_view.getMembers();
                break;
            }
            case 6: {
                View tmp_view = (View)evt.getArg();
                List<Address> mbrs = tmp_view.getMembers();
                this.members = mbrs;
                this.view = tmp_view;
                this.adjustReceivers(mbrs);
                this.is_server = true;
                if (this.suppress_log_non_member != null) {
                    this.suppress_log_non_member.removeExpired(this.suppress_time_non_member_warnings);
                }
                this.xmit_task_map.keySet().retainAll(mbrs);
                break;
            }
            case 16: {
                this.is_server = true;
                this.flushBecomeServerQueue();
                break;
            }
            case 8: {
                this.local_addr = (Address)evt.getArg();
                break;
            }
            case 4: {
                this.leaving = true;
                this.reset();
                break;
            }
            case 78: {
                this.rebroadcasting = true;
                this.rebroadcast_digest = (Digest)evt.getArg();
                try {
                    this.rebroadcastMessages();
                }
                finally {
                    this.rebroadcasting = false;
                    this.rebroadcast_digest_lock.lock();
                    try {
                        this.rebroadcast_digest = null;
                    }
                    finally {
                        this.rebroadcast_digest_lock.unlock();
                    }
                }
                return null;
            }
        }
        return this.down_prot.down(evt);
    }

    @Override
    public Object down(Message msg) {
        Address dest = msg.getDest();
        if (dest != null || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.down_prot.down(msg);
        }
        this.send(msg);
        return null;
    }

    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 30: {
                this.stable((Digest)evt.getArg());
                return null;
            }
            case 9: {
                if (!this.rebroadcasting) break;
                this.cancelRebroadcasting();
            }
        }
        return this.up_prot.up(evt);
    }

    @Override
    public Object up(Message msg) {
        if (msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.up_prot.up(msg);
        }
        NakAckHeader2 hdr = (NakAckHeader2)msg.getHeader(this.id);
        if (hdr == null) {
            return this.up_prot.up(msg);
        }
        if (!this.is_server) {
            this.queueMessage(msg, hdr.seqno);
            return null;
        }
        switch (hdr.type) {
            case 1: {
                this.handleMessage(msg, hdr);
                return null;
            }
            case 2: {
                try {
                    SeqnoList missing = Util.streamableFromBuffer(SeqnoList.class, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
                    if (missing != null) {
                        this.handleXmitReq(msg.getSrc(), missing, hdr.sender);
                    }
                }
                catch (Exception e) {
                    this.log.error("failed deserializing retransmission list", e);
                }
                return null;
            }
            case 3: {
                this.handleXmitRsp(msg, hdr);
                return null;
            }
            case 4: {
                this.handleHighestSeqno(msg.src(), hdr.seqno);
                return null;
            }
        }
        this.log.error(Util.getMessage("HeaderTypeNotKnown"), this.local_addr, hdr.type);
        return null;
    }

    @Override
    public void up(MessageBatch batch) {
        int size = batch.size();
        boolean got_retransmitted_msg = false;
        ArrayList<LongTuple<Message>> msgs = null;
        Iterator<Message> it = batch.iterator();
        block8: while (it.hasNext()) {
            NakAckHeader2 hdr;
            Message msg = it.next();
            if (msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr = (NakAckHeader2)msg.getHeader(this.id)) == null) continue;
            it.remove();
            if (!this.is_server) {
                this.queueMessage(msg, hdr.seqno);
                continue;
            }
            switch (hdr.type) {
                case 1: {
                    if (msgs == null) {
                        msgs = new ArrayList<LongTuple<Message>>(size);
                    }
                    msgs.add(new LongTuple<Message>(hdr.seqno, msg));
                    break;
                }
                case 2: {
                    try {
                        SeqnoList missing = Util.streamableFromBuffer(SeqnoList.class, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
                        if (missing == null) continue block8;
                        this.handleXmitReq(msg.getSrc(), missing, hdr.sender);
                    }
                    catch (Exception e) {
                        this.log.error("failed deserializing retransmission list", e);
                    }
                    break;
                }
                case 3: {
                    Message xmitted_msg = this.msgFromXmitRsp(msg, hdr);
                    if (xmitted_msg == null) continue block8;
                    if (msgs == null) {
                        msgs = new ArrayList(size);
                    }
                    msgs.add(new LongTuple<Message>(hdr.seqno, xmitted_msg));
                    got_retransmitted_msg = true;
                    break;
                }
                case 4: {
                    this.handleHighestSeqno(batch.sender(), hdr.seqno);
                    break;
                }
                default: {
                    this.log.error(Util.getMessage("HeaderTypeNotKnown"), this.local_addr, hdr.type);
                }
            }
        }
        if (msgs != null) {
            this.handleMessages(batch.dest(), batch.sender(), msgs, batch.mode() == MessageBatch.Mode.OOB, batch.clusterName());
        }
        if (got_retransmitted_msg && this.rebroadcasting) {
            this.checkForRebroadcasts();
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    @Override
    public Map<String, String> handleProbe(String ... keys) {
        HashMap<String, String> retval = new HashMap<String, String>();
        String[] stringArray = keys;
        int n = stringArray.length;
        block8: for (int i = 0; i < n; ++i) {
            String key;
            switch (key = stringArray[i]) {
                case "digest-history": {
                    retval.put(key, this.printDigestHistory());
                    continue block8;
                }
                case "dump-digest": {
                    retval.put(key, "\n" + this.printMessages());
                }
            }
        }
        return retval;
    }

    @Override
    public String[] supportedKeys() {
        return new String[]{"digest-history", "dump-digest"};
    }

    protected void queueMessage(Message msg, long seqno) {
        if (this.become_server_queue != null) {
            this.become_server_queue.add(msg);
            this.log.trace("%s: message %s::%d was added to queue (not yet server)", this.local_addr, msg.getSrc(), seqno);
        } else {
            this.log.trace("%s: message %s::%d was discarded (not yet server)", this.local_addr, msg.getSrc(), seqno);
        }
    }

    protected void unknownMember(Address sender, Object message) {
        if (this.leaving) {
            return;
        }
        if (this.log_discard_msgs && this.log.isWarnEnabled()) {
            if (this.suppress_log_non_member != null) {
                this.suppress_log_non_member.log(SuppressLog.Level.warn, sender, this.suppress_time_non_member_warnings, this.local_addr, message, sender, this.view);
            } else {
                this.log.warn(Util.getMessage("MsgDroppedNak"), this.local_addr, message, sender, this.view);
            }
        }
    }

    protected void send(Message msg) {
        if (!this.running) {
            this.log.trace("%s: discarded message as we're not in the 'running' state, message: %s", this.local_addr, msg);
            return;
        }
        Table buf = (Table)this.xmit_table.get(this.local_addr);
        if (buf == null) {
            return;
        }
        if (msg.src() == null) {
            msg.src(this.local_addr);
        }
        boolean dont_loopback_set = msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
        long msg_id = this.seqno.incrementAndGet();
        long sleep = 10L;
        while (true) {
            try {
                msg.putHeader(this.id, NakAckHeader2.createMessageHeader(msg_id));
                buf.add(msg_id, msg, dont_loopback_set ? dont_loopback_filter : null);
            }
            catch (Throwable t) {
                if (!this.running) continue;
                Util.sleep(sleep);
                sleep = Math.min(5000L, sleep * 2L);
                if (this.running) continue;
            }
            break;
        }
        if (this.is_trace) {
            this.log.trace("%s: sending %s#%d", this.local_addr, this.local_addr, msg_id);
        }
        this.down_prot.down(msg);
        ++this.num_messages_sent;
        if (this.resend_last_seqno && this.last_seqno_resender != null) {
            this.last_seqno_resender.skipNext();
        }
    }

    protected void handleMessage(Message msg, NakAckHeader2 hdr) {
        boolean added;
        Address sender = msg.getSrc();
        Table buf = (Table)this.xmit_table.get(sender);
        if (buf == null) {
            this.unknownMember(sender, hdr.seqno);
            return;
        }
        ++this.num_messages_received;
        boolean loopback = this.local_addr.equals(sender);
        boolean bl = loopback || buf.add(hdr.seqno, msg.isFlagSet(Message.Flag.OOB) ? DUMMY_OOB_MSG : msg) ? true : (added = false);
        if (added && this.is_trace) {
            this.log.trace("%s: received %s#%d", this.local_addr, sender, hdr.seqno);
        }
        if (added && msg.isFlagSet(Message.Flag.OOB)) {
            if (loopback) {
                msg = (Message)buf.get(hdr.seqno);
                if (msg != null && msg.isFlagSet(Message.Flag.OOB) && msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
                    this.deliver(msg, sender, hdr.seqno, "OOB message");
                }
            } else {
                this.deliver(msg, sender, hdr.seqno, "OOB message");
            }
        }
        this.removeAndDeliver(buf, sender, loopback, null);
    }

    protected void handleMessages(Address dest, Address sender, List<LongTuple<Message>> msgs, boolean oob, AsciiString cluster_name) {
        boolean added;
        Table buf = (Table)this.xmit_table.get(sender);
        if (buf == null) {
            this.unknownMember(sender, "batch");
            return;
        }
        this.num_messages_received += msgs.size();
        boolean loopback = this.local_addr.equals(sender);
        boolean bl = loopback || buf.add(msgs, oob, oob ? DUMMY_OOB_MSG : null) ? true : (added = false);
        if (added && this.is_trace) {
            this.log.trace("%s: received %s#%d-%d (%d messages)", this.local_addr, sender, msgs.get(0).getVal1(), msgs.get(msgs.size() - 1).getVal1(), msgs.size());
        }
        if (added && oob) {
            MessageBatch oob_batch = new MessageBatch(dest, sender, null, dest == null, MessageBatch.Mode.OOB, msgs.size());
            if (loopback) {
                for (LongTuple tuple : msgs) {
                    long seq = tuple.getVal1();
                    Message msg = (Message)buf.get(seq);
                    if (msg == null || !msg.isFlagSet(Message.Flag.OOB) || !msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) continue;
                    oob_batch.add(msg);
                }
            } else {
                for (LongTuple tuple : msgs) {
                    oob_batch.add((Message)tuple.getVal2());
                }
            }
            this.deliverBatch(oob_batch);
        }
        this.removeAndDeliver(buf, sender, loopback, cluster_name);
    }

    protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name) {
        AtomicInteger adders = buf.getAdders();
        if (adders.getAndIncrement() != 0) {
            return;
        }
        boolean remove_msgs = this.discard_delivered_msgs && !loopback;
        MessageBatch batch = new MessageBatch(buf.size()).dest(null).sender(sender).clusterName(cluster_name).multicast(true);
        Supplier<MessageBatch> batch_creator = () -> batch;
        do {
            try {
                batch.reset();
                buf.removeMany(remove_msgs, 0, this.no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs, batch_creator, BATCH_ACCUMULATOR);
            }
            catch (Throwable t) {
                this.log.error("failed removing messages from table for " + sender, t);
            }
            if (batch.isEmpty()) continue;
            this.deliverBatch(batch);
        } while (adders.decrementAndGet() != 0);
        if (this.rebroadcasting) {
            this.checkForRebroadcasts();
        }
    }

    protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) {
        Table buf;
        this.log.trace("%s: received xmit request from %s for %s%s", this.local_addr, xmit_requester, original_sender, missing_msgs);
        if (this.stats) {
            this.xmit_reqs_received.add(missing_msgs.size());
        }
        if ((buf = (Table)this.xmit_table.get(original_sender)) == null) {
            this.log.error(Util.getMessage("SenderNotFound"), this.local_addr, original_sender);
            return;
        }
        for (long i : missing_msgs) {
            Message msg = (Message)buf.get(i);
            if (msg == null) {
                if (!this.log.isWarnEnabled() || !this.log_not_found_msgs || this.local_addr.equals(xmit_requester) || i <= buf.getLow()) continue;
                this.log.warn(Util.getMessage("MessageNotFound"), this.local_addr, original_sender, i);
                continue;
            }
            if (this.is_trace) {
                this.log.trace(this.local_addr + ": resending " + original_sender + "::" + i);
            }
            this.sendXmitRsp(xmit_requester, msg);
        }
    }

    protected void deliver(Message msg, Address sender, long seqno, String error_msg) {
        if (this.is_trace) {
            this.log.trace("%s: delivering %s#%d", this.local_addr, sender, seqno);
        }
        try {
            this.up_prot.up(msg);
        }
        catch (Throwable t) {
            this.log.error(Util.getMessage("FailedToDeliverMsg"), this.local_addr, error_msg, msg, t);
        }
    }

    protected void deliverBatch(MessageBatch batch) {
        try {
            if (batch == null || batch.isEmpty()) {
                return;
            }
            if (this.is_trace) {
                Message first = batch.first();
                Message last = batch.last();
                StringBuilder sb = new StringBuilder(this.local_addr + ": delivering " + batch.sender());
                if (first != null && last != null) {
                    NakAckHeader2 hdr1 = (NakAckHeader2)first.getHeader(this.id);
                    NakAckHeader2 hdr2 = (NakAckHeader2)last.getHeader(this.id);
                    sb.append("#").append(hdr1.seqno).append("-").append(hdr2.seqno);
                }
                sb.append(" (" + batch.size()).append(" messages)");
                this.log.trace(sb);
            }
            this.up_prot.up(batch);
        }
        catch (Throwable t) {
            this.log.error(Util.getMessage("FailedToDeliverMsg"), this.local_addr, "batch", batch, t);
        }
    }

    protected void flushBecomeServerQueue() {
        if (this.become_server_queue != null && !this.become_server_queue.isEmpty()) {
            this.log.trace("%s: flushing become_server_queue (%d elements)", this.local_addr, this.become_server_queue.size());
            TP transport = this.getTransport();
            for (Message msg : this.become_server_queue) {
                transport.submitToThreadPool(() -> {
                    try {
                        this.up(msg);
                    }
                    finally {
                        this.become_server_queue.remove(msg);
                    }
                }, true);
            }
        }
    }

    protected void cancelRebroadcasting() {
        this.rebroadcast_lock.lock();
        try {
            this.rebroadcasting = false;
            this.rebroadcast_done.signalAll();
        }
        finally {
            this.rebroadcast_lock.unlock();
        }
    }

    protected void sendXmitRsp(Address dest, Message msg) {
        if (msg == null) {
            return;
        }
        if (this.stats) {
            this.xmit_rsps_sent.increment();
        }
        if (msg.getSrc() == null) {
            msg.setSrc(this.local_addr);
        }
        if (this.use_mcast_xmit) {
            this.down_prot.down(msg);
            return;
        }
        Message xmit_msg = msg.copy(true, true).dest(dest);
        NakAckHeader2 hdr = (NakAckHeader2)xmit_msg.getHeader(this.id);
        NakAckHeader2 newhdr = hdr.copy();
        newhdr.type = (byte)3;
        xmit_msg.putHeader(this.id, newhdr);
        this.down_prot.down(xmit_msg);
    }

    protected void handleXmitRsp(Message msg, NakAckHeader2 hdr) {
        if (msg == null) {
            return;
        }
        try {
            if (this.stats) {
                this.xmit_rsps_received.increment();
            }
            msg.setDest(null);
            NakAckHeader2 newhdr = hdr.copy();
            newhdr.type = 1;
            msg.putHeader(this.id, newhdr);
            this.handleMessage(msg, newhdr);
            if (this.rebroadcasting) {
                this.checkForRebroadcasts();
            }
        }
        catch (Exception ex) {
            this.log.error(Util.getMessage("FailedToDeliverMsg"), this.local_addr, "retransmitted message", msg, ex);
        }
    }

    protected void handleHighestSeqno(Address sender, long seqno) {
        Table buf = (Table)this.xmit_table.get(sender);
        if (buf == null) {
            return;
        }
        long my_highest_received = buf.getHighestReceived();
        if (my_highest_received >= 0L && seqno > my_highest_received) {
            this.log.trace("%s: my_highest_rcvd (%s#%d) < highest received (%s#%d): requesting retransmission", this.local_addr, sender, my_highest_received, sender, seqno);
            this.retransmit(seqno, seqno, sender);
        }
    }

    protected Message msgFromXmitRsp(Message msg, NakAckHeader2 hdr) {
        if (msg == null) {
            return null;
        }
        if (this.stats) {
            this.xmit_rsps_received.increment();
        }
        msg.setDest(null);
        NakAckHeader2 newhdr = hdr.copy();
        newhdr.type = 1;
        msg.putHeader(this.id, newhdr);
        return msg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void rebroadcastMessages() {
        long sleep = this.max_rebroadcast_timeout / 3L;
        long wait_time = this.max_rebroadcast_timeout;
        long start = System.currentTimeMillis();
        while (wait_time > 0L) {
            Digest their_digest;
            this.rebroadcast_digest_lock.lock();
            try {
                if (this.rebroadcast_digest == null) {
                    return;
                }
                their_digest = this.rebroadcast_digest.copy();
            }
            finally {
                this.rebroadcast_digest_lock.unlock();
            }
            Digest my_digest = this.getDigest();
            boolean xmitted = false;
            for (Digest.Entry entry : their_digest) {
                long my_high;
                long their_high;
                Address member = entry.getMember();
                long[] my_entry = my_digest.get(member);
                if (my_entry == null || (their_high = entry.getHighest()) <= (my_high = Math.max(my_entry[0], my_entry[1]))) continue;
                this.log.trace("%s: fetching %d-%d from %s", this.local_addr, my_high, their_high, member);
                this.retransmit(my_high + 1L, their_high, member, true);
                xmitted = true;
            }
            if (!xmitted) {
                return;
            }
            this.rebroadcast_lock.lock();
            try {
                my_digest = this.getDigest();
                this.rebroadcast_digest_lock.lock();
                try {
                    if (!this.rebroadcasting || NAKACK2.isGreaterThanOrEqual(my_digest, this.rebroadcast_digest)) {
                        this.rebroadcast_digest_lock.unlock();
                        this.rebroadcast_lock.unlock();
                        return;
                    }
                }
                catch (Throwable throwable) {
                    this.rebroadcast_digest_lock.unlock();
                    throw throwable;
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            this.rebroadcast_digest_lock.unlock();
            this.rebroadcast_done.await(sleep, TimeUnit.MILLISECONDS);
            wait_time -= System.currentTimeMillis() - start;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void checkForRebroadcasts() {
        Digest tmp = this.getDigest();
        boolean cancel_rebroadcasting = false;
        this.rebroadcast_digest_lock.lock();
        try {
            cancel_rebroadcasting = NAKACK2.isGreaterThanOrEqual(tmp, this.rebroadcast_digest);
        }
        catch (Throwable throwable) {
        }
        finally {
            this.rebroadcast_digest_lock.unlock();
        }
        if (cancel_rebroadcasting) {
            this.cancelRebroadcasting();
        }
    }

    protected static boolean isGreaterThanOrEqual(Digest first, Digest other) {
        if (other == null) {
            return true;
        }
        for (Digest.Entry entry : first) {
            long their_highest;
            long my_highest;
            Address sender = entry.getMember();
            long[] their_entry = other.get(sender);
            if (their_entry == null || (my_highest = entry.getHighest()) >= (their_highest = Math.max(their_entry[0], their_entry[1]))) continue;
            return false;
        }
        return true;
    }

    protected void adjustReceivers(List<Address> members) {
        Set keys = this.xmit_table.keySet();
        for (Address member : keys) {
            Table buf;
            if (members.contains(member) || Objects.equals(this.local_addr, member) || (buf = (Table)this.xmit_table.remove(member)) == null) continue;
            this.log.debug("%s: removed %s from xmit_table (not member anymore)", this.local_addr, member);
        }
        members.stream().filter(mbr -> !keys.contains(mbr)).forEach(mbr -> this.xmit_table.putIfAbsent((Address)mbr, this.createTable(0L)));
    }

    public Digest getDigest() {
        HashMap<Address, long[]> map = new HashMap<Address, long[]>();
        for (Map.Entry entry : this.xmit_table.entrySet()) {
            Address sender = (Address)entry.getKey();
            Table buf = (Table)entry.getValue();
            long[] seqnos = buf.getDigest();
            map.put(sender, seqnos);
        }
        return new Digest(map);
    }

    public Digest getDigest(Address mbr) {
        if (mbr == null) {
            return this.getDigest();
        }
        Table buf = (Table)this.xmit_table.get(mbr);
        if (buf == null) {
            return null;
        }
        long[] seqnos = buf.getDigest();
        return new Digest(mbr, seqnos[0], seqnos[1]);
    }

    protected void setDigest(Digest digest) {
        this.setDigest(digest, false);
    }

    protected void mergeDigest(Digest digest) {
        this.setDigest(digest, true);
    }

    protected void overwriteDigest(Digest digest) {
        if (digest == null) {
            return;
        }
        StringBuilder sb = new StringBuilder("\n[overwriteDigest()]\n");
        sb.append("existing digest:  " + this.getDigest()).append("\nnew digest:       " + digest);
        for (Digest.Entry entry : digest) {
            Address member = entry.getMember();
            if (member == null) continue;
            long highest_delivered_seqno = entry.getHighestDeliveredSeqno();
            Table<Message> buf = (Table<Message>)this.xmit_table.get(member);
            if (buf != null) {
                if (this.local_addr.equals(member)) {
                    buf.setHighestDelivered(highest_delivered_seqno);
                    continue;
                }
                this.xmit_table.remove(member);
            }
            buf = this.createTable(highest_delivered_seqno);
            this.xmit_table.put(member, buf);
        }
        sb.append("\n").append("resulting digest: " + this.getDigest().toString(digest));
        this.digest_history.add(sb.toString());
        if (this.log.isDebugEnabled()) {
            this.log.debug(sb.toString());
        }
    }

    protected void setDigest(Digest digest, boolean merge) {
        if (digest == null) {
            return;
        }
        StringBuilder sb = this.log.isDebugEnabled() ? new StringBuilder("\n[" + this.local_addr + (merge ? " mergeDigest()]\n" : " setDigest()]\n")).append("existing digest:  " + this.getDigest()).append("\nnew digest:       " + digest) : null;
        boolean set_own_seqno = false;
        for (Digest.Entry entry : digest) {
            Address member = entry.getMember();
            if (member == null) continue;
            long highest_delivered_seqno = entry.getHighestDeliveredSeqno();
            Table<Message> buf = (Table<Message>)this.xmit_table.get(member);
            if (buf != null) {
                if (!merge || Objects.equals(this.local_addr, member) || buf.getHighestDelivered() >= highest_delivered_seqno) continue;
                this.xmit_table.remove(member);
                if (member.equals(this.local_addr)) {
                    this.seqno.set(highest_delivered_seqno);
                    set_own_seqno = true;
                }
            }
            buf = this.createTable(highest_delivered_seqno);
            this.xmit_table.put(member, buf);
        }
        if (sb != null) {
            sb.append("\n").append("resulting digest: " + this.getDigest().toString(digest));
        }
        if (set_own_seqno && sb != null) {
            sb.append("\nnew seqno for " + this.local_addr + ": " + this.seqno);
        }
        if (sb != null) {
            this.digest_history.add(sb.toString());
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug(sb.toString());
        }
    }

    protected Table<Message> createTable(long initial_seqno) {
        return new Table<Message>(this.xmit_table_num_rows, this.xmit_table_msgs_per_row, initial_seqno, this.xmit_table_resize_factor, this.xmit_table_max_compaction_time);
    }

    protected void stable(Digest digest) {
        if (this.members == null || this.local_addr == null || digest == null) {
            return;
        }
        this.log.trace("%s: received stable digest %s", this.local_addr, digest);
        this.stability_msgs.add(digest.toString());
        for (Digest.Entry entry : digest) {
            Address member = entry.getMember();
            if (member == null) continue;
            long hd = entry.getHighestDeliveredSeqno();
            long hr = entry.getHighestReceivedSeqno();
            Table buf = (Table)this.xmit_table.get(member);
            if (buf != null) {
                long my_hr = buf.getHighestReceived();
                if (hr >= 0L && hr > my_hr) {
                    this.log.trace("%s: my_highest_rcvd (%d) < stability_highest_rcvd (%d): requesting retransmission of %s", this.local_addr, my_hr, hr, member + "#" + hr);
                    this.retransmit(hr, hr, member);
                }
            }
            if (hd < 0L || buf == null) continue;
            this.log.trace("%s: deleting msgs <= %s from %s", this.local_addr, hd, member);
            buf.purge(hd);
        }
    }

    protected void retransmit(long first_seqno, long last_seqno, Address sender) {
        if (first_seqno <= last_seqno) {
            this.retransmit(first_seqno, last_seqno, sender, false);
        }
    }

    protected void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request) {
        SeqnoList list = new SeqnoList((int)(last_seqno - first_seqno + 1L), first_seqno).add(first_seqno, last_seqno);
        this.retransmit(list, sender, multicast_xmit_request);
    }

    protected void retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request) {
        Address random_member;
        Address dest;
        Address address = dest = multicast_xmit_request || this.use_mcast_xmit_req ? null : sender;
        if (this.xmit_from_random_member && !this.local_addr.equals(sender) && (random_member = Util.pickRandomElement(this.members)) != null && !this.local_addr.equals(random_member)) {
            dest = random_member;
        }
        Message retransmit_msg = new Message(dest).setBuffer(Util.streamableToBuffer(missing_msgs)).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).putHeader(this.id, NakAckHeader2.createXmitRequestHeader(sender));
        this.log.trace("%s: sending XMIT_REQ (%s) to %s", this.local_addr, missing_msgs, dest);
        this.down_prot.down(retransmit_msg);
        if (this.stats) {
            this.xmit_reqs_sent.add(missing_msgs.size());
        }
    }

    protected void reset() {
        this.seqno.set(0L);
        this.xmit_table.clear();
    }

    protected static long sizeOfAllMessages(Table<Message> buf, boolean include_headers) {
        return buf.stream().reduce(0L, (size, el) -> {
            if (el == null) {
                return size;
            }
            return size + (include_headers ? el.size() : (long)el.getLength());
        }, (l, r) -> l);
    }

    protected void startRetransmitTask() {
        if (this.xmit_task == null || this.xmit_task.isDone()) {
            this.xmit_task = this.timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, this.xmit_interval, TimeUnit.MILLISECONDS, this.sends_can_block);
        }
    }

    protected void stopRetransmitTask() {
        if (this.xmit_task != null) {
            this.xmit_task.cancel(true);
            this.xmit_task = null;
        }
    }

    @ManagedOperation(description="Triggers the retransmission task, asking all senders for missing messages")
    public void triggerXmit() {
        for (Map.Entry entry : this.xmit_table.entrySet()) {
            SeqnoList missing;
            Address target = (Address)entry.getKey();
            Table buf = (Table)entry.getValue();
            if (buf != null && buf.getNumMissing() > 0 && (missing = buf.getMissing(this.max_xmit_req_size)) != null) {
                long highest = missing.getLast();
                Long prev_seqno = this.xmit_task_map.get(target);
                if (prev_seqno == null) {
                    this.xmit_task_map.put(target, highest);
                    continue;
                }
                missing.removeHigherThan(prev_seqno);
                if (highest > prev_seqno) {
                    this.xmit_task_map.put(target, highest);
                }
                if (missing.isEmpty()) continue;
                this.retransmit(missing, target, false);
                continue;
            }
            if (this.xmit_task_map.isEmpty()) continue;
            this.xmit_task_map.remove(target);
        }
        if (this.resend_last_seqno && this.last_seqno_resender != null) {
            this.last_seqno_resender.execute(this.seqno.get());
        }
    }

    protected class LastSeqnoResender {
        protected int num_resends;
        protected long last_seqno_resent;
        protected final AtomicBoolean skip_next_resend = new AtomicBoolean(false);

        protected LastSeqnoResender() {
        }

        protected void skipNext() {
            this.skip_next_resend.compareAndSet(false, true);
        }

        protected void execute(long seqno) {
            if (seqno == 0L || this.skip_next_resend.compareAndSet(true, false)) {
                return;
            }
            if (seqno == this.last_seqno_resent && this.num_resends >= NAKACK2.this.resend_last_seqno_max_times) {
                return;
            }
            if (seqno > this.last_seqno_resent) {
                this.last_seqno_resent = seqno;
                this.num_resends = 1;
            } else {
                ++this.num_resends;
            }
            Message msg = new Message(null).putHeader(NAKACK2.this.id, NakAckHeader2.createHighestSeqnoHeader(seqno)).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
            NAKACK2.this.down_prot.down(msg);
        }
    }

    protected class RetransmitTask
    implements Runnable {
        protected RetransmitTask() {
        }

        @Override
        public void run() {
            NAKACK2.this.triggerXmit();
        }

        public String toString() {
            return NAKACK2.class.getSimpleName() + ": RetransmitTask (interval=" + NAKACK2.this.xmit_interval + " ms)";
        }
    }
}

