/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntBinaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jgroups.Address;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.NakAckHeader;
import org.jgroups.protocols.TCP;
import org.jgroups.protocols.TP;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AsciiString;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Buffer;
import org.jgroups.util.Digest;
import org.jgroups.util.FastArray;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.SuppressLog;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description="Reliable multicast (one-to-many) protocol")
public abstract class ReliableMulticast
extends Protocol
implements DiagnosticsHandler.ProbeHandler {
    @Property(description="Retransmit retransmit responses (messages) using multicast rather than unicast")
    protected boolean use_mcast_xmit = true;
    @Property(description="Use a multicast to request retransmission of missing messages")
    protected boolean use_mcast_xmit_req;
    @Property(description="Ask a random member for retransmission of a missing message")
    protected boolean xmit_from_random_member;
    @Property(description="Should messages delivered to application be discarded")
    protected boolean discard_delivered_msgs = true;
    @Property(description="discards warnings about promiscuous traffic")
    protected boolean log_discard_msgs = true;
    @Property(description="If false, skips warnings about retransmission messages not found in the xmit_table (used for testing)")
    protected boolean log_not_found_msgs = true;
    @Property(description="Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted. 0 turns retransmission off", type=AttributeType.TIME)
    protected long xmit_interval = 1000L;
    @Property(description="Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.redhat.com/browse/JGRP-1509 for details).")
    protected int become_server_queue_size = 50;
    @Property(description="Time during which identical warnings about messages from a non member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.", type=AttributeType.TIME)
    protected long suppress_time_non_member_warnings = 60000L;
    @Property(description="Max number of messages to ask for in a retransmit request. 0 disables this and uses the max bundle size in the transport", type=AttributeType.SCALAR)
    protected int max_xmit_req_size = 1024;
    @Property(description="The max size of a message batch when delivering messages. 0 is unbounded", type=AttributeType.SCALAR)
    protected int max_batch_size;
    @Property(description="Reuses the same message batch for delivery of regular messages (only done by a single thread anyway). Not advisable for buffers that can grow infinitely (NAKACK3)")
    protected boolean reuse_message_batches = true;
    @Property(description="Increment seqno and send a message atomically. Reduces retransmissions. Description in doc/design/NAKACK4.txt ('misc')")
    protected boolean send_atomically;
    @ManagedAttribute(description="True if sending a message can block at the transport level")
    protected boolean sends_can_block = true;
    @ManagedAttribute(description="Number of messages sent", type=AttributeType.SCALAR)
    protected final LongAdder num_messages_sent = new LongAdder();
    @ManagedAttribute(description="Number of messages received", type=AttributeType.SCALAR)
    protected final LongAdder num_messages_received = new LongAdder();
    protected static final Message DUMMY_OOB_MSG = new EmptyMessage().setFlag(Message.Flag.OOB);
    protected final Predicate<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs = msg -> !(msg == null || msg == DUMMY_OOB_MSG || msg.isFlagSet(Message.Flag.OOB) && !msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED) || msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) && Objects.equals(this.local_addr, msg.getSrc()));
    protected static final Predicate<Message> remove_filter = m -> m != null && (m.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) || m == DUMMY_OOB_MSG || m.isFlagSet(Message.TransientFlag.OOB_DELIVERED));
    protected static final BiConsumer<MessageBatch, Message> BATCH_ACCUMULATOR = (mb, m) -> mb.add((Message)m, true, false);
    protected final Function<Message, Long> SEQNO_GETTER = m -> {
        NakAckHeader hdr = m != null ? (NakAckHeader)m.getHeader(this.id) : null;
        return hdr == null || hdr.getType() != 1 ? -1L : hdr.getSeqno();
    };
    protected final Predicate<Message> HAS_HEADER = m -> m != null && m.getHeader(this.id) != null;
    @ManagedAttribute(description="Number of retransmit requests received", type=AttributeType.SCALAR)
    protected final LongAdder xmit_reqs_received = new LongAdder();
    @ManagedAttribute(description="Number of retransmit requests sent", type=AttributeType.SCALAR)
    protected final LongAdder xmit_reqs_sent = new LongAdder();
    @ManagedAttribute(description="Number of retransmit responses received (only when use_macst_xmit=false)", type=AttributeType.SCALAR)
    protected final LongAdder xmit_rsps_received = new LongAdder();
    @ManagedAttribute(description="Number of retransmit responses sent", type=AttributeType.SCALAR)
    protected final LongAdder xmit_rsps_sent = new LongAdder();
    @ManagedAttribute(description="The average number of messages in a batch removed from the table and delivered to the application")
    protected final AverageMinMax avg_batch_size = new AverageMinMax(1024);
    @ManagedAttribute(description="tracing is enabled or disabled for the given log", writable=true)
    protected boolean is_trace = this.log.isTraceEnabled();
    protected volatile boolean is_server;
    protected volatile List<Address> members = new ArrayList<Address>();
    protected volatile View view;
    protected final AtomicLong seqno = new AtomicLong(0L);
    protected final Map<Address, Entry> xmit_table = Util.createConcurrentMap();
    protected Buffer<Message> local_xmit_table;
    protected Entry local_send_entry;
    protected Future<?> xmit_task;
    protected final Map<Address, Long> xmit_task_map = new ConcurrentHashMap<Address, Long>();
    protected final Map<Address, Long> stable_xmit_map = Util.createConcurrentMap();
    protected final Map<Address, MessageBatch> cached_batches = Util.createConcurrentMap();
    protected volatile boolean leaving;
    protected volatile boolean running;
    protected TimeScheduler timer;
    protected final LastSeqnoResender last_seqno_resender = new LastSeqnoResender();
    protected final BoundedList<String> stability_msgs = new BoundedList(10);
    protected final BoundedList<String> digest_history = new BoundedList(10);
    protected Queue<Message> become_server_queue = new ConcurrentLinkedQueue<Message>();
    protected SuppressLog<Address> suppress_log_non_member;

    @ManagedAttribute(description="Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        return this.xmit_task != null && !this.xmit_task.isDone();
    }

    @ManagedAttribute(description="Number of messages from non-members", type=AttributeType.SCALAR)
    public int getNonMemberMessages() {
        return this.suppress_log_non_member != null ? this.suppress_log_non_member.getCache().size() : 0;
    }

    protected abstract Buffer<Message> createXmitWindow(long var1);

    public boolean sendBufferCanBlock() {
        return false;
    }

    @ManagedOperation(description="Clears the cache for messages from non-members")
    public void clearNonMemberCache() {
        if (this.suppress_log_non_member != null) {
            this.suppress_log_non_member.getCache().clear();
        }
    }

    @ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
    public String printCachedBatches() {
        return "\n" + this.cached_batches.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n"));
    }

    @ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
    public ReliableMulticast clearCachedBatches() {
        this.cached_batches.clear();
        return this;
    }

    public long getXmitRequestsReceived() {
        return this.xmit_reqs_received.sum();
    }

    public long getXmitRequestsSent() {
        return this.xmit_reqs_sent.sum();
    }

    public long getXmitResponsesReceived() {
        return this.xmit_rsps_received.sum();
    }

    public long getXmitResponsesSent() {
        return this.xmit_rsps_sent.sum();
    }

    public boolean useMcastXmit() {
        return this.use_mcast_xmit;
    }

    public ReliableMulticast useMcastXmit(boolean u) {
        this.use_mcast_xmit = u;
        return this;
    }

    public boolean useMcastXmitReq() {
        return this.use_mcast_xmit_req;
    }

    public ReliableMulticast useMcastXmitReq(boolean flag) {
        this.use_mcast_xmit_req = flag;
        return this;
    }

    public boolean xmitFromRandomMember() {
        return this.xmit_from_random_member;
    }

    public ReliableMulticast xmitFromRandomMember(boolean x) {
        this.xmit_from_random_member = x;
        return this;
    }

    public boolean discardDeliveredMsgs() {
        return this.discard_delivered_msgs;
    }

    public ReliableMulticast discardDeliveredMsgs(boolean d) {
        this.discard_delivered_msgs = d;
        return this;
    }

    public boolean logDiscardMessages() {
        return this.log_discard_msgs;
    }

    public ReliableMulticast logDiscardMessages(boolean l) {
        this.log_discard_msgs = l;
        return this;
    }

    public boolean logNotFoundMessages() {
        return this.log_not_found_msgs;
    }

    public ReliableMulticast logNotFoundMessages(boolean flag) {
        this.log_not_found_msgs = flag;
        return this;
    }

    public ReliableMulticast setXmitFromRandomMember(boolean r) {
        this.xmit_from_random_member = r;
        return this;
    }

    public ReliableMulticast setDiscardDeliveredMsgs(boolean d) {
        this.discard_delivered_msgs = d;
        return this;
    }

    public long getXmitInterval() {
        return this.xmit_interval;
    }

    public ReliableMulticast setXmitInterval(long x) {
        this.xmit_interval = x;
        return this;
    }

    public int getBecomeServerQueueSize() {
        return this.become_server_queue_size;
    }

    public ReliableMulticast setBecomeServerQueueSize(int b) {
        this.become_server_queue_size = b;
        return this;
    }

    public long getSuppressTimeNonMemberWarnings() {
        return this.suppress_time_non_member_warnings;
    }

    public ReliableMulticast setSuppressTimeNonMemberWarnings(long s) {
        this.suppress_time_non_member_warnings = s;
        return this;
    }

    public int getMaxXmitReqSize() {
        return this.max_xmit_req_size;
    }

    public ReliableMulticast setMaxXmitReqSize(int m) {
        this.max_xmit_req_size = m;
        return this;
    }

    public boolean sendsCanBlock() {
        return this.sends_can_block;
    }

    public ReliableMulticast sendsCanBlock(boolean s) {
        this.sends_can_block = s;
        return this;
    }

    public long getNumMessagesSent() {
        return this.num_messages_sent.sum();
    }

    public long getNumMessagesReceived() {
        return this.num_messages_received.sum();
    }

    public boolean reuseMessageBatches() {
        return this.reuse_message_batches;
    }

    public ReliableMulticast reuseMessageBatches(boolean b) {
        this.reuse_message_batches = b;
        return this;
    }

    public boolean sendAtomically() {
        return this.send_atomically;
    }

    public ReliableMulticast sendAtomically(boolean f) {
        this.send_atomically = f;
        return this;
    }

    public boolean isTrace() {
        return this.is_trace;
    }

    public ReliableMulticast isTrace(boolean i) {
        this.is_trace = i;
        return this;
    }

    @Override
    public <T extends Protocol> T setLevel(String level) {
        Object retval = super.setLevel(level);
        this.is_trace = this.log.isTraceEnabled();
        return retval;
    }

    @ManagedAttribute(description="Actual size of the become_server_queue", type=AttributeType.SCALAR)
    public int getBecomeServerQueueSizeActual() {
        return this.become_server_queue.size();
    }

    public <T extends Buffer<Message>> T getBuf(Address sender) {
        if (sender == null) {
            return null;
        }
        Entry entry = this.xmit_table.get(sender);
        return (T)(entry != null ? entry.buf() : null);
    }

    protected Entry getEntry(Address sender) {
        if (sender == null) {
            return null;
        }
        return this.xmit_table.get(sender);
    }

    public void setTimer(TimeScheduler timer) {
        this.timer = timer;
    }

    @ManagedAttribute(description="Total number of undelivered messages in all retransmit buffers", type=AttributeType.SCALAR)
    public int getXmitTableUndeliveredMsgs() {
        return this.xmit_table.values().stream().map(Entry::buf).map(Buffer::size).reduce(Integer::sum).orElse(0);
    }

    @ManagedAttribute(description="Total number of missing messages in all retransmit buffers", type=AttributeType.SCALAR)
    public int getXmitTableMissingMessages() {
        return this.xmit_table.values().stream().map(Entry::buf).map(Buffer::numMissing).reduce(Integer::sum).orElse(0);
    }

    @ManagedAttribute(description="Capacity of the retransmit buffer")
    public long getXmitTableCapacity() {
        Buffer<Message> win = this.sendBuf();
        return win != null ? (long)win.capacity() : 0L;
    }

    @ManagedAttribute(description="Returns the number of bytes of all messages in all retransmit buffers. To compute the size, Message.getLength() is used", type=AttributeType.BYTES)
    public long getSizeOfAllMessages() {
        return this.xmit_table.values().stream().map(Entry::buf).map(win -> ReliableMulticast.sizeOfAllMessages(win, false)).reduce(0L, Long::sum);
    }

    @ManagedAttribute(description="Returns the number of bytes of all messages in all retransmit buffers. To compute the size, Message.size() is used", type=AttributeType.BYTES)
    public long getSizeOfAllMessagesInclHeaders() {
        return this.xmit_table.values().stream().map(Entry::buf).map(win -> ReliableMulticast.sizeOfAllMessages(win, true)).reduce(0L, Long::sum);
    }

    @ManagedOperation(description="Prints the contents of the receiver windows for all members")
    public String printMessages() {
        StringBuilder ret = new StringBuilder("\n");
        for (Map.Entry<Address, Entry> entry : this.xmit_table.entrySet()) {
            Address addr = entry.getKey();
            Buffer<Message> win = entry.getValue().buf();
            int cap = win.capacity();
            ret.append(addr).append(": ").append(win);
            if (cap > 0) {
                ret.append(String.format(" [capacity: %s]", cap));
            }
            ret.append('\n');
        }
        return ret.toString();
    }

    @ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
    public String printBatches() {
        return "\n" + this.cached_batches.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n"));
    }

    @ManagedAttribute
    public long getCurrentSeqno() {
        return this.seqno.get();
    }

    @ManagedOperation(description="Prints the stability messages received")
    public String printStabilityMessages() {
        return Util.printListWithDelimiter(this.stability_msgs, "\n");
    }

    @ManagedOperation(description="Keeps information about the last N times a digest was set or merged")
    public String printDigestHistory() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (String tmp : this.digest_history) {
            sb.append(tmp).append("\n");
        }
        return sb.toString();
    }

    protected Buffer<Message> sendBuf() {
        return this.local_xmit_table != null ? this.local_xmit_table : (this.local_xmit_table = this.getBuf(this.local_addr));
    }

    protected Entry sendEntry() {
        return this.local_send_entry != null ? this.local_send_entry : (this.local_send_entry = this.getEntry(this.local_addr));
    }

    @Override
    @ManagedOperation(description="Resets all statistics")
    public void resetStats() {
        this.num_messages_sent.reset();
        this.num_messages_received.reset();
        this.xmit_reqs_received.reset();
        this.xmit_reqs_sent.reset();
        this.xmit_rsps_received.reset();
        this.xmit_rsps_sent.reset();
        this.stability_msgs.clear();
        this.digest_history.clear();
        this.avg_batch_size.clear();
        Buffer<Message> table = this.sendBuf();
        if (table != null) {
            table.resetStats();
        }
    }

    @Override
    public void init() throws Exception {
        if (this.xmit_from_random_member && this.discard_delivered_msgs) {
            this.discard_delivered_msgs = false;
            this.log.debug("%s: xmit_from_random_member set to true: changed discard_delivered_msgs to false", this.local_addr);
        }
        TP transport = this.getTransport();
        this.sends_can_block = transport instanceof TCP;
        transport.registerProbeHandler(this);
        if (!transport.supportsMulticasting()) {
            if (this.use_mcast_xmit) {
                this.log.debug(Util.getMessage("NoMulticastTransport"), "use_mcast_xmit", transport.getName(), "use_mcast_xmit");
                this.use_mcast_xmit = false;
            }
            if (this.use_mcast_xmit_req) {
                this.log.debug(Util.getMessage("NoMulticastTransport"), "use_mcast_xmit_req", transport.getName(), "use_mcast_xmit_req");
                this.use_mcast_xmit_req = false;
            }
        }
        if (this.become_server_queue_size <= 0) {
            this.log.warn("%s: %s.become_server_queue_size is <= 0; setting it to 10", this.local_addr, ReliableMulticast.class.getSimpleName());
            this.become_server_queue_size = 10;
        }
        this.become_server_queue = new ArrayBlockingQueue<Message>(this.become_server_queue_size);
        if (this.suppress_time_non_member_warnings > 0L) {
            this.suppress_log_non_member = new SuppressLog(this.log, "MsgDroppedNak");
        }
        int estimated_max_msgs_in_xmit_req = (transport.getBundler().getMaxSize() - 50) * 8;
        int old_max_xmit_size = this.max_xmit_req_size;
        this.max_xmit_req_size = this.max_xmit_req_size <= 0 ? estimated_max_msgs_in_xmit_req : Math.min(this.max_xmit_req_size, estimated_max_msgs_in_xmit_req);
        if (old_max_xmit_size != this.max_xmit_req_size) {
            this.log.trace("%s: set max_xmit_req_size from %d to %d", this.local_addr, old_max_xmit_size, this.max_xmit_req_size);
        }
    }

    @Override
    public List<Integer> providedUpServices() {
        return Arrays.asList(39, 41, 42, 53);
    }

    @Override
    public void start() throws Exception {
        this.timer = this.getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer is null");
        }
        this.running = true;
        this.leaving = false;
        this.startRetransmitTask();
    }

    @Override
    public void stop() {
        this.running = false;
        this.is_server = false;
        this.become_server_queue.clear();
        this.stopRetransmitTask();
        this.xmit_task_map.clear();
        this.stable_xmit_map.clear();
        this.local_xmit_table = null;
        this.local_send_entry = null;
        this.reset();
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 30: {
                this.stable((Digest)evt.getArg());
                return null;
            }
            case 39: {
                return this.getDigest((Address)evt.getArg());
            }
            case 41: {
                this.setDigest((Digest)evt.getArg());
                return null;
            }
            case 42: {
                this.overwriteDigest((Digest)evt.getArg());
                return null;
            }
            case 53: {
                this.mergeDigest((Digest)evt.getArg());
                return null;
            }
            case 15: {
                View tmp_view = (View)evt.getArg();
                this.members = tmp_view.getMembers();
                break;
            }
            case 6: {
                View tmp_view = (View)evt.getArg();
                List<Address> mbrs = tmp_view.getMembers();
                this.members = mbrs;
                this.view = tmp_view;
                this.adjustReceivers(mbrs);
                this.is_server = true;
                if (this.suppress_log_non_member != null) {
                    this.suppress_log_non_member.removeExpired(this.suppress_time_non_member_warnings);
                }
                this.xmit_task_map.keySet().retainAll(mbrs);
                this.stable_xmit_map.keySet().retainAll(mbrs);
                this.cached_batches.keySet().retainAll(mbrs);
                break;
            }
            case 16: {
                this.is_server = true;
                this.flushBecomeServerQueue();
                break;
            }
            case 4: {
                this.leaving = true;
                this.reset();
            }
        }
        return this.down_prot.down(evt);
    }

    @Override
    public Object down(Message msg) {
        Address dest = msg.getDest();
        if (dest != null || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.down_prot.down(msg);
        }
        if (!this.running) {
            this.log.trace("%s: discarded message as start() has not yet been called, message: %s", this.local_addr, msg);
            return null;
        }
        Entry send_entry = this.sendEntry();
        if (send_entry == null) {
            return null;
        }
        if (msg.getSrc() == null) {
            msg.setSrc(this.local_addr);
        }
        boolean dont_loopback_set = msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK);
        Buffer<Message> win = send_entry.buf();
        if (this.send(msg, win, dont_loopback_set)) {
            this.num_messages_sent.increment();
            if (dont_loopback_set && this.needToSendAck(send_entry, 1)) {
                this.handleAck(this.local_addr, win.highestDelivered());
            }
        } else {
            this.log.trace("%s: dropped message due to closed send buffer, message: %s", this.local_addr, msg);
        }
        this.last_seqno_resender.skipNext();
        return null;
    }

    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 30: {
                this.stable((Digest)evt.getArg());
                return null;
            }
        }
        return this.up_prot.up(evt);
    }

    @Override
    public Object up(Message msg) {
        if (msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.up_prot.up(msg);
        }
        NakAckHeader hdr = (NakAckHeader)msg.getHeader(this.id);
        if (hdr == null) {
            return this.up_prot.up(msg);
        }
        if (!this.is_server) {
            this.queueMessage(msg, hdr.seqno);
            return null;
        }
        switch (hdr.type) {
            case 1: {
                this.handleMessage(msg, hdr);
                return null;
            }
            case 2: {
                try {
                    SeqnoList missing = (SeqnoList)msg.getObject();
                    if (missing != null) {
                        this.handleXmitReq(msg.getSrc(), missing, hdr.sender);
                    }
                }
                catch (Exception e) {
                    this.log.error("failed deserializing retransmission list", e);
                }
                return null;
            }
            case 3: {
                this.handleXmitRsp(msg, hdr);
                return null;
            }
            case 4: {
                this.handleHighestSeqno(msg.getSrc(), hdr.seqno);
                return null;
            }
            case 5: {
                this.handleAck(msg.src(), hdr.seqno);
                return null;
            }
        }
        this.log.error(Util.getMessage("HeaderTypeNotKnown"), this.local_addr, hdr.type);
        return null;
    }

    @Override
    public void up(MessageBatch mb) {
        boolean found_nackack_msg = mb.anyMatch(this.HAS_HEADER);
        if (!found_nackack_msg) {
            if (!mb.isEmpty()) {
                this.up_prot.up(mb);
            }
            return;
        }
        long highest_ack = 0L;
        FastArray.FastIterator it = (FastArray.FastIterator)mb.iterator();
        block9: while (it.hasNext()) {
            NakAckHeader hdr;
            Message msg = (Message)it.next();
            if (msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr = (NakAckHeader)msg.getHeader(this.id)) == null) continue;
            if (!this.is_server) {
                this.queueMessage(msg, hdr.seqno);
                it.remove();
                continue;
            }
            switch (hdr.type) {
                case 1: {
                    break;
                }
                case 2: {
                    it.remove();
                    try {
                        SeqnoList missing = (SeqnoList)msg.getObject();
                        if (missing == null) continue block9;
                        this.handleXmitReq(msg.getSrc(), missing, hdr.sender);
                    }
                    catch (Exception e) {
                        this.log.error("failed deserializing retransmission list", e);
                    }
                    break;
                }
                case 3: {
                    Message xmitted_msg = this.msgFromXmitRsp(msg, hdr);
                    if (xmitted_msg == null) continue block9;
                    it.replace(xmitted_msg);
                    break;
                }
                case 4: {
                    it.remove();
                    this.handleHighestSeqno(mb.sender(), hdr.seqno);
                    break;
                }
                case 5: {
                    it.remove();
                    highest_ack = Math.max(highest_ack, hdr.seqno);
                    break;
                }
                default: {
                    this.log.error(Util.getMessage("HeaderTypeNotKnown"), this.local_addr, hdr.type);
                }
            }
        }
        if (highest_ack > 0L) {
            this.handleAck(mb.sender(), highest_ack);
        }
        if (!mb.isEmpty()) {
            this.handleMessageBatch(mb);
        }
        if (!mb.isEmpty()) {
            this.up_prot.up(mb);
        }
    }

    @Override
    public Map<String, String> handleProbe(String ... keys2) {
        HashMap<String, String> retval = new HashMap<String, String>();
        String[] stringArray = keys2;
        int n = stringArray.length;
        block8: for (int i = 0; i < n; ++i) {
            String key;
            switch (key = stringArray[i]) {
                case "digest-history": {
                    retval.put(key, this.printDigestHistory());
                    continue block8;
                }
                case "dump-digest": {
                    retval.put(key, "\n" + this.printMessages());
                }
            }
        }
        return retval;
    }

    @Override
    public String[] supportedKeys() {
        return new String[]{"digest-history", "dump-digest"};
    }

    protected void queueMessage(Message msg, long seqno) {
        if (this.become_server_queue.offer(msg)) {
            this.log.trace("%s: message %s#%d was queued (not yet server)", this.local_addr, msg.getSrc(), seqno);
        } else {
            this.log.trace("%s: message %s#%d was discarded (not yet server, queue full)", this.local_addr, msg.getSrc(), seqno);
        }
    }

    protected void unknownMember(Address sender, Object message) {
        if (this.leaving) {
            return;
        }
        if (this.log_discard_msgs && this.log.isWarnEnabled()) {
            if (this.suppress_log_non_member != null) {
                this.suppress_log_non_member.log(SuppressLog.Level.warn, sender, this.suppress_time_non_member_warnings, this.local_addr, message, sender, this.view);
            } else {
                this.log.warn(Util.getMessage("MsgDroppedNak"), this.local_addr, message, sender, this.view);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean send(Message msg, Buffer<Message> win, boolean dont_loopback_set) {
        Lock lock;
        Lock lock2 = lock = this.send_atomically ? win.lock() : null;
        if (lock != null) {
            lock.lock();
        }
        try {
            long msg_id = this.seqno.incrementAndGet();
            if (this.is_trace) {
                this.log.trace("%s --> [all]: #%d", this.local_addr, msg_id);
            }
            msg.putHeader(this.id, NakAckHeader.createMessageHeader(msg_id));
            if (!this.addToSendBuffer(win, msg_id, msg, dont_loopback_set ? remove_filter : null)) {
                boolean bl = false;
                return bl;
            }
            this.down_prot.down(msg);
            boolean bl = true;
            return bl;
        }
        finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter) {
        long sleep = 10L;
        boolean rc = false;
        while (true) {
            try {
                rc = win.add(seq, msg, filter);
            }
            catch (Throwable t) {
                if (!this.running) continue;
                Util.sleep(sleep);
                sleep = Math.min(5000L, sleep * 2L);
                if (this.running) continue;
            }
            break;
        }
        return rc;
    }

    protected void resend(Message msg) {
        this.down_prot.down(msg);
    }

    protected void handleMessage(Message msg, NakAckHeader hdr) {
        boolean added;
        Address sender = msg.getSrc();
        Entry entry = this.xmit_table.get(sender);
        if (entry == null) {
            this.unknownMember(sender, hdr.seqno);
            return;
        }
        this.num_messages_received.increment();
        boolean loopback = this.local_addr.equals(sender);
        Message win = this.getBuf(sender);
        boolean bl = loopback || ((Buffer)((Object)win)).add(hdr.seqno, (Message)(msg.isFlagSet(Message.Flag.OOB) ? DUMMY_OOB_MSG : msg)) ? true : (added = false);
        if (added && msg.isFlagSet(Message.Flag.OOB)) {
            if (loopback) {
                msg = (Message)((Buffer)((Object)win)).get(hdr.seqno);
                if (msg != null && msg.isFlagSet(Message.Flag.OOB) && msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
                    this.deliver(msg, sender, hdr.seqno, entry, "OOB message");
                }
            } else {
                this.deliver(msg, sender, hdr.seqno, entry, "OOB message");
            }
        }
        this.removeAndDeliver((Buffer<Message>)((Object)win), entry, sender, loopback, null);
    }

    protected void handleMessageBatch(MessageBatch mb) {
        boolean added;
        Address sender = mb.sender();
        Entry entry = this.xmit_table.get(sender);
        if (entry == null) {
            mb.removeIf(this.HAS_HEADER, true);
            this.unknownMember(sender, "batch");
            return;
        }
        int size = mb.size();
        this.num_messages_received.add(size);
        boolean loopback = this.local_addr.equals(sender);
        boolean oob = mb.mode() == MessageBatch.Mode.OOB;
        Message win = this.getBuf(sender);
        boolean bl = loopback || ((Buffer)((Object)win)).add(mb, this.SEQNO_GETTER, !oob, (Message)(oob ? DUMMY_OOB_MSG : null)) ? true : (added = false);
        if (added && oob) {
            MessageBatch oob_batch;
            Address dest = mb.dest();
            MessageBatch messageBatch = loopback ? new MessageBatch(dest, sender, null, dest == null, MessageBatch.Mode.OOB, size) : (oob_batch = mb);
            if (loopback) {
                for (Message m : mb) {
                    long seq = this.SEQNO_GETTER.apply(m);
                    Message msg = (Message)((Buffer)((Object)win)).get(seq);
                    if (msg == null || !msg.isFlagSet(Message.Flag.OOB) || !msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) continue;
                    oob_batch.add(msg);
                }
            }
            this.deliverBatch(oob_batch, entry);
        }
        this.removeAndDeliver((Buffer<Message>)((Object)win), entry, sender, loopback, mb.clusterName());
        if (oob || loopback) {
            mb.removeIf(this.HAS_HEADER, true);
        }
    }

    protected void removeAndDeliver(Buffer<Message> win, Entry e, Address sender, boolean loopback, AsciiString cluster) {
        AtomicInteger adders = win.getAdders();
        if (adders.getAndIncrement() != 0) {
            return;
        }
        boolean remove_msgs = this.discard_delivered_msgs && !loopback;
        int cap = this.max_batch_size > 0 && this.max_batch_size < win.capacity() ? this.max_batch_size : win.capacity();
        AsciiString cl = cluster != null ? cluster : this.getTransport().getClusterNameAscii();
        MessageBatch b = null;
        if (this.reuse_message_batches) {
            b = this.cached_batches.get(sender);
            if (b == null) {
                b = this.cached_batches.computeIfAbsent(sender, __ -> new MessageBatch(cap).dest(null).sender(sender).cluster(cl).mcast(true));
            }
        } else {
            b = new MessageBatch(cap).dest(null).sender(sender).cluster(cl).mcast(true);
        }
        MessageBatch batch = b;
        Supplier<MessageBatch> batch_creator = () -> batch;
        MessageBatch mb = null;
        do {
            try {
                batch.reset();
                mb = win.removeMany(remove_msgs, this.max_batch_size, this.no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs, batch_creator, BATCH_ACCUMULATOR);
                batch.determineMode();
            }
            catch (Throwable t) {
                this.log.error("failed removing messages from table for " + sender, t);
            }
            int size = batch.size();
            if (size <= 0) continue;
            this.deliverBatch(batch, e);
            if (!this.stats) continue;
            this.avg_batch_size.add(size);
        } while (mb != null || adders.decrementAndGet() != 0);
    }

    protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) {
        Object win;
        if (this.is_trace) {
            this.log.trace("%s <-- %s: XMIT(%s%s)", this.local_addr, xmit_requester, original_sender, missing_msgs);
        }
        if (this.stats) {
            this.xmit_reqs_received.add(missing_msgs.size());
        }
        if ((win = this.getBuf(original_sender)) == null) {
            this.log.error(Util.getMessage("SenderNotFound"), this.local_addr, original_sender);
            return;
        }
        if (this.is_trace) {
            this.log.trace("%s --> [all]: resending to %s %s", this.local_addr, original_sender, missing_msgs);
        }
        for (long i : missing_msgs) {
            Message msg = (Message)((Buffer)win).get(i);
            if (msg == null) {
                if (!this.log.isWarnEnabled() || !this.log_not_found_msgs || this.local_addr.equals(xmit_requester) || i <= ((Buffer)win).low()) continue;
                this.log.warn("%s: message %s::%d not found in retransmission table, requester: %s, win: %s", this.local_addr, original_sender, i, xmit_requester, win);
                continue;
            }
            this.sendXmitRsp(xmit_requester, msg);
        }
    }

    protected void deliver(Message msg, Address sender, long seqno, Entry entry, String error_msg) {
        if (this.is_trace) {
            this.log.trace("%s <-- %s: #%d", this.local_addr, sender, seqno);
        }
        try {
            if (this.needToSendAck(entry, 1)) {
                this.sendAck(sender, entry.buf());
            }
            this.up_prot.up(msg);
        }
        catch (Throwable t) {
            this.log.error(Util.getMessage("FailedToDeliverMsg"), this.local_addr, error_msg, msg, t);
        }
    }

    protected void deliverBatch(MessageBatch batch, Entry entry) {
        try {
            if (batch == null || batch.isEmpty()) {
                return;
            }
            if (this.is_trace) {
                Object first = batch.first();
                Object last = batch.last();
                StringBuilder sb = new StringBuilder(this.local_addr + " <-- " + batch.sender() + ": ");
                if (first != null && last != null) {
                    NakAckHeader hdr1 = (NakAckHeader)first.getHeader(this.id);
                    NakAckHeader hdr2 = (NakAckHeader)last.getHeader(this.id);
                    if (hdr1 != null && hdr2 != null) {
                        sb.append("#").append(hdr1.seqno).append("-").append(hdr2.seqno);
                    }
                }
                sb.append(" (" + batch.size()).append(" messages)");
                this.log.trace(sb);
            }
            if (this.needToSendAck(entry, batch.size())) {
                this.sendAck(batch.sender(), entry.buf());
            }
            this.up_prot.up(batch);
            batch.reset();
        }
        catch (Throwable t) {
            this.log.error(Util.getMessage("FailedToDeliverMsg"), this.local_addr, "batch", batch, t);
        }
    }

    protected void flushBecomeServerQueue() {
        if (!this.become_server_queue.isEmpty()) {
            Message msg;
            this.log.trace("%s: flushing become_server_queue (%d elements)", this.local_addr, this.become_server_queue.size());
            TP transport = this.getTransport();
            while ((msg = this.become_server_queue.poll()) != null) {
                transport.getThreadPool().execute(() -> this.up(msg));
            }
        }
    }

    protected void sendXmitRsp(Address dest, Message msg) {
        if (msg == null) {
            return;
        }
        if (this.stats) {
            this.xmit_rsps_sent.increment();
        }
        if (msg.getSrc() == null) {
            msg.setSrc(this.local_addr);
        }
        if (this.use_mcast_xmit) {
            msg.setFlag(Message.TransientFlag.DONT_BLOCK);
            this.resend(msg);
            return;
        }
        Message xmit_msg = msg.copy(true, true).setDest(dest).setFlag(Message.TransientFlag.DONT_BLOCK);
        NakAckHeader hdr = (NakAckHeader)xmit_msg.getHeader(this.id);
        NakAckHeader newhdr = hdr.copy();
        newhdr.type = (byte)3;
        xmit_msg.putHeader(this.id, newhdr);
        this.resend(xmit_msg);
    }

    protected void handleXmitRsp(Message msg, NakAckHeader hdr) {
        if (msg == null) {
            return;
        }
        try {
            if (this.stats) {
                this.xmit_rsps_received.increment();
            }
            msg.setDest(null);
            NakAckHeader newhdr = hdr.copy();
            newhdr.type = 1;
            msg.putHeader(this.id, newhdr);
            this.handleMessage(msg, newhdr);
        }
        catch (Exception ex) {
            this.log.error(Util.getMessage("FailedToDeliverMsg"), this.local_addr, "retransmitted message", msg, ex);
        }
    }

    protected void handleHighestSeqno(Address sender, long seqno) {
        Buffer<Message> win;
        Entry recv_entry = this.getEntry(sender);
        Buffer<Message> buffer = win = recv_entry != null ? recv_entry.buf() : null;
        if (win == null) {
            return;
        }
        long my_highest_received = win.high();
        if (my_highest_received >= 0L && seqno > my_highest_received) {
            this.log.trace("%s: my_highest_rcvd (%s#%d) < highest received (%s#%d): requesting retransmission", this.local_addr, sender, my_highest_received, sender, seqno);
            this.retransmit(seqno, seqno, sender, false);
        }
        this.needToSendAck(recv_entry, 1);
    }

    protected void handleAck(Address sender, long ack) {
    }

    protected boolean needToSendAck(Entry __) {
        return false;
    }

    protected boolean needToSendAck(Entry e, int num_acks) {
        return false;
    }

    protected void sendAck(Address to, Buffer<Message> win) {
    }

    protected Message msgFromXmitRsp(Message msg, NakAckHeader hdr) {
        if (msg == null) {
            return null;
        }
        if (this.stats) {
            this.xmit_rsps_received.increment();
        }
        msg.setDest(null);
        NakAckHeader newhdr = hdr.copy();
        newhdr.type = 1;
        msg.putHeader(this.id, newhdr);
        return msg;
    }

    protected void adjustReceivers(List<Address> members) {
        Set<Address> keys2 = this.xmit_table.keySet();
        for (Address member : keys2) {
            if (members.contains(member) || Objects.equals(this.local_addr, member) || this.xmit_table.remove(member) == null) continue;
            this.log.debug("%s: removed %s from xmit_table (not member anymore)", this.local_addr, member);
        }
        members.stream().filter(mbr -> !keys2.contains(mbr)).forEach(mbr -> this.xmit_table.putIfAbsent((Address)mbr, new Entry(this.createXmitWindow(0L))));
    }

    public Digest getDigest() {
        HashMap<Address, long[]> map = new HashMap<Address, long[]>();
        for (Map.Entry<Address, Entry> entry : this.xmit_table.entrySet()) {
            Address sender = entry.getKey();
            Buffer<Message> win = entry.getValue().buf();
            long[] seqnos = win.getDigest();
            map.put(sender, seqnos);
        }
        return new Digest(map);
    }

    public Digest getDigest(Address mbr) {
        if (mbr == null) {
            return this.getDigest();
        }
        Object win = this.getBuf(mbr);
        if (win == null) {
            return null;
        }
        long[] seqnos = ((Buffer)win).getDigest();
        return new Digest(mbr, seqnos[0], seqnos[1]);
    }

    protected void setDigest(Digest digest) {
        this.setDigest(digest, false);
    }

    protected void mergeDigest(Digest digest) {
        this.setDigest(digest, true);
    }

    protected void overwriteDigest(Digest digest) {
        if (digest == null) {
            return;
        }
        StringBuilder sb = new StringBuilder("\n[overwriteDigest()]\n");
        sb.append("existing digest:  " + this.getDigest()).append("\nnew digest:       " + digest);
        for (Digest.Entry entry : digest) {
            Address member = entry.getMember();
            if (member == null) continue;
            long highest_delivered_seqno = entry.getHighestDeliveredSeqno();
            Object win = this.getBuf(member);
            if (win != null) {
                if (this.local_addr.equals(member)) {
                    ((Buffer)win).highestDelivered(highest_delivered_seqno);
                    continue;
                }
                this.xmit_table.remove(member);
            }
            win = this.createXmitWindow(highest_delivered_seqno);
            this.xmit_table.put(member, new Entry((Buffer<Message>)win));
        }
        sb.append("\n").append("resulting digest: " + this.getDigest().toString(digest));
        this.digest_history.add(sb.toString());
        this.log.debug(sb.toString());
    }

    protected void setDigest(Digest digest, boolean merge) {
        if (digest == null) {
            return;
        }
        StringBuilder sb = this.log.isDebugEnabled() ? new StringBuilder("\n[" + this.local_addr + (merge ? " mergeDigest()]\n" : " setDigest()]\n")).append("existing digest:  " + this.getDigest()).append("\nnew digest:       " + digest) : null;
        boolean set_own_seqno = false;
        for (Digest.Entry entry : digest) {
            Address member = entry.getMember();
            if (member == null) continue;
            long highest_delivered_seqno = entry.getHighestDeliveredSeqno();
            Object win = this.getBuf(member);
            if (win != null) {
                if (!merge || Objects.equals(this.local_addr, member) || ((Buffer)win).highestDelivered() >= highest_delivered_seqno) continue;
                this.xmit_table.remove(member);
                if (member.equals(this.local_addr)) {
                    this.seqno.set(highest_delivered_seqno);
                    set_own_seqno = true;
                }
            }
            win = this.createXmitWindow(highest_delivered_seqno);
            this.xmit_table.put(member, new Entry((Buffer<Message>)win));
        }
        if (sb != null) {
            sb.append("\n").append("resulting digest: " + this.getDigest().toString(digest));
            if (set_own_seqno) {
                sb.append("\nnew seqno for " + this.local_addr + ": " + this.seqno);
            }
            this.digest_history.add(sb.toString());
            this.log.debug(sb.toString());
        }
    }

    protected void stable(Digest digest) {
        if (this.members == null || this.local_addr == null || digest == null) {
            return;
        }
        this.log.trace("%s: received stable digest %s", this.local_addr, digest);
        this.stability_msgs.add(digest.toString());
        for (Digest.Entry entry : digest) {
            Address member = entry.getMember();
            if (member == null) continue;
            long hd = entry.getHighestDeliveredSeqno();
            long hr = entry.getHighestReceivedSeqno();
            Object win = this.getBuf(member);
            if (win != null && this.xmit_interval > 0L) {
                long my_hr = ((Buffer)win).high();
                Long prev_hr = this.stable_xmit_map.get(member);
                if (prev_hr != null && prev_hr > my_hr) {
                    this.log.trace("%s: my_highest_rcvd (%d) < stability_highest_rcvd (%d): requesting retransmission of %s", this.local_addr, my_hr, prev_hr, member + "#" + prev_hr);
                    this.retransmit(prev_hr, prev_hr, member, false);
                }
                this.stable_xmit_map.put(member, hr);
            }
            if (hd < 0L || win == null) continue;
            this.log.trace("%s: deleting msgs <= %s from %s", this.local_addr, hd, member);
            ((Buffer)win).purge(hd);
        }
    }

    protected void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request) {
        if (first_seqno > last_seqno) {
            return;
        }
        SeqnoList list = new SeqnoList((int)(last_seqno - first_seqno + 1L), first_seqno).add(first_seqno, last_seqno);
        this.retransmit(list, sender, multicast_xmit_request);
    }

    protected void retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request) {
        Address random_member;
        Address dest;
        Address address = dest = multicast_xmit_request || this.use_mcast_xmit_req ? null : sender;
        if (this.xmit_from_random_member && !this.local_addr.equals(sender) && (random_member = Util.pickRandomElement(this.members)) != null && !this.local_addr.equals(random_member)) {
            dest = random_member;
        }
        Message retransmit_msg = new ObjectMessage(dest, missing_msgs).setFlag(Message.Flag.OOB, Message.Flag.NO_FC).setFlag(Message.TransientFlag.DONT_BLOCK).putHeader(this.id, NakAckHeader.createXmitRequestHeader(sender));
        if (this.is_trace) {
            this.log.trace("%s --> %s: XMIT_REQ(%s)", this.local_addr, dest, missing_msgs);
        }
        this.down_prot.down(retransmit_msg);
        if (this.stats) {
            this.xmit_reqs_sent.add(missing_msgs.size());
        }
    }

    protected void reset() {
        this.seqno.set(0L);
        this.xmit_table.clear();
    }

    protected static long sizeOfAllMessages(Buffer<Message> win, boolean include_headers) {
        return win.stream().filter(Objects::nonNull).map(m -> include_headers ? Long.valueOf(m.size()) : Long.valueOf(m.getLength())).reduce(0L, Long::sum);
    }

    protected void startRetransmitTask() {
        if (this.xmit_interval > 0L && (this.xmit_task == null || this.xmit_task.isDone())) {
            this.xmit_task = this.timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, this.xmit_interval, TimeUnit.MILLISECONDS, this.sends_can_block);
        }
    }

    protected void stopRetransmitTask() {
        if (this.xmit_task != null) {
            this.xmit_task.cancel(true);
            this.xmit_task = null;
        }
    }

    @ManagedOperation(description="Triggers the retransmission task, asking all senders for missing messages")
    public void triggerXmit() {
        for (Map.Entry<Address, Entry> entry : this.xmit_table.entrySet()) {
            SeqnoList missing;
            Address target = entry.getKey();
            Entry val = entry.getValue();
            Buffer<Message> win = val.buf();
            if (this.needToSendAck(val)) {
                this.sendAck(target, win);
            }
            if (win != null && win.numMissing() > 0 && (missing = win.getMissing(this.max_xmit_req_size)) != null) {
                long highest = missing.getLast();
                Long prev_seqno = this.xmit_task_map.get(target);
                if (prev_seqno == null) {
                    this.xmit_task_map.put(target, highest);
                    continue;
                }
                missing.removeHigherThan(prev_seqno);
                if (highest > prev_seqno) {
                    this.xmit_task_map.put(target, highest);
                }
                if (missing.isEmpty()) continue;
                long highest_deliverable = win.getHighestDeliverable();
                long first = missing.getFirst();
                if (first < highest_deliverable) {
                    missing.removeLowerThan(highest_deliverable + 1L);
                }
                this.retransmit(missing, target, false);
                continue;
            }
            if (this.xmit_task_map.isEmpty()) continue;
            this.xmit_task_map.remove(target);
        }
        this.last_seqno_resender.execute();
    }

    protected static class Entry {
        protected final Buffer<Message> buf;
        protected final AtomicInteger num_acks_sent = new AtomicInteger();
        protected final AtomicBoolean send_ack = new AtomicBoolean();

        protected Entry(Buffer<Message> buf) {
            this.buf = Objects.requireNonNull(buf);
        }

        protected Buffer<Message> buf() {
            return this.buf;
        }

        protected Entry sendAck() {
            this.send_ack.compareAndSet(false, true);
            return this;
        }

        protected boolean needToSendAck() {
            return this.send_ack.compareAndSet(true, false);
        }

        public boolean update(int num_acks, IntBinaryOperator op) {
            boolean should_send_ack;
            boolean bl = should_send_ack = this.num_acks_sent.accumulateAndGet(num_acks, op) == 0;
            if (should_send_ack) {
                return true;
            }
            this.sendAck();
            return false;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.buf != null) {
                sb.append(this.buf).append(", ");
            }
            if (this.send_ack.get()) {
                sb.append(" [ack pending]");
            }
            return sb.toString();
        }
    }

    protected class LastSeqnoResender {
        protected final AtomicBoolean skip_next_send = new AtomicBoolean(false);

        protected LastSeqnoResender() {
        }

        protected void skipNext() {
            this.skip_next_send.compareAndSet(false, true);
        }

        protected void execute() {
            if (this.skip_next_send.compareAndSet(true, false)) {
                return;
            }
            Buffer<Message> send_buf = ReliableMulticast.this.sendBuf();
            if (send_buf == null) {
                return;
            }
            long low = send_buf.low();
            long highest_sent_seqno = send_buf.high();
            if (highest_sent_seqno == 0L || low == highest_sent_seqno) {
                return;
            }
            Message msg = new EmptyMessage(null).putHeader(ReliableMulticast.this.id, NakAckHeader.createHighestSeqnoHeader(highest_sent_seqno)).setFlag(Message.Flag.OOB, Message.Flag.NO_FC).setFlag(Message.TransientFlag.DONT_BLOCK);
            ReliableMulticast.this.down_prot.down(msg);
        }
    }

    protected class RetransmitTask
    implements Runnable {
        protected RetransmitTask() {
        }

        @Override
        public void run() {
            ReliableMulticast.this.triggerXmit();
        }

        public String toString() {
            return ReliableMulticast.class.getSimpleName() + ": RetransmitTask (interval=" + ReliableMulticast.this.xmit_interval + " ms)";
        }
    }
}

