/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.Serializable;
import junit.framework.Assert;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.log4j.Logger;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.View;
import org.jgroups.util.Util;

public class ChannelMono
extends TestCase {
    /** Compile-time switch; not read anywhere in this class. */
    static final boolean DEBUG = false;
    /** Channel under test; assigned in setUp() and closed in tearDown(). */
    private Channel channel = null;
    /*
     * A class literal is used here instead of the decompiler's lazy class$
     * lookup: the synthetic cache field is declared further down, and reading
     * it by simple name from this initializer is an illegal forward reference.
     */
    static Logger logger = Logger.getLogger(ChannelMono.class);
    /** Group name passed to Channel.connect() in setUp(). */
    String channelName = "ChannelLg4jTest";
    /** Protocol stack specification handed to the JChannel constructor; null unless setProtocol() is called. */
    String protocol = null;
    /** Compiler-generated cache for this class's Class object; still referenced by main(). */
    static /* synthetic */ Class class$org$jgroups$tests$ChannelMono;

    /**
     * Creates the test case.
     *
     * @param Name_ name forwarded unchanged to the TestCase constructor
     */
    public ChannelMono(String Name_) {
        super(Name_);
    }

    /**
     * Creates a JChannel from the configured protocol string and connects it to
     * the group named by channelName.
     *
     * A ChannelException is logged and then reported through fail(), so the
     * test stops here with the real cause instead of continuing with a missing
     * or unconnected channel.
     */
    public void setUp() {
        try {
            this.channel = new JChannel(this.getProtocol());
            this.channel.connect(this.channelName);
        }
        catch (ChannelException e) {
            logger.error("Channel init problem", e);
            if (this.channel != null) {
                // Release the half-initialised channel so a failed connect does not leak it.
                this.channel.close();
                this.channel = null;
            }
            ChannelMono.fail("Channel init problem: " + e);
        }
    }

    /**
     * Returns the protocol string that setUp() passes to the JChannel constructor.
     *
     * @return the current protocol value; null unless setProtocol() has been called
     */
    public String getProtocol() {
        return this.protocol;
    }

    /**
     * Sets the protocol string used by the next setUp() call.
     *
     * @param proto value stored as-is; no validation is performed here
     */
    public void setProtocol(String proto) {
        this.protocol = proto;
    }

    /**
     * Closes the channel created by setUp() and drops the reference.
     *
     * Does nothing when no channel is present, so a test whose channel was
     * never created does not add a NullPointerException on top of the
     * original failure.
     */
    public void tearDown() {
        if (this.channel != null) {
            this.channel.close();
            this.channel = null;
        }
    }

    /**
     * Checks that setUp() left a channel that is non-null, open and connected.
     */
    public void testChannel() {
        Channel current = this.channel;
        ChannelMono.assertNotNull(current);

        boolean open = current.isOpen();
        ChannelMono.assertTrue(open);

        boolean connected = current.isConnected();
        ChannelMono.assertTrue(connected);
    }

    /**
     * Sends 10000 byte-array messages over the channel while a single
     * ReadItems thread consumes them, then checks that the reader counted every
     * message and has terminated.
     *
     * Any exception is logged and reported through fail() with the cause in the
     * message, rather than as an anonymous "assertTrue(false)".
     */
    public void testLargeInsertion() {
        int nitems = 10000;
        try {
            logger.info("Inserting " + nitems + " elements");
            // The reader stops on its own once it has counted nitems messages.
            ReadItems mythread = new ReadItems(0, nitems);
            mythread.start();
            long start = System.currentTimeMillis();
            for (int i = 0; i < nitems; ++i) {
                this.channel.send(new Message(null, null, ("Msg #" + i).getBytes()));
            }
            mythread.join();
            long stop = System.currentTimeMillis();
            logger.info("Took " + (stop - start) + " msecs");
            ChannelMono.assertEquals(nitems, mythread.getNum_items());
            ChannelMono.assertFalse(mythread.isAlive());
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
            ChannelMono.fail("Large insertion failed: " + ex);
        }
    }

    /**
     * Starts ten RemoveOneItemWithTimeout threads on the shared channel, sends
     * two Long payloads five seconds apart and expects exactly two threads to
     * have finished. It then disconnects the channel, joins every thread and
     * expects all of them to have finished.
     */
    public void testBarrierWithTimeOut() {
        final long receiveTimeout = 200L;
        RemoveOneItemWithTimeout[] workers = new RemoveOneItemWithTimeout[10];
        int k = 0;
        while (k < workers.length) {
            workers[k] = new RemoveOneItemWithTimeout(k, receiveTimeout);
            workers[k].start();
            ++k;
        }
        Util.sleep(5000L);

        logger.info("-- adding element 99");
        try {
            this.channel.send(null, null, new Long(99L));
        }
        catch (Exception sendFailure) {
            logger.error("Problem", sendFailure);
        }
        Util.sleep(5000L);

        logger.info("-- adding element 100");
        try {
            this.channel.send(null, null, new Long(100L));
        }
        catch (Exception sendFailure) {
            logger.error("Problem", sendFailure);
        }
        Util.sleep(5000L);

        // First census: one worker should have finished per message sent.
        int terminated = 0;
        for (k = 0; k < workers.length; ++k) {
            String state = workers[k].isAlive() ? "alive" : "terminated";
            logger.info("remover #" + k + " is " + state);
            if (!workers[k].isAlive()) {
                ++terminated;
            }
        }
        ChannelMono.assertEquals(2, terminated);

        this.channel.disconnect();
        Util.sleep(2000L);
        for (k = 0; k < workers.length; ++k) {
            try {
                logger.debug("Waiting for thread " + k + " to join");
                workers[k].join();
            }
            catch (InterruptedException interrupted) {
                logger.error("Thread joining() interrupted", interrupted);
            }
        }

        // Second census: after the disconnect every worker is expected to be done.
        terminated = 0;
        for (k = 0; k < workers.length; ++k) {
            String state = workers[k].isAlive() ? "alive" : "terminated";
            logger.info("remover #" + k + " is " + state);
            if (!workers[k].isAlive()) {
                ++terminated;
            }
        }
        ChannelMono.assertEquals(workers.length, terminated);

        for (k = 0; k < workers.length; ++k) {
            workers[k] = null;
        }
    }

    /**
     * Starts ten AddOneItem threads that each send 200 messages, receives on
     * the calling thread until that many messages have been counted, then joins
     * the writers and checks that all of them have terminated.
     *
     * Every receive failure is logged and reported through fail() with its
     * cause, instead of an anonymous "assertTrue(false)" or a later count
     * mismatch.
     */
    public void testMultipleWriterOneReader() {
        AddOneItem[] adders = new AddOneItem[10];
        int num_dead = 0;
        int num_items = 0;
        int items = 200;
        int expected = adders.length * items;
        for (int i = 0; i < adders.length; ++i) {
            adders[i] = new AddOneItem(i, items);
            adders[i].start();
        }
        while (num_items < expected) {
            try {
                Object obj = this.channel.receive(0L);
                if (obj instanceof View) {
                    logger.info("--> NEW VIEW: " + obj);
                } else if (obj instanceof Message) {
                    // Only messages count toward the total; other events are skipped.
                    ++num_items;
                    logger.debug("Received " + ((Message)obj).getObject());
                }
            }
            catch (ChannelNotConnectedException conn) {
                logger.error("Problem", conn);
                ChannelMono.fail("Channel not connected while receiving: " + conn);
            }
            catch (TimeoutException e) {
                logger.error("Main thread timed out but should'nt had...", e);
                ChannelMono.fail("Unexpected receive timeout: " + e);
            }
            catch (Exception e) {
                logger.error("Problem", e);
                ChannelMono.fail("Receive failed after " + num_items + " messages: " + e);
            }
        }
        ChannelMono.assertEquals(expected, num_items);
        Util.sleep(1000L);
        for (int i = 0; i < adders.length; ++i) {
            try {
                logger.debug("Waiting for thread " + i + " to join");
                adders[i].join();
                logger.info("adder #" + i + " is " + (adders[i].isAlive() ? "alive" : "terminated"));
                if (!adders[i].isAlive()) {
                    ++num_dead;
                }
                adders[i] = null;
            }
            catch (InterruptedException e) {
                logger.error("Thread joining() interrupted", e);
            }
        }
        ChannelMono.assertEquals(adders.length, num_dead);
    }

    /**
     * Starts ten ReadItems threads that each stop after one message, sends two
     * messages and expects exactly two threads to have finished. It then sends
     * one "Closing Message" per thread still alive and expects every thread to
     * have finished.
     *
     * The final assertion uses the array length rather than a literal 10, and
     * the unused READER_ITEMS local has been dropped.
     */
    public void testBarrier() {
        int i;
        ReadItems[] removers = new ReadItems[10];
        int num_dead = 0;
        for (i = 0; i < removers.length; ++i) {
            removers[i] = new ReadItems(i, 1);
            removers[i].start();
        }
        Util.sleep(1000L);
        logger.info("-- adding element 99");
        try {
            this.channel.send(null, null, (Serializable)"99".getBytes());
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(5000L);
        logger.info("-- adding element 100");
        try {
            this.channel.send(null, null, (Serializable)"100".getBytes());
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(5000L);
        for (i = 0; i < removers.length; ++i) {
            logger.info("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated"));
            if (!removers[i].isAlive()) {
                ++num_dead;
            }
        }
        ChannelMono.assertEquals(2, num_dead);
        // Send one more message for each reader that is still waiting.
        for (i = 0; i < removers.length; ++i) {
            if (!removers[i].isAlive()) {
                continue;
            }
            try {
                this.channel.send(null, null, (Serializable)"Closing Message".getBytes());
            }
            catch (Exception ex) {
                logger.error("Problem", ex);
            }
        }
        Util.sleep(2000L);
        num_dead = 0;
        for (i = 0; i < removers.length; ++i) {
            try {
                removers[i].join(1000L);
            }
            catch (InterruptedException e) {
                logger.error("Thread joining() interrupted", e);
            }
            logger.info("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated"));
            if (!removers[i].isAlive()) {
                ++num_dead;
            }
        }
        ChannelMono.assertEquals(removers.length, num_dead);
    }

    /**
     * Runs five Reader and five Writer threads against the shared channel.
     * Writers are stopped after ten seconds and joined, the channel is closed,
     * and readers are polled until none is alive. The per-thread counters are
     * then summed and the totals are asserted equal.
     */
    public void testMultipleWriterMultipleReader() {
        logger.info("start testMultipleWriterMultipleReader");
        final int writerCount = 5;
        final int readerCount = 5;
        Writer[] producers = new Writer[writerCount];
        Reader[] consumers = new Reader[readerCount];
        int[] writes = new int[writerCount];
        int[] reads = new int[readerCount];

        int n = 0;
        while (n < consumers.length) {
            consumers[n] = new Reader(this.channel, n, reads);
            consumers[n].start();
            ++n;
        }
        Util.sleep(2000L);

        n = 0;
        while (n < producers.length) {
            producers[n] = new Writer(this.channel, n, writes);
            producers[n].start();
            ++n;
        }
        Util.sleep(10000L);

        for (n = 0; n < producers.length; ++n) {
            producers[n].stopThread();
        }
        Util.sleep(1000L);

        for (n = 0; n < producers.length; ++n) {
            try {
                logger.debug("Waiting for Writer thread " + n + " to join");
                producers[n].join(1000L);
                String state = producers[n].isAlive() ? "alive" : "terminated";
                logger.info("adder #" + n + " is " + state);
                producers[n] = null;
            }
            catch (InterruptedException interrupted) {
                logger.error("Thread joining() interrupted", interrupted);
            }
        }
        Util.sleep(10000L);
        this.channel.close();

        // Keep asking the readers to stop until a full pass finds none alive.
        boolean allStopped = false;
        while (!allStopped) {
            allStopped = true;
            Util.sleep(2000L);
            for (int r = 0; r < consumers.length; ++r) {
                try {
                    logger.debug("Waiting for Reader thread " + r + " to join");
                    consumers[r].stopThread();
                    consumers[r].join(1000L);
                    if (consumers[r].isAlive()) {
                        allStopped = false;
                        logger.info("reader #" + r + ' ' + reads[r] + " read items");
                    }
                    String state = consumers[r].isAlive() ? "alive" : "terminated";
                    logger.info("reader #" + r + " is " + state);
                }
                catch (InterruptedException interrupted) {
                    logger.error("Thread joining() interrupted", interrupted);
                }
            }
        }

        int total_writes = 0;
        int w = 0;
        while (w < writes.length) {
            total_writes += writes[w];
            ++w;
        }
        int total_reads = 0;
        int r = 0;
        while (r < reads.length) {
            total_reads += reads[r];
            ++r;
        }
        logger.info("Total writes:" + total_writes);
        logger.info("Total reads:" + total_reads);
        ChannelMono.assertEquals(total_writes, total_reads);
        logger.info("end testMultipleWriterMultipleReader");
    }

    /**
     * Runs this test class through the JUnit text runner.
     *
     * The class name comes from a class literal rather than the decompiler's
     * lazy class$ lookup; the resulting string is the same.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String[] testCaseName = new String[]{ChannelMono.class.getName()};
        TestRunner.main(testCaseName);
    }

    /**
     * Compiler-generated helper that resolves a class by name.
     *
     * @param x0 fully qualified class name
     * @return the Class returned by Class.forName(x0)
     * @throws NoClassDefFoundError if the class cannot be found; the original
     *         ClassNotFoundException is attached as the cause
     */
    static /* synthetic */ Class class$(String x0) {
        try {
            return Class.forName(x0);
        }
        catch (ClassNotFoundException x1) {
            // NoClassDefFoundError has no cause-taking constructor, so attach the cause explicitly.
            NoClassDefFoundError error = new NoClassDefFoundError(x1.getMessage());
            error.initCause(x1);
            throw error;
        }
    }

    class Reader
    extends Thread {
        int rank;
        int num_reads = 0;
        int[] reads = null;
        boolean running = true;
        Channel channel = null;

        Reader(Channel channel, int i, int[] reads) {
            super("Reader thread #" + i);
            this.rank = i;
            this.reads = reads;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            Message msg = null;
            while (this.running) {
                try {
                    Object obj = this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Reader thread #" + this.rank + ":--> NEW VIEW: " + obj));
                    } else if (obj instanceof Message) {
                        msg = (Message)obj;
                        Long retval = (Long)msg.getObject();
                        logger.debug((Object)("Reader thread #" + this.rank + ": received " + retval));
                        Assert.assertNotNull((Object)retval);
                    }
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Reader thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.running = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Reader thread #" + this.rank + ": channel time out but should'nt have..."), (Throwable)e);
                    this.running = false;
                }
                catch (ChannelClosedException e) {
                    this.running = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Reader thread #" + this.rank + ": problem"), (Throwable)e);
                }
                this.reads[this.rank] = ++this.num_reads;
            }
        }

        void stopThread() {
            this.running = false;
        }
    }

    /**
     * Thread that repeatedly sends the current time as a Long over a channel
     * and, when it exits, stores the number of completed sends in its slot of a
     * shared counter array.
     */
    class Writer
    extends Thread {
        int rank = 0;
        int num_writes = 0;
        // Written by stopThread() from another thread, hence volatile.
        volatile boolean running = true;
        int[] writes = null;
        Channel channel = null;

        /**
         * @param channel channel to send on
         * @param i index of this writer; also its slot in writes
         * @param writes shared array that receives this writer's send count
         */
        Writer(Channel channel, int i, int[] writes) {
            super("Writer thread #" + i);
            this.rank = i;
            this.writes = writes;
            this.channel = channel;
            this.setDaemon(true);
        }

        /**
         * Sends until running is cleared or the channel throws a
         * ChannelException. Other throwables are logged at debug level and the
         * loop continues. The count is published once, after the loop ends.
         */
        public void run() {
            while (this.running) {
                try {
                    this.channel.send(null, null, new Long(System.currentTimeMillis()));
                    Util.sleepRandom(50L);
                    ++this.num_writes;
                }
                catch (ChannelException closed) {
                    this.running = false;
                }
                catch (Throwable t) {
                    logger.debug("ChannelTest.Writer.run(): exception=" + t, t);
                }
            }
            this.writes[this.rank] = this.num_writes;
        }

        /** Asks the loop in run() to exit after the current iteration. */
        void stopThread() {
            this.running = false;
        }
    }

    class RemoveOneItemWithTimeout
    extends Thread {
        Long retval = null;
        int rank = 0;
        long timeout = 0L;

        RemoveOneItemWithTimeout(int rank, long timeout) {
            super("RemoveOneItemWithTimeout thread #" + rank);
            this.rank = rank;
            this.timeout = timeout;
            this.setDaemon(true);
        }

        public void run() {
            boolean finished = false;
            while (!finished) {
                try {
                    Object obj = ChannelMono.this.channel.receive(this.timeout);
                    if (obj != null) {
                        if (obj instanceof View) {
                            logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                            continue;
                        }
                        if (!(obj instanceof Message)) continue;
                        Message msg = (Message)obj;
                        this.retval = (Long)msg.getObject();
                        finished = true;
                        logger.debug((Object)("Thread #" + this.rank + " received :" + this.retval));
                        continue;
                    }
                    logger.debug((Object)("Thread #" + this.rank + ": channel read NULL"));
                }
                catch (ChannelNotConnectedException conn) {
                    finished = true;
                }
                catch (TimeoutException e) {
                }
                catch (ChannelClosedException e) {
                    logger.debug((Object)("Thread #" + this.rank + ": channel closed"), (Throwable)e);
                    finished = true;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + " problem"), (Throwable)e);
                    finished = true;
                }
            }
        }

        Long getRetval() {
            return this.retval;
        }
    }

    class AddOneItem
    extends Thread {
        Long retval = null;
        int rank = 0;
        int iteration = 0;

        AddOneItem(int rank, int iteration) {
            super("AddOneItem thread #" + rank);
            this.rank = rank;
            this.iteration = iteration;
            this.setDaemon(true);
        }

        public void run() {
            try {
                for (int i = 0; i < this.iteration; ++i) {
                    ChannelMono.this.channel.send(null, null, new Long(this.rank));
                    logger.debug((Object)("Thread #" + this.rank + " added element (" + this.rank + ')'));
                }
            }
            catch (ChannelException ex) {
                logger.error((Object)("Thread #" + this.rank + ": channel was closed"), (Throwable)ex);
            }
        }
    }

    class RemoveOneItem
    extends Thread {
        private boolean looping = true;
        int rank;
        Long retval = null;

        public RemoveOneItem(int rank) {
            super("RemoveOneItem thread #" + rank);
            this.rank = rank;
            this.setDaemon(true);
        }

        public void stopLooping() {
            this.looping = false;
        }

        public void run() {
            while (this.looping) {
                try {
                    Object obj = ChannelMono.this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    Message msg = (Message)obj;
                    this.looping = false;
                    this.retval = (Long)msg.getObject();
                    logger.debug((Object)("Thread #" + this.rank + ": received " + this.retval));
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.looping = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Thread #" + this.rank + ": channel time out but should'nt have..."), (Throwable)e);
                    this.looping = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)e);
                }
            }
        }

        Long getRetval() {
            return this.retval;
        }
    }

    class ReadItems
    extends Thread {
        private boolean looping = true;
        int num_items = 0;
        int max = 0;
        int rank;

        public ReadItems(int rank, int num) {
            super("ReadItems thread #" + rank);
            this.rank = rank;
            this.max = num;
            this.setDaemon(true);
        }

        public void stopLooping() {
            this.looping = false;
        }

        public void run() {
            while (this.looping) {
                try {
                    Object obj = ChannelMono.this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    Message msg = (Message)obj;
                    ++this.num_items;
                    if (this.num_items >= this.max) {
                        this.looping = false;
                    }
                    logger.debug((Object)("Thread #" + this.rank + " received :" + msg.getObject()));
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.looping = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Thread #" + this.rank + ": channel timed out but should'nt have..."), (Throwable)e);
                    this.looping = false;
                }
                catch (ChannelClosedException e) {
                    logger.debug((Object)("Thread #" + this.rank + ": channel closed"), (Throwable)e);
                    this.looping = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)e);
                    this.looping = false;
                }
            }
        }

        public int getNum_items() {
            return this.num_items;
        }
    }
}

