/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.shaded.org.jgroups.protocols.pbcast;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Event;
import org.apache.activemq.artemis.shaded.org.jgroups.Header;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.View;
import org.apache.activemq.artemis.shaded.org.jgroups.ViewId;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.MBean;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedAttribute;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedOperation;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.stack.Protocol;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Buffer;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Digest;
import org.apache.activemq.artemis.shaded.org.jgroups.util.FixedSizeBitSet;
import org.apache.activemq.artemis.shaded.org.jgroups.util.MessageBatch;
import org.apache.activemq.artemis.shaded.org.jgroups.util.MutableDigest;
import org.apache.activemq.artemis.shaded.org.jgroups.util.TimeScheduler;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

@MBean(description="Computes the broadcast messages that are stable")
public class STABLE
extends Protocol {
    protected static final long MAX_SUSPEND_TIME = 200000L;
    @Property(description="Average time to send a STABLE message")
    protected long desired_avg_gossip = 20000L;
    @Property(description="Delay before stability message is sent")
    protected long stability_delay = 6000L;
    @Property(description="Maximum number of bytes received in all messages before sending a STABLE message is triggered")
    protected long max_bytes = 2000000L;
    @Property(description="Max percentage of the max heap (-Xmx) to be used for max_bytes. Only used if ergonomics is enabled. 0 disables setting max_bytes dynamically.", deprecatedMessage="will be ignored")
    @Deprecated
    protected double cap = 0.1;
    @Property(description="Wether or not to send the STABLE messages to all members of the cluster, or to the current coordinator only. The latter reduces the number of STABLE messages, but also generates more work on the coordinator")
    protected boolean send_stable_msgs_to_coord_only = true;
    protected int num_stable_msgs_sent;
    protected int num_stable_msgs_received;
    protected int num_stability_msgs_sent;
    protected int num_stability_msgs_received;
    protected Address local_addr;
    protected volatile View view;
    protected volatile MutableDigest digest;
    protected FixedSizeBitSet votes;
    protected final Lock lock = new ReentrantLock();
    protected Future<?> stability_task_future;
    protected final Lock stability_lock = new ReentrantLock();
    protected Future<?> stable_task_future = null;
    protected final Lock stable_task_lock = new ReentrantLock();
    protected TimeScheduler timer;
    @ManagedAttribute(description="Bytes accumulated so far")
    protected long num_bytes_received = 0L;
    protected final Lock received = new ReentrantLock();
    @ManagedAttribute
    protected volatile boolean suspended = false;
    protected boolean initialized = false;
    protected Future<?> resume_task_future;
    protected final Object resume_task_mutex = new Object();
    protected volatile Address coordinator;

    public long getDesiredAverageGossip() {
        return this.desired_avg_gossip;
    }

    public void setDesiredAverageGossip(long gossip_interval) {
        this.desired_avg_gossip = gossip_interval;
    }

    public long getMaxBytes() {
        return this.max_bytes;
    }

    public void setMaxBytes(long max_bytes) {
        this.max_bytes = max_bytes;
    }

    public long getBytes() {
        return this.num_bytes_received;
    }

    @ManagedAttribute
    public int getStableSent() {
        return this.num_stable_msgs_sent;
    }

    @ManagedAttribute
    public int getStableReceived() {
        return this.num_stable_msgs_received;
    }

    @ManagedAttribute
    public int getStabilitySent() {
        return this.num_stability_msgs_sent;
    }

    @ManagedAttribute
    public int getStabilityReceived() {
        return this.num_stability_msgs_received;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ManagedAttribute
    public boolean getStableTaskRunning() {
        this.stable_task_lock.lock();
        try {
            boolean bl = this.stable_task_future != null && !this.stable_task_future.isDone() && !this.stable_task_future.isCancelled();
            return bl;
        }
        finally {
            this.stable_task_lock.unlock();
        }
    }

    @Override
    public void resetStats() {
        super.resetStats();
        this.num_stable_msgs_received = 0;
        this.num_stable_msgs_sent = 0;
        this.num_stability_msgs_sent = 0;
        this.num_stability_msgs_received = 0;
    }

    @Override
    public List<Integer> requiredDownServices() {
        return Arrays.asList(39);
    }

    protected void suspend(long timeout) {
        if (!this.suspended) {
            this.suspended = true;
            this.log.debug("suspending message garbage collection");
        }
        this.startResumeTask(timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void resume() {
        this.lock.lock();
        try {
            this.resetDigest();
            this.suspended = false;
        }
        finally {
            this.lock.unlock();
        }
        this.log.debug("resuming message garbage collection");
        this.stopResumeTask();
    }

    @Override
    public void init() throws Exception {
        super.init();
    }

    @Override
    public void start() throws Exception {
        this.timer = this.getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer cannot be retrieved");
        }
        if (this.desired_avg_gossip > 0L) {
            this.startStableTask();
        }
        if (this.send_stable_msgs_to_coord_only) {
            this.stability_delay = 0L;
        }
    }

    @Override
    public void stop() {
        this.stopStableTask();
    }

    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 1: {
                Message msg = (Message)evt.getArg();
                StableHeader hdr = (StableHeader)msg.getHeader(this.id);
                if (hdr == null) {
                    this.handleRegularMessage(msg);
                    return this.up_prot.up(evt);
                }
                this.handleUpEvent(hdr, msg.getSrc(), this.readDigest(msg.getRawBuffer(), msg.getOffset(), msg.getLength()));
                return null;
            }
            case 6: {
                Object retval = this.up_prot.up(evt);
                this.handleViewChange((View)evt.getArg());
                return retval;
            }
        }
        return this.up_prot.up(evt);
    }

    protected void handleUpEvent(StableHeader hdr, Address sender, Digest digest) {
        switch (hdr.type) {
            case 1: {
                this.handleStableMessage(digest, sender, hdr.view_id);
                break;
            }
            case 2: {
                this.handleStabilityMessage(digest, sender, hdr.view_id);
                break;
            }
            default: {
                this.log.error("%s: StableHeader type %s not known", this.local_addr, hdr.type);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void up(MessageBatch batch) {
        for (Message msg : batch) {
            StableHeader hdr = (StableHeader)msg.getHeader(this.id);
            if (hdr == null) continue;
            batch.remove(msg);
            this.handleUpEvent(hdr, batch.sender(), this.readDigest(msg.getRawBuffer(), msg.getOffset(), msg.getLength()));
        }
        if (this.max_bytes > 0L && batch.dest() == null && !batch.isEmpty()) {
            boolean send_stable_msg = false;
            this.received.lock();
            try {
                this.num_bytes_received += (long)batch.length();
                if (this.num_bytes_received >= this.max_bytes) {
                    this.log.trace("max_bytes has been reached (%s, bytes received=%s): triggers stable msg", this.max_bytes, this.num_bytes_received);
                    this.num_bytes_received = 0L;
                    send_stable_msg = true;
                }
            }
            finally {
                this.received.unlock();
            }
            if (send_stable_msg) {
                this.sendStableMessage(true);
            }
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleRegularMessage(Message msg) {
        if (this.max_bytes <= 0L) {
            return;
        }
        if (msg.getDest() == null) {
            boolean send_stable_msg = false;
            this.received.lock();
            try {
                this.num_bytes_received += (long)msg.getLength();
                if (this.num_bytes_received >= this.max_bytes) {
                    this.log.trace("max_bytes has been reached (%s, bytes received=%s): triggers stable msg", this.max_bytes, this.num_bytes_received);
                    this.num_bytes_received = 0L;
                    send_stable_msg = true;
                }
            }
            finally {
                this.received.unlock();
            }
            if (send_stable_msg) {
                this.sendStableMessage(true);
            }
        }
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 6: {
                Object retval = this.down_prot.down(evt);
                this.handleViewChange((View)evt.getArg());
                return retval;
            }
            case 65: {
                long timeout = 200000L;
                Object t = evt.getArg();
                if (t != null && t instanceof Long) {
                    timeout = (Long)t;
                }
                this.suspend(timeout);
                break;
            }
            case 66: {
                this.resume();
                break;
            }
            case 8: {
                this.local_addr = (Address)evt.getArg();
            }
        }
        return this.down_prot.down(evt);
    }

    @ManagedOperation(description="Sends a STABLE message; when every member has received a STABLE message from everybody else, a STABILITY message will be sent")
    public void gc() {
        this.sendStableMessage(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleViewChange(View v) {
        this.lock.lock();
        try {
            this.view = v;
            this.coordinator = v.getMembers().get(0);
            this.resetDigest();
            if (!this.initialized) {
                this.initialized = true;
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    protected void updateLocalDigest(Digest d, Address sender) {
        StringBuilder sb = null;
        if (this.log.isTraceEnabled()) {
            sb = new StringBuilder().append(this.local_addr).append(": handling digest from ").append(sender).append(":\nmine:   ").append(this.printDigest(this.digest)).append("\nother:  ").append(this.printDigest(d));
        }
        for (Digest.Entry entry : d) {
            Address mbr = entry.getMember();
            long hd = entry.getHighestDeliveredSeqno();
            long hr = entry.getHighestReceivedSeqno();
            long[] seqnos = this.digest.get(mbr);
            if (seqnos == null) continue;
            long my_hd = seqnos[0];
            long my_hr = seqnos[1];
            if (my_hd == -1L) {
                my_hd = hd;
            }
            long new_hd = Math.min(my_hd, hd);
            long new_hr = Math.max(my_hr, hr);
            this.digest.set(mbr, new_hd, new_hr);
        }
        if (sb != null) {
            this.log.trace(sb.append("\nresult: ").append(this.printDigest(this.digest)).append("\n"));
        }
    }

    protected void resetDigest() {
        if (this.view == null) {
            return;
        }
        this.digest = new MutableDigest(this.view.getMembersRaw());
        this.log.trace("%s: reset digest to %s", this.local_addr, this.printDigest(this.digest));
        this.votes = new FixedSizeBitSet(this.view.size());
    }

    protected boolean addVote(int rank) {
        try {
            return this.votes.set(rank) && STABLE.allVotesReceived(this.votes);
        }
        catch (Throwable t) {
            return false;
        }
    }

    protected static boolean allVotesReceived(FixedSizeBitSet votes) {
        return votes.cardinality() == votes.size();
    }

    protected static int getRank(Address member, View v) {
        if (v == null || member == null) {
            return -1;
        }
        Address[] members = v.getMembersRaw();
        for (int i = 0; i < members.length; ++i) {
            if (!member.equals(members[i])) continue;
            return i;
        }
        return -1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startStableTask() {
        this.stable_task_lock.lock();
        try {
            if (this.stable_task_future == null || this.stable_task_future.isDone()) {
                StableTask stable_task = new StableTask();
                this.stable_task_future = this.timer.scheduleWithDynamicInterval(stable_task);
                this.log.trace("%s: stable task started", this.local_addr);
            }
        }
        finally {
            this.stable_task_lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void stopStableTask() {
        this.stable_task_lock.lock();
        try {
            if (this.stable_task_future != null) {
                this.stable_task_future.cancel(false);
                this.stable_task_future = null;
            }
        }
        finally {
            this.stable_task_lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startResumeTask(long max_suspend_time) {
        if ((max_suspend_time = (long)((double)max_suspend_time * 1.1)) <= 0L) {
            max_suspend_time = 200000L;
        }
        Object object = this.resume_task_mutex;
        synchronized (object) {
            if (this.resume_task_future == null || this.resume_task_future.isDone()) {
                ResumeTask resume_task = new ResumeTask();
                this.resume_task_future = this.timer.schedule(resume_task, max_suspend_time, TimeUnit.MILLISECONDS);
                this.log.debug("%s: resume task started, max_suspend_time=%d", this.local_addr, max_suspend_time);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void stopResumeTask() {
        Object object = this.resume_task_mutex;
        synchronized (object) {
            if (this.resume_task_future != null) {
                this.resume_task_future.cancel(false);
                this.resume_task_future = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startStabilityTask(Digest d, ViewId view_id, long delay) {
        this.stability_lock.lock();
        try {
            if (this.stability_task_future == null || this.stability_task_future.isDone()) {
                StabilitySendTask stability_task = new StabilitySendTask(d, view_id);
                this.stability_task_future = this.timer.schedule(stability_task, delay, TimeUnit.MILLISECONDS);
            }
        }
        finally {
            this.stability_lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void stopStabilityTask() {
        this.stability_lock.lock();
        try {
            if (this.stability_task_future != null) {
                this.stability_task_future.cancel(false);
                this.stability_task_future = null;
            }
        }
        finally {
            this.stability_lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleStableMessage(Digest d, Address sender, ViewId view_id) {
        if (d == null || sender == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(Util.getMessage("DigestOrSenderIsNull"));
            }
            return;
        }
        if (!this.initialized || this.suspended) {
            this.log.trace("%s: STABLE message is ignored: initialized=%b, suspended=%b", this.local_addr, this.initialized, this.suspended);
            return;
        }
        if (!view_id.equals(this.view.getViewId())) {
            this.log.trace("%s: discarded STABLE message with different view-id %s (my view-id=%s)", this.local_addr, view_id, this.view.getViewId());
            return;
        }
        MutableDigest stable_digest = null;
        ViewId stable_view_id = null;
        this.lock.lock();
        try {
            int rank = STABLE.getRank(sender, this.view);
            if (rank < 0 || this.votes.get(rank)) {
                return;
            }
            ++this.num_stable_msgs_received;
            this.updateLocalDigest(d, sender);
            if (this.addVote(rank)) {
                stable_digest = this.digest;
                stable_view_id = this.view.getViewId();
                this.resetDigest();
            }
        }
        catch (Throwable t) {
            return;
        }
        finally {
            this.lock.unlock();
        }
        if (stable_digest != null) {
            this.resetNumBytes();
            this.sendStabilityMessage(stable_digest, stable_view_id);
            this.down_prot.down(new Event(30, stable_digest));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void resetNumBytes() {
        this.received.lock();
        try {
            this.num_bytes_received = 0L;
        }
        finally {
            this.received.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id) {
        if (stable_digest == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(Util.getMessage("StabilityDigestIsNull"));
            }
            return;
        }
        if (!this.initialized || this.suspended) {
            this.log.trace("%s: STABLE message is ignored: initialized=%b, suspended=%b", this.local_addr, this.initialized, this.suspended);
            return;
        }
        if (this.local_addr != null && this.local_addr.equals(sender)) {
            ++this.num_stability_msgs_received;
            return;
        }
        this.stopStabilityTask();
        this.lock.lock();
        try {
            if (!view_id.equals(this.view.getViewId())) {
                this.log.trace("%s: discarded STABILITY message with different view-id %s (my view-id=%s)", this.local_addr, view_id, this.view);
                return;
            }
            this.log.trace("%s: received stability msg from %s: %s", this.local_addr, sender, this.printDigest(stable_digest));
            ++this.num_stability_msgs_received;
            this.resetDigest();
        }
        finally {
            this.lock.unlock();
        }
        this.resetNumBytes();
        this.down_prot.down(new Event(30, stable_digest));
    }

    protected void sendStableMessage(boolean send_in_background) {
        Address dest;
        if (this.suspended || this.view == null) {
            return;
        }
        View current_view = this.view;
        MutableDigest d = new MutableDigest(current_view.getMembersRaw()).set(this.getDigest());
        Address address = dest = this.send_stable_msgs_to_coord_only ? this.coordinator : null;
        if (!d.allSet() && !d.set(this.getDigest()).allSet()) {
            this.log.trace("%s: could not find matching digest for view %s, missing members: %s", this.local_addr, current_view, d.getNonSetMembers());
            return;
        }
        this.log.trace("%s: sending stable msg to %s: %s", this.local_addr, this.send_stable_msgs_to_coord_only ? this.coordinator : "cluster", this.printDigest(d));
        final Message msg = new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY).putHeader(this.id, new StableHeader(1, current_view.getViewId())).setBuffer(STABLE.marshal(d));
        try {
            if (!send_in_background) {
                this.down_prot.down(new Event(1, msg));
                return;
            }
            Runnable r = new Runnable(){

                @Override
                public void run() {
                    STABLE.this.down_prot.down(new Event(1, msg));
                    ++STABLE.this.num_stable_msgs_sent;
                }

                public String toString() {
                    return STABLE.class.getSimpleName() + ": STABLE-GOSSIP";
                }
            };
            this.timer.execute(r);
        }
        catch (Throwable t) {
            this.log.warn("failed sending STABLE message", t);
        }
    }

    public static Buffer marshal(Digest digest) {
        return Util.streamableToBuffer(digest);
    }

    protected Digest readDigest(byte[] buffer, int offset, int length) {
        try {
            return buffer != null ? Util.streamableFromBuffer(Digest.class, buffer, offset, length) : null;
        }
        catch (Exception ex) {
            this.log.error("%s: failed reading Digest from message: %s", this.local_addr, ex);
            return null;
        }
    }

    protected void sendStabilityMessage(Digest tmp, ViewId view_id) {
        this.startStabilityTask(tmp, view_id, Util.random(this.stability_delay));
    }

    protected Digest getDigest() {
        return (Digest)this.down_prot.down(Event.GET_DIGEST_EVT);
    }

    protected String printDigest(Digest digest) {
        if (digest == null) {
            return null;
        }
        return this.view != null ? digest.toString(this.view.getMembersRaw(), false) : digest.toString();
    }

    protected class ResumeTask
    implements Runnable {
        protected ResumeTask() {
        }

        @Override
        public void run() {
            if (STABLE.this.suspended) {
                STABLE.this.log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; check why this event was not received (or increase max_suspend_time for large state transfers)");
            }
            STABLE.this.resume();
        }

        public String toString() {
            return STABLE.class.getSimpleName() + ": ResumeTask";
        }
    }

    protected class StabilitySendTask
    implements Runnable {
        protected final Digest stability_digest;
        protected final ViewId view_id;

        protected StabilitySendTask(Digest d, ViewId view_id) {
            this.stability_digest = d;
            this.view_id = view_id;
        }

        @Override
        public void run() {
            if (STABLE.this.suspended) {
                STABLE.this.log.debug("STABILITY message will not be sent as suspended=%s", STABLE.this.suspended);
                return;
            }
            try {
                Message msg = new Message().setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY).putHeader(STABLE.this.id, new StableHeader(2, this.view_id)).setBuffer(STABLE.marshal(this.stability_digest));
                STABLE.this.log.trace("%s: sending stability msg %s", STABLE.this.local_addr, STABLE.this.printDigest(this.stability_digest));
                ++STABLE.this.num_stability_msgs_sent;
                STABLE.this.down_prot.down(new Event(1, msg));
            }
            catch (Exception e) {
                STABLE.this.log.warn("failed sending STABILITY message", e);
            }
        }

        public String toString() {
            return STABLE.class.getSimpleName() + ": StabilityTask";
        }
    }

    protected class StableTask
    implements TimeScheduler.Task {
        protected StableTask() {
        }

        @Override
        public long nextInterval() {
            long interval = this.computeSleepTime();
            return interval <= 0L ? STABLE.this.desired_avg_gossip / 2L : interval;
        }

        @Override
        public void run() {
            if (STABLE.this.suspended) {
                STABLE.this.log.trace("%s: stable task will not run as suspended=true", STABLE.this.local_addr);
                return;
            }
            STABLE.this.sendStableMessage(false);
        }

        public String toString() {
            return STABLE.class.getSimpleName() + ": StableTask";
        }

        long computeSleepTime() {
            return this.getRandom(STABLE.this.desired_avg_gossip * 2L);
        }

        long getRandom(long range) {
            return (long)(Math.random() * (double)range % (double)range);
        }
    }

    public static class StableHeader
    extends Header {
        public static final byte STABLE_GOSSIP = 1;
        public static final byte STABILITY = 2;
        protected byte type;
        protected ViewId view_id;

        public StableHeader() {
        }

        public StableHeader(byte type, ViewId view_id) {
            this.type = type;
            this.view_id = view_id;
        }

        static String type2String(byte t) {
            switch (t) {
                case 1: {
                    return "STABLE_GOSSIP";
                }
                case 2: {
                    return "STABILITY";
                }
            }
            return "<unknown>";
        }

        @Override
        public String toString() {
            return String.format("[%s] view-id= %s", StableHeader.type2String(this.type), this.view_id);
        }

        @Override
        public int size() {
            return 1 + Util.size(this.view_id);
        }

        @Override
        public void writeTo(DataOutput out) throws Exception {
            out.writeByte(this.type);
            Util.writeViewId(this.view_id, out);
        }

        @Override
        public void readFrom(DataInput in) throws Exception {
            this.type = in.readByte();
            this.view_id = Util.readViewId(in);
        }
    }
}

