/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests.perf;

import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jgroups.Address;
import org.jgroups.BaseMessage;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.Receiver;
import org.jgroups.Version;
import org.jgroups.View;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.Bits;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.ResponseCollector;
import org.jgroups.util.Streamable;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.Util;

public class MPerf
implements Receiver {
    /** Channel used for all cluster communication; created and connected in {@code start()}. */
    protected JChannel channel;
    /** Address of this member, read from the channel after connecting. */
    protected Address local_addr;
    /** Duration of a test run; displayed and used as seconds. */
    protected int time = 60;
    /** Length of the byte array payload of every data message. */
    protected int msg_size = 1000;
    /** Number of sender threads started by a sending member for one test run. */
    protected int num_threads = 100;
    /** Number of members that act as senders; values &lt;= 0 are displayed as "all". */
    protected int num_senders = -1;
    /** When set, data messages are flagged {@code OOB} before being sent. */
    protected boolean oob;
    /** When false, data messages whose source is the local address are not counted. */
    protected boolean log_local = true;
    /** When set, statistics include a per-source breakdown. */
    protected boolean display_msg_src = false;
    /** Counts received data messages per source address. */
    protected MessageCounter received_msgs_map = new MessageCounter();
    /** Current cluster members, replaced on every view change. */
    protected final List<Address> members = new CopyOnWriteArrayList<Address>();
    protected final Log log = LogFactory.getLog(this.getClass());
    /** Optional file to which {@code printOutput()} appends; {@code null} means console only. */
    protected Path out_file_path;
    /** Loop condition of {@code eventLoop()}; cleared to leave the loop. */
    protected boolean looping = true;
    /** Milliseconds to sleep after each received data message; values &lt;= 0 disable sleeping. */
    protected long sleep;
    /** Collects the {@code Result} sent back by each member after a test run. */
    protected final ResponseCollector<Result> results = new ResponseCollector();
    /** Factory used to create the sender threads; assigned in {@code start()}. */
    protected ThreadFactory thread_factory;
    /** Header id under which {@code MPerfHeader} instances are attached to messages. */
    protected static final short ID = ClassConfigurator.getProtocolId(MPerf.class);

    /**
     * Creates an instance without an output file; delegates to {@code MPerf(Path)} with {@code null}.
     *
     * @throws IOException declared because the delegate constructor declares it
     */
    public MPerf() throws IOException {
        this(null);
    }

    /**
     * Creates an instance that additionally appends its result output to a file.
     * If the file does not exist yet, it is created together with any missing
     * parent directories.
     *
     * @param out_file_path the file to append results to, or {@code null} for console output only
     * @throws IOException if the file or one of its parent directories cannot be created
     */
    public MPerf(Path out_file_path) throws IOException {
        if (out_file_path == null) {
            return;
        }
        if (Files.notExists(out_file_path)) {
            // A bare file name such as "out.txt" has no parent component: getParent() returns
            // null and createDirectories(null) would throw a NullPointerException.
            Path parent = out_file_path.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Files.createFile(out_file_path);
        }
        this.out_file_path = out_file_path;
    }

    /**
     * Sets the duration of a test run.
     *
     * @param t the new value of {@code time}
     * @return this instance, to allow call chaining
     */
    public MPerf time(int t) {
        time = t;
        return this;
    }

    /**
     * Sets the payload size of the data messages.
     *
     * @param s the new value of {@code msg_size}
     * @return this instance, to allow call chaining
     */
    public MPerf size(int s) {
        msg_size = s;
        return this;
    }

    /**
     * Sets the number of sender threads.
     *
     * @param t the new value of {@code num_threads}
     * @return this instance, to allow call chaining
     */
    public MPerf threads(int t) {
        num_threads = t;
        return this;
    }

    /**
     * Prints a banner, creates the thread factory, connects the channel to cluster
     * "mperf" and, unless this member is the coordinator, asks the coordinator for
     * the current configuration.
     *
     * @param props               the channel configuration passed to the {@code JChannel} constructor
     * @param name                the logical name set on the channel
     * @param use_virtual_threads passed to the thread factory's {@code useVirtualThreads()}
     * @throws Exception if creating or connecting the channel, or sending the request, fails
     */
    public void start(String props, String name, boolean use_virtual_threads) throws Exception {
        String banner = "\n\n----------------------- MPerf -----------------------\n"
            + "Date: " + new Date() + '\n'
            + "Run by: " + System.getProperty("user.name") + "\n"
            + "JGroups version: " + Version.description + '\n';
        System.out.println(banner);

        thread_factory = new DefaultThreadFactory("invoker", false, true).useVirtualThreads(use_virtual_threads);
        if (use_virtual_threads && Util.virtualThreadsAvailable()) {
            System.out.println("-- MPerf: using virtual threads");
        }

        channel = new JChannel(props).setName(name).setReceiver(this).connect("mperf");
        local_addr = channel.getAddress();

        Address coordinator = channel.getView().getCoord();
        boolean is_coordinator = coordinator == null || local_addr.equals(coordinator);
        if (!is_coordinator) {
            // fetch the current configuration from the coordinator
            send(coordinator, null, MPerfHeader.CONFIG_REQ, Message.Flag.RSVP);
        }
    }

    /**
     * Replaces the file that {@code printOutput()} appends to. Unlike the constructor,
     * this setter does not create the file or its parent directories.
     *
     * @param out_file_path the new output file, or {@code null} to disable file output
     */
    public void setOutPath(Path out_file_path) {
        this.out_file_path = out_file_path;
    }

    /**
     * Interactive menu loop: shows the current settings, reads one key press and
     * dispatches on it until {@code looping} is cleared, then calls {@code stop()}.
     * Any error raised while handling a key is printed and the loop continues.
     */
    protected void eventLoop() {
        final String menu = "[1] Start test [2] View [4] Threads (%d) [6] Time (%,ds) [7] Msg size (%s)\n[8] Number of senders (%s) [o] Toggle OOB (%s) [l] Toggle measure local messages (%s)\n[s] Display message sources (%s) [9] sleep (%d ms)\n[x] Exit this [X] Exit all";
        while (looping) {
            try {
                String prompt = String.format(menu, num_threads, time, Util.printBytes(msg_size), num_senders <= 0 ? "all" : String.valueOf(num_senders), oob, log_local, display_msg_src, sleep);
                int key = Util.keyPress(prompt);
                switch (key) {
                    case '1':
                        startTest();
                        break;
                    case '2':
                        System.out.println("view: " + channel.getView() + " (local address=" + channel.getAddress() + ")");
                        break;
                    case '4':
                        configChange("num_threads");
                        break;
                    case '6':
                        configChange("time");
                        break;
                    case '7':
                        configChange("msg_size");
                        break;
                    case '8':
                        configChange("num_senders");
                        break;
                    case '9':
                        configChange("sleep");
                        break;
                    case 'o':
                        // boolean toggles are broadcast to all members as config changes
                        send(null, new ConfigChange("oob", !oob), MPerfHeader.CONFIG_CHANGE, Message.Flag.RSVP);
                        break;
                    case 'l':
                        send(null, new ConfigChange("log_local", !log_local), MPerfHeader.CONFIG_CHANGE, Message.Flag.RSVP);
                        break;
                    case 's':
                        send(null, new ConfigChange("display_msg_src", !display_msg_src), MPerfHeader.CONFIG_CHANGE, Message.Flag.RSVP);
                        break;
                    case -1:
                    case 'x':
                        looping = false;
                        break;
                    case 'X':
                        // ask every member (including this one) to exit
                        send(null, null, MPerfHeader.EXIT, Message.Flag.OOB);
                        break;
                    default:
                        break;
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        }
        stop();
    }

    /**
     * Runs one test: resets the result collector to the current members, tells all
     * members to start sending, waits up to twice the test duration for their
     * results and displays them.
     *
     * @throws Exception if the start message cannot be sent
     */
    protected void startTest() throws Exception {
        List<Address> expected_responders = new ArrayList<>(members);
        results.reset(expected_responders);
        send(null, null, MPerfHeader.START_SENDING, Message.Flag.OOB);

        long timeout_ms = (long)time * 1000L * 2L;
        results.waitForAllResponses(timeout_ms);
        displayResults();
    }

    /**
     * Prints the collected results: view and parameters, one line per member that
     * delivered a result, then the per-node and per-cluster averages (or a notice
     * when no result was received). Per-member lines are written to standard output
     * only; everything else goes through {@code printOutput()}.
     */
    protected void displayResults() {
        final String upper_rule = "\n===============================================================================";
        final String lower_rule = "================================================================================\n\n";

        printOutput("\nResults:\n");
        printOutput("view: " + channel.getView() + " (local address=" + channel.getAddress() + ")");
        printOutput(printParameters());

        long msgs_sum = 0L;
        long time_sum = 0L;
        long responders = 0L;
        Map<Address, Result> collected = results.getResults();
        for (Map.Entry<Address, Result> entry : collected.entrySet()) {
            Result res = entry.getValue();
            if (res == null) {
                // member did not respond
                continue;
            }
            StringBuilder line = new StringBuilder();
            line.append(entry.getKey()).append(": ").append(computeStats(res.time, res.msgs, msg_size));
            if (display_msg_src) {
                line.append(printPerSender(res.sources, res.received, msg_size, res.time));
            }
            System.out.println(line.toString());

            time_sum += res.time;
            msgs_sum += res.msgs;
            ++responders;
        }

        printOutput(upper_rule);
        if (responders > 0L) {
            printOutput(" Average/node:    " + computeStats(time_sum / responders, msgs_sum / responders, msg_size));
            printOutput(" Average/cluster: " + computeStats(time_sum / responders, msgs_sum, msg_size));
        } else {
            printOutput(" Received no results");
        }
        printOutput(lower_rule);
    }

    /**
     * Renders the current date and test parameters, one {@code name=value} pair per line.
     *
     * @return the multi-line parameter listing, terminated by a newline
     */
    protected String printParameters() {
        return "Date: " + new Date() + '\n'
            + "time=" + time + '\n'
            + "msg_size=" + msg_size + '\n'
            + "num_threads=" + num_threads + '\n'
            + "num_senders=" + num_senders + '\n'
            + "oob=" + oob + '\n'
            + "log_local=" + log_local + '\n'
            + "sleep=" + sleep + '\n'
            + "display_msg_src=" + display_msg_src + '\n';
    }

    /**
     * Prints a line to standard output and, when an output file is configured,
     * appends it (plus a newline) to that file.
     * <p>
     * The file is created on first use if it does not exist yet. This matters when
     * the path was set through {@code setOutPath()}, which does not create the file:
     * a plain {@code Files.isWritable()} pre-check is false for a missing file and
     * would silently suppress all file output. An existing file that is not writable
     * is still skipped silently.
     *
     * @param s the text to print
     */
    protected void printOutput(String s) {
        System.out.println(s);
        Path target = out_file_path;
        if (target == null) {
            return;
        }
        if (Files.exists(target) && !Files.isWritable(target)) {
            return;
        }
        try {
            Files.writeString(target, s + "\n", StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
        catch (IOException e) {
            log.error("%s: failed writing to %s: %s", local_addr, target, e);
        }
    }

    /**
     * Reads an integer from standard input and broadcasts it to all members as the
     * new value of the given attribute.
     *
     * @param name the name of the attribute to change; also used as the input prompt
     * @throws Exception if reading the value or sending the change fails
     */
    protected void configChange(String name) throws Exception {
        int new_value = Util.readIntFromStdin(name + ": ");
        send(null, new ConfigChange(name, new_value), MPerfHeader.CONFIG_CHANGE, Message.Flag.RSVP);
    }

    /**
     * Sends an object message and logs, rather than propagates, any send failure.
     *
     * @param target  the destination; {@code null} is passed through to the message unchanged
     * @param payload the object to send
     * @param header  the {@code MPerfHeader} type; a header is attached only if this is &gt; 0
     * @param flags   flags to set on the message
     */
    protected void sendNoException(Address target, Object payload, byte header, Message.Flag ... flags) {
        Message msg = new ObjectMessage(target, payload);
        msg.setFlag(flags);
        if (header > 0) {
            msg.putHeader(ID, new MPerfHeader(header));
        }
        try {
            channel.send(msg);
        }
        catch (Exception e) {
            log.error("%s: failed sending message to %s: %s", local_addr, target, e);
        }
    }

    /**
     * Sends a message carrying the given payload, or an empty message when the
     * payload is {@code null}.
     *
     * @param target  the destination; {@code null} is passed through to the message unchanged
     * @param payload the object to send, may be {@code null}
     * @param header  the {@code MPerfHeader} type; a header is attached only if this is &gt; 0
     * @param flags   flags to set on the message, may be {@code null}
     * @throws Exception if the channel fails to send the message
     */
    protected void send(Address target, Object payload, byte header, Message.Flag ... flags) throws Exception {
        Message msg;
        if (payload == null) {
            msg = new EmptyMessage(target);
        } else {
            msg = new ObjectMessage(target, payload);
        }
        if (flags != null) {
            for (Message.Flag f : flags) {
                msg.setFlag(f);
            }
        }
        if (header > 0) {
            msg.putHeader(ID, new MPerfHeader(header));
        }
        channel.send(msg);
    }

    /** Ends the event loop and closes the channel. */
    public void stop() {
        looping = false;
        Util.close((Closeable)channel);
    }

    /**
     * Handles a single message according to the type of its {@code MPerfHeader}.
     * <ul>
     * <li>DATA: counts the message (unless it is local and {@code log_local} is off)
     *     and optionally sleeps.</li>
     * <li>START_SENDING: runs the test asynchronously and sends the result back to
     *     the requester.</li>
     * <li>RESULT: adds the result to the collector.</li>
     * <li>CONFIG_CHANGE / CONFIG_REQ / CONFIG_RSP: applies, serves or installs
     *     configuration.</li>
     * <li>EXIT: closes the channel and terminates the JVM.</li>
     * </ul>
     * Messages without an {@code MPerfHeader} are reported and ignored instead of
     * failing with a {@code NullPointerException}.
     *
     * @param msg the received message
     */
    @Override
    public void receive(Message msg) {
        MPerfHeader hdr = (MPerfHeader)msg.getHeader(ID);
        if (hdr == null) {
            System.err.println("message from " + msg.getSrc() + " has no MPerf header, ignoring it");
            return;
        }
        switch (hdr.type) {
            case MPerfHeader.DATA: {
                if (!log_local && Objects.equals(msg.getSrc(), local_addr)) {
                    break;
                }
                received_msgs_map.addMessage(msg.getSrc());
                if (sleep > 0L) {
                    Util.sleep(sleep);
                }
                break;
            }
            case MPerfHeader.START_SENDING: {
                // Members whose rank exceeds num_senders only receive
                boolean is_sender = true;
                if (num_senders > 0) {
                    int my_rank = Util.getRank(members, local_addr);
                    if (my_rank >= 0 && my_rank > num_senders) {
                        is_sender = false;
                    }
                }
                final boolean sender = is_sender;
                final Address requester = msg.src();
                CompletableFuture.supplyAsync(() -> {
                    Result r = sendMessages(sender);
                    System.out.println("-- done");
                    return r;
                }).thenAccept(r -> sendNoException(requester, r, MPerfHeader.RESULT, Message.Flag.OOB))
                  .exceptionally(t -> {
                      // Without this a failed run would vanish silently and the requester
                      // would wait for the full timeout without any hint why
                      log.error("%s: test run requested by %s failed: %s", local_addr, requester, t);
                      return null;
                  });
                break;
            }
            case MPerfHeader.RESULT: {
                Result res = (Result)msg.getObject();
                results.add(msg.getSrc(), res);
                break;
            }
            case MPerfHeader.CONFIG_CHANGE: {
                handleConfigChange((ConfigChange)msg.getObject());
                break;
            }
            case MPerfHeader.CONFIG_REQ: {
                try {
                    handleConfigRequest(msg.getSrc());
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                break;
            }
            case MPerfHeader.CONFIG_RSP: {
                handleConfigResponse((Configuration)msg.getObject());
                break;
            }
            case MPerfHeader.EXIT: {
                Util.close((Closeable)channel);
                System.exit(0);
                break;
            }
            default: {
                System.err.println("header type " + hdr.type + " not recognized");
            }
        }
    }

    /**
     * Handles a batch of messages. Data messages are counted inline; every other
     * message type is delegated to {@code receive(Message)}.
     * <p>
     * A message without an {@code MPerfHeader} is reported and skipped, so that a
     * single foreign message cannot abort processing of the rest of the batch.
     *
     * @param batch the received messages
     */
    @Override
    public void receive(MessageBatch batch) {
        for (Message msg : batch) {
            MPerfHeader hdr = (MPerfHeader)msg.getHeader(ID);
            if (hdr == null) {
                System.err.println("message from " + msg.getSrc() + " has no MPerf header, ignoring it");
                continue;
            }
            if (hdr.type != MPerfHeader.DATA) {
                receive(msg);
                continue;
            }
            if (!log_local && Objects.equals(msg.getSrc(), local_addr)) {
                continue;
            }
            received_msgs_map.addMessage(msg.getSrc());
            if (sleep > 0L) {
                Util.sleep(sleep);
            }
        }
    }

    /**
     * Returns the members that act as senders: all members when {@code num_senders}
     * is &lt;= 0, otherwise the first {@code num_senders} members.
     * <p>
     * If {@code num_senders} exceeds the current member count, all members are
     * returned rather than failing with an {@code IndexOutOfBoundsException}. The
     * member list is copied once up front so that a concurrent view change cannot
     * shrink it between the size check and the element access.
     *
     * @return a new, modifiable list of sender addresses
     */
    protected List<Address> getSenders() {
        List<Address> snapshot = new ArrayList<>(members);
        if (num_senders <= 0 || num_senders >= snapshot.size()) {
            return snapshot;
        }
        return new ArrayList<>(snapshot.subList(0, num_senders));
    }

    /**
     * Applies a configuration change by setting the field of this class whose name
     * equals the change's attribute name, and prints the new value. Failures are
     * reported on standard error and otherwise ignored.
     *
     * @param config_change the attribute name and serialized value to apply
     */
    protected void handleConfigChange(ConfigChange config_change) {
        final String attr_name = config_change.attr_name;
        try {
            Object new_value = config_change.getValue();
            Field target_field = Util.getField(getClass(), attr_name);
            Util.setField(target_field, this, new_value);
            System.out.println(config_change.attr_name + "=" + new_value);
        }
        catch (Exception e) {
            System.err.println("failed applying config change for attr " + attr_name + ": " + e);
        }
    }

    /**
     * Answers a configuration request by sending the current settings to the requester.
     * <p>
     * {@code display_msg_src} is included as well: it is toggled cluster-wide through
     * a config change like the other settings, so a member that joins later would
     * otherwise keep its default.
     *
     * @param sender the member that asked for the configuration
     * @throws Exception if a value cannot be serialized or the response cannot be sent
     */
    protected void handleConfigRequest(Address sender) throws Exception {
        Configuration cfg = new Configuration();
        cfg.addChange("time", time);
        cfg.addChange("msg_size", msg_size);
        cfg.addChange("num_threads", num_threads);
        cfg.addChange("num_senders", num_senders);
        cfg.addChange("oob", oob);
        cfg.addChange("log_local", log_local);
        cfg.addChange("sleep", sleep);
        cfg.addChange("display_msg_src", display_msg_src);
        send(sender, cfg, MPerfHeader.CONFIG_RSP);
    }

    /**
     * Applies every change contained in a received configuration, in list order.
     *
     * @param cfg the configuration received from another member
     */
    protected void handleConfigResponse(Configuration cfg) {
        for (ConfigChange change : cfg.changes) {
            handleConfigChange(change);
        }
    }

    /**
     * Prints the new view, replaces the member list with the view's members and
     * passes them to the result collector's {@code retainAll()}.
     *
     * @param view the newly installed view
     */
    @Override
    public void viewAccepted(View view) {
        System.out.println("** " + view);
        List<Address> current = view.getMembers();
        members.clear();
        members.addAll(current);
        results.retainAll(current);
    }

    /**
     * Runs one test: optionally starts the sender threads, prints the receive rate
     * ten times at equal intervals over the test duration, stops and joins the
     * senders, and returns what this member received.
     * <p>
     * The configuration ({@code num_threads}, {@code time}, {@code msg_size}) is read
     * once at the start, so a config change arriving mid-test cannot make the number
     * of started threads disagree with the number of joined threads. The total
     * message count is derived from the same snapshot as the per-source counts, so
     * the two always agree.
     *
     * @param isSender whether this member starts sender threads; otherwise it only receives
     * @return elapsed time in milliseconds, total received messages and per-source counts
     */
    protected Result sendMessages(boolean isSender) {
        final int threads = isSender ? num_threads : 0;
        final int duration = time;
        final int size = msg_size;
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean running = new AtomicBoolean(true);
        final byte[] payload = new byte[size];
        final List<Thread> senders = new ArrayList<>();

        received_msgs_map.reset();
        for (int i = 0; i < threads; ++i) {
            Thread t = thread_factory.newThread(new Sender(latch, running, payload), "sender-" + i);
            senders.add(t);
            t.start();
        }
        System.out.printf("-- running test for %d seconds with %d sender threads\n", duration, threads);

        long interval = (long)((double)duration * 1000.0 / 10.0);
        long start = System.currentTimeMillis();
        latch.countDown(); // release all senders at once
        for (int i = 1; i <= 10; ++i) {
            Util.sleep(interval);
            System.out.printf("%d: %s\n", i, received_msgs_map.printAverage(start, size, display_msg_src));
        }

        running.set(false);
        for (Thread s : senders) {
            try {
                s.join();
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop waiting; the senders
                // terminate on their own once they observe running == false
                Thread.currentThread().interrupt();
                break;
            }
        }

        Map<Address, Long> counts = received_msgs_map.snapshot();
        long elapsed = System.currentTimeMillis() - start;
        Address[] sources = counts.keySet().toArray(new Address[0]);
        long[] received = counts.values().stream().mapToLong(Long::longValue).toArray();
        long total = Arrays.stream(received).sum();
        received_msgs_map.reset();
        return new Result(elapsed, total, sources, received);
    }

    /**
     * Formats the average receive rate since the given start time.
     *
     * @param start_time the start of the measurement, in milliseconds as returned by
     *                   {@code System.currentTimeMillis()}
     * @return messages per second and bytes per second as text
     */
    protected String printAverage(long start_time) {
        long elapsed_ms = System.currentTimeMillis() - start_time;
        double rate = received_msgs_map.totalRate(elapsed_ms);
        double bytes_per_sec = rate * (double)msg_size;
        return String.format("%,.2f msgs/sec (%s/sec)", rate, Util.printBytes(bytes_per_sec));
    }

    /**
     * Formats message count, received volume, elapsed time, message rate and throughput.
     *
     * @param time elapsed time in milliseconds
     * @param msgs number of messages received
     * @param size size of one message in bytes
     * @return a one-line summary
     */
    protected static String computeStats(long time, long msgs, int size) {
        double seconds = (double)time / 1000.0;
        long total_bytes = msgs * (long)size;
        double msgs_sec = (double)msgs / seconds;
        double throughput = (double)total_bytes / seconds;
        return String.format("%,d msgs, %s received, time=%,d ms, msgs/sec=%,.2f, throughput=%s", msgs, Util.printBytes(total_bytes), time, msgs_sec, Util.printBytes(throughput));
    }

    /**
     * Formats the receive rate per source as a bracketed list whose entries are
     * separated by two spaces.
     *
     * @param sender   the source addresses; {@code null} or empty yields an empty string
     * @param received message counts, indexed like {@code sender}
     * @param msg_size size of one message in bytes
     * @param diff     elapsed time in milliseconds
     * @return the formatted list, or an empty string when there are no sources
     */
    protected static String printPerSender(Address[] sender, long[] received, long msg_size, long diff) {
        if (sender == null || sender.length == 0) {
            return "";
        }
        StringBuilder out = new StringBuilder("[");
        for (int idx = 0; idx < sender.length; ++idx) {
            if (idx > 0) {
                out.append("  ");
            }
            double rate = (double)received[idx] / ((double)diff / 1000.0);
            out.append(String.format("%s: %,.2f msgs/sec (%s/sec)", sender[idx], rate, Util.printBytes(rate * (double)msg_size)));
        }
        return out.append(']').toString();
    }

    /**
     * Command line entry point. Parses the options, starts the test instance and
     * then either runs the interactive event loop or, with {@code -nohup}, keeps the
     * process alive by sleeping forever.
     * <p>
     * An unknown option, an option whose value is missing, or a non-numeric value
     * for a numeric option prints the usage line and returns instead of failing with
     * an exception.
     *
     * @param args2 the command line arguments
     * @throws IOException if the output file given with {@code -file} cannot be created
     */
    public static void main(String[] args2) throws IOException {
        final String usage = "MPerf [-props <stack config>] [-name <logical name>] [-nohup] [-use_virtual_threads true|false] [-file <file path>] [-time <secs>] [-size <bytes>] [-threads <number of sender threads>]";
        String props = null;
        String name = null;
        boolean run_event_loop = true;
        boolean use_virtual_threads = true;
        Path out_file_path = null;
        int time = 60;
        int size = 1000;
        int threads = 100;

        for (int i = 0; i < args2.length; ++i) {
            String option = args2[i];
            if ("-nohup".equals(option)) {
                run_event_loop = false;
                continue;
            }
            // Every remaining option takes exactly one value
            if (i + 1 >= args2.length) {
                System.out.println(usage);
                return;
            }
            String value = args2[++i];
            try {
                switch (option) {
                    case "-props":
                        props = value;
                        break;
                    case "-name":
                        name = value;
                        break;
                    case "-file":
                        out_file_path = Paths.get(value);
                        break;
                    case "-use_virtual_threads":
                        use_virtual_threads = Boolean.parseBoolean(value);
                        break;
                    case "-time":
                        time = Integer.parseInt(value);
                        break;
                    case "-size":
                        size = Integer.parseInt(value);
                        break;
                    case "-threads":
                        threads = Integer.parseInt(value);
                        break;
                    default:
                        System.out.println(usage);
                        return;
                }
            }
            catch (NumberFormatException e) {
                System.out.println("invalid value '" + value + "' for option " + option);
                System.out.println(usage);
                return;
            }
        }

        MPerf test = new MPerf(out_file_path).time(time).size(size).threads(threads);
        try {
            test.start(props, name, use_virtual_threads);
            if (run_event_loop) {
                test.eventLoop();
                return;
            }
            // -nohup: no console interaction, just keep the process (and the channel) alive
            while (true) {
                Util.sleep(60000L);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected static class MessageCounter {
        protected ConcurrentHashMap<Address, LongAdder> countMap = new ConcurrentHashMap();
        protected static final Function<Address, LongAdder> FUNC = k -> new LongAdder();

        public void addMessage(Address source2) {
            this.countMap.computeIfAbsent(source2, FUNC).increment();
        }

        public void reset() {
            this.countMap = new ConcurrentHashMap();
        }

        public long totalCount() {
            long total = 0L;
            for (LongAdder adder : this.countMap.values()) {
                total += adder.sum();
            }
            return total;
        }

        public double totalRate(long diff) {
            return (double)this.totalCount() / ((double)diff / 1000.0);
        }

        public Map<Address, Long> snapshot() {
            return this.countMap.entrySet().stream().map(e -> new AbstractMap.SimpleEntry<Address, Long>((Address)e.getKey(), ((LongAdder)e.getValue()).sum())).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue, Math::addExact, TreeMap::new));
        }

        public String printAverage(long start_time, int msg_size, boolean display_msg_src) {
            Map<Address, Long> snapshot = this.snapshot();
            long diff = System.currentTimeMillis() - start_time;
            double msgs_per_sec = (double)this.totalCount() / ((double)diff / 1000.0);
            double throughput = msgs_per_sec * (double)msg_size;
            StringBuilder sb = new StringBuilder();
            sb.append(String.format("%,.2f msgs/sec (%s/sec)", msgs_per_sec, Util.printBytes(throughput)));
            if (display_msg_src) {
                StringJoiner joiner = new StringJoiner("  ");
                for (Map.Entry<Address, Long> e : snapshot.entrySet()) {
                    double msg_rate = (double)e.getValue().longValue() / ((double)diff / 1000.0);
                    joiner.add(String.format("%s: %,.2f msgs/sec (%s/sec)", e.getKey(), msg_rate, Util.printBytes(msg_rate * (double)msg_size)));
                }
                sb.append(String.format("[%s]", joiner));
            }
            return sb.toString();
        }
    }

    /** Message header carrying a single byte that identifies the kind of MPerf message. */
    protected static class MPerfHeader extends Header {
        protected static final byte DATA = 1;
        protected static final byte START_SENDING = 2;
        protected static final byte RESULT = 4;
        protected static final byte CONFIG_CHANGE = 6;
        protected static final byte CONFIG_REQ = 7;
        protected static final byte CONFIG_RSP = 8;
        protected static final byte EXIT = 9;
        protected byte type;

        /** Creates a header with type 0; the type is filled in by {@code readFrom()}. */
        public MPerfHeader() {
        }

        public MPerfHeader(byte type) {
            this.type = type;
        }

        @Override
        public short getMagicId() {
            return 77;
        }

        @Override
        public Supplier<? extends Header> create() {
            return MPerfHeader::new;
        }

        /** The serialized form is the type byte only. */
        @Override
        public int serializedSize() {
            return 1;
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
            out.writeByte(type);
        }

        @Override
        public void readFrom(DataInput in) throws IOException {
            type = in.readByte();
        }

        @Override
        public String toString() {
            return typeToString(type);
        }

        /** Returns the symbolic name of a header type, or "n/a" if it is unknown. */
        protected static String typeToString(byte type) {
            switch (type) {
                case DATA:
                    return "DATA";
                case START_SENDING:
                    return "START_SENDING";
                case RESULT:
                    return "RESULT";
                case CONFIG_CHANGE:
                    return "CONFIG_CHANGE";
                case CONFIG_REQ:
                    return "CONFIG_REQ";
                case CONFIG_RSP:
                    return "CONFIG_RSP";
                case EXIT:
                    return "EXIT";
                default:
                    return "n/a";
            }
        }
    }

    /**
     * Outcome of one test run on one member: elapsed time, total number of received
     * messages and the number of messages received from each source.
     */
    protected static class Result implements Streamable {
        /** Elapsed time in milliseconds. */
        protected long time;
        /** Total number of received messages. */
        protected long msgs;
        /** Source addresses; may be {@code null} or empty. */
        protected Address[] sources;
        /** Message counts, indexed like {@code sources}. */
        protected long[] received;

        public Result() {
        }

        public Result(long time, long msgs, Address[] sources, long[] received) {
            this.time = time;
            this.msgs = msgs;
            this.sources = sources;
            this.received = received;
        }

        /**
         * Estimates the serialized size. The estimate takes the serialized size of
         * the first source address as representative for all sources and adds 8
         * bytes per counter.
         * <p>
         * An empty (not just {@code null}) source array contributes nothing; it is
         * what a member that received no messages reports, and indexing its first
         * element would throw an {@code ArrayIndexOutOfBoundsException}.
         *
         * @return the estimated number of bytes
         */
        public int size() {
            int retval = Bits.size(time) + Bits.size(msgs);
            if (sources != null && sources.length > 0) {
                retval += sources.length * (sources[0].serializedSize() + 8);
            }
            return retval;
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
            Bits.writeLongCompressed(time, out);
            Bits.writeLongCompressed(msgs, out);
            int sources_length = sources == null ? 0 : sources.length;
            Bits.writeIntCompressed(sources_length, out);
            // each source address is followed by its counter
            for (int i = 0; i < sources_length; ++i) {
                Util.writeAddress(sources[i], out);
                Bits.writeLongCompressed(received[i], out);
            }
        }

        @Override
        public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
            time = Bits.readLongCompressed(in);
            msgs = Bits.readLongCompressed(in);
            int sources_length = Bits.readIntCompressed(in);
            if (sources_length > 0) {
                sources = new Address[sources_length];
                received = new long[sources_length];
                for (int i = 0; i < sources_length; ++i) {
                    sources[i] = Util.readAddress(in);
                    received[i] = Bits.readLongCompressed(in);
                }
            }
        }

        @Override
        public String toString() {
            return msgs + " in " + time + " ms";
        }
    }

    /** An ordered list of configuration changes, sent to a member that asks for the configuration. */
    protected static class Configuration implements Streamable {
        protected List<ConfigChange> changes = new ArrayList<>();

        /**
         * Appends a change; the call is a no-op when the key or the value is {@code null}.
         *
         * @param key the attribute name
         * @param val the attribute value
         * @return this instance, to allow call chaining
         * @throws Exception if the value cannot be serialized
         */
        public Configuration addChange(String key, Object val) throws Exception {
            if (key == null || val == null) {
                return this;
            }
            changes.add(new ConfigChange(key, val));
            return this;
        }

        /** Returns 4 (the length prefix) plus the sizes of all changes. */
        public int size() {
            return 4 + changes.stream().mapToInt(ConfigChange::size).sum();
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
            out.writeInt(changes.size());
            for (ConfigChange c : changes) {
                c.writeTo(out);
            }
        }

        @Override
        public void readFrom(DataInput in) throws IOException {
            int remaining = in.readInt();
            while (remaining-- > 0) {
                ConfigChange c = new ConfigChange();
                c.readFrom(in);
                changes.add(c);
            }
        }
    }

    /** A single attribute change: the attribute name plus its value in serialized form. */
    protected static class ConfigChange implements Streamable {
        protected String attr_name;
        protected byte[] attr_value;

        public ConfigChange() {
        }

        /**
         * @param attr_name the attribute name
         * @param val       the new value; stored as the byte array produced by
         *                  {@code Util.objectToByteBuffer()}
         * @throws Exception if the value cannot be serialized
         */
        public ConfigChange(String attr_name, Object val) throws Exception {
            this.attr_name = attr_name;
            this.attr_value = Util.objectToByteBuffer(val);
        }

        /** Deserializes and returns the value. */
        public Object getValue() throws Exception {
            return Util.objectFromByteBuffer(attr_value);
        }

        public int size() {
            int name_size = Util.size(attr_name);
            int value_size = Util.size(attr_value);
            return name_size + value_size;
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
            Bits.writeString(attr_name, out);
            Util.writeByteBuffer(attr_value, out);
        }

        @Override
        public void readFrom(DataInput in) throws IOException {
            attr_name = Bits.readString(in);
            attr_value = Util.readByteBuffer(in);
        }
    }

    /**
     * Task run by each sender thread: waits for the start latch, then sends data
     * messages to all members until {@code running} is cleared.
     */
    protected class Sender implements Runnable {
        protected final CountDownLatch latch;
        protected final AtomicBoolean running;
        protected final byte[] payload;

        protected Sender(CountDownLatch l, AtomicBoolean running, byte[] payload) {
            this.latch = l;
            this.running = running;
            this.payload = payload;
        }

        @Override
        public void run() {
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                // keep the interrupt visible to whoever owns this thread
                Thread.currentThread().interrupt();
                return;
            }
            while (running.get()) {
                try {
                    // The header constructor takes a byte: an int literal is not narrowed
                    // implicitly in a constructor call, so the typed constant is required
                    Message msg = new BytesMessage(null, payload).putHeader(ID, new MPerfHeader(MPerfHeader.DATA));
                    if (oob) {
                        msg.setFlag(Message.Flag.OOB);
                    }
                    channel.send(msg);
                }
                catch (Exception e) {
                    // Best effort: keep sending. Only report at trace level, as this loop is
                    // tight and a persistent failure would otherwise flood the log
                    if (log.isTraceEnabled()) {
                        log.trace("%s: failed sending data message: %s", local_addr, e);
                    }
                }
            }
        }
    }
}

