/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.cache30;

import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.tcp.Connection;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Set;
import junit.framework.TestCase;

public class SlowRecDUnitTest
extends CacheTestCase {
    protected static Object lastCallback = null;
    protected static final String CHECK_INVALID = "CHECK_INVALID";
    private static final String KEY_SLEEP = "KEY_SLEEP";
    private static final String KEY_WAIT = "KEY_WAIT";
    private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
    protected static final int CALLBACK_CREATE = 0;
    protected static final int CALLBACK_UPDATE = 1;
    protected static final int CALLBACK_INVALIDATE = 2;
    protected static final int CALLBACK_DESTROY = 3;
    protected static final int CALLBACK_REGION_INVALIDATE = 4;
    protected static final Integer CALLBACK_CREATE_INTEGER = new Integer(0);
    protected static final Integer CALLBACK_UPDATE_INTEGER = new Integer(1);
    protected static final Integer CALLBACK_INVALIDATE_INTEGER = new Integer(2);
    protected static final Integer CALLBACK_DESTROY_INTEGER = new Integer(3);
    protected static final Integer CALLBACK_REGION_INVALIDATE_INTEGER = new Integer(4);
    protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
    protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
    protected static ControlListener doTestDisconnectCleanup_Listener;
    protected static ControlListener doTestPartialMessage_Listener;

    public SlowRecDUnitTest(String name) {
        super(name);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setUp() throws Exception {
        try {
            SlowRecDUnitTest.disconnectAllFromDS();
        }
        finally {
            super.setUp();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void tearDown2() throws Exception {
        try {
            super.tearDown2();
        }
        finally {
            SlowRecDUnitTest.disconnectAllFromDS();
        }
    }

    private VM getOtherVm() {
        Host host = Host.getHost(0);
        return host.getVM(0);
    }

    private void doCreateOtherVm(final Properties p, final boolean addListener) {
        VM vm = this.getOtherVm();
        vm.invoke(new CacheSerializableRunnable("create root"){

            @Override
            public void run2() throws CacheException {
                CacheListenerAdapter cl;
                SlowRecDUnitTest.this.getSystem(p);
                SlowRecDUnitTest.this.createAckRegion(true, false);
                AttributesFactory af = new AttributesFactory();
                af.setScope(Scope.DISTRIBUTED_NO_ACK);
                af.setDataPolicy(DataPolicy.REPLICATE);
                if (addListener) {
                    cl = new CacheListenerAdapter(){

                        public void afterUpdate(EntryEvent event) {
                            try {
                                Thread.sleep(500L);
                            }
                            catch (InterruptedException shuttingDown) {
                                TestCase.fail((String)"interrupted");
                            }
                        }
                    };
                    af.setCacheListener((CacheListener)cl);
                } else {
                    cl = new CacheListenerAdapter(){

                        public void afterCreate(EntryEvent event) {
                            if (event.getCallbackArgument() != null) {
                                lastCallback = event.getCallbackArgument();
                            }
                            if (event.getKey().equals("sleepkey")) {
                                int sleepMs = (Integer)event.getNewValue();
                                try {
                                    Thread.sleep(sleepMs);
                                }
                                catch (InterruptedException ignore) {
                                    TestCase.fail((String)"interrupted");
                                }
                            }
                        }

                        public void afterUpdate(EntryEvent event) {
                            if (event.getCallbackArgument() != null) {
                                lastCallback = event.getCallbackArgument();
                            }
                            if (event.getKey().equals("sleepkey")) {
                                int sleepMs = (Integer)event.getNewValue();
                                try {
                                    Thread.sleep(sleepMs);
                                }
                                catch (InterruptedException ignore) {
                                    TestCase.fail((String)"interrupted");
                                }
                            }
                        }

                        public void afterInvalidate(EntryEvent event) {
                            if (event.getCallbackArgument() != null) {
                                lastCallback = event.getCallbackArgument();
                            }
                        }

                        public void afterDestroy(EntryEvent event) {
                            if (event.getCallbackArgument() != null) {
                                lastCallback = event.getCallbackArgument();
                            }
                        }
                    };
                    af.setCacheListener((CacheListener)cl);
                }
                Region r1 = SlowRecDUnitTest.this.createRootRegion("slowrec", af.create());
                r1.create((Object)"key", (Object)"value");
            }
        });
    }

    private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
        VM vm = this.getOtherVm();
        vm.invoke(new CacheSerializableRunnable("check last value"){

            @Override
            public void run2() throws CacheException {
                Region r;
                Region r1 = SlowRecDUnitTest.this.getRootRegion("slowrec");
                if (lcb != null) {
                    DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                        @Override
                        public boolean done() {
                            return lcb.equals(lastCallback);
                        }

                        @Override
                        public String description() {
                            return "waiting for callback";
                        }
                    };
                    DistributedTestCase.waitForCriterion(ev, 50000L, 200L, true);
                    TestCase.assertEquals((Object)lcb, (Object)lastCallback);
                }
                if (lastValue == null) {
                    r = r1;
                    DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                        @Override
                        public boolean done() {
                            return r.getEntry((Object)"key") == null;
                        }

                        @Override
                        public String description() {
                            return "waiting for key to become null";
                        }
                    };
                    DistributedTestCase.waitForCriterion(ev, 50000L, 200L, true);
                    TestCase.assertEquals(null, (Object)r1.getEntry((Object)"key"));
                } else if (SlowRecDUnitTest.CHECK_INVALID.equals(lastValue)) {
                    r = r1;
                    DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                        @Override
                        public boolean done() {
                            Region.Entry e = r.getEntry((Object)"key");
                            if (e == null) {
                                return false;
                            }
                            return e.getValue() == null;
                        }

                        @Override
                        public String description() {
                            return "waiting for invalidate";
                        }
                    };
                    DistributedTestCase.waitForCriterion(ev, 50000L, 200L, true);
                } else {
                    int retryCount = 1000;
                    Region.Entry re = null;
                    Object value = null;
                    while (!(retryCount-- <= 0 || (re = r1.getEntry((Object)"key")) != null && (value = re.getValue()) != null && value.equals(lastValue))) {
                        try {
                            Thread.sleep(50L);
                        }
                        catch (InterruptedException ignore) {
                            TestCase.fail((String)"interrupted");
                        }
                    }
                    TestCase.assertNotNull(re);
                    TestCase.assertNotNull(value);
                    TestCase.assertEquals((Object)lastValue, value);
                }
            }
        });
    }

    private void forceQueueFlush() {
        Connection.FORCE_ASYNC_QUEUE = false;
        final DMStats stats = this.getSystem().getDistributionManager().getStats();
        DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return stats.getAsyncThreads() == 0;
            }

            @Override
            public String description() {
                return "Waiting for async threads to disappear";
            }
        };
        DistributedTestCase.waitForCriterion(ev, 10000L, 200L, true);
    }

    private void forceQueuing(Region r) throws CacheException {
        Connection.FORCE_ASYNC_QUEUE = true;
        final DMStats stats = this.getSystem().getDistributionManager().getStats();
        r.put((Object)"forcekey", (Object)"forcevalue");
        DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return stats.getAsyncQueueFlushesInProgress() != 0;
            }

            @Override
            public String description() {
                return "waiting for flushes to start";
            }
        };
        DistributedTestCase.waitForCriterion(ev, 2000L, 200L, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testNoAck() throws CacheException {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        Region r = this.createRootRegion("slowrec", factory.create());
        final DMStats stats = this.getSystem().getDistributionManager().getStats();
        Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "1");
        this.doCreateOtherVm(p, false);
        int repeatCount = 2;
        int count = 0;
        while (repeatCount-- > 0) {
            this.forceQueuing(r);
            String key = "key";
            long queuedMsgs = stats.getAsyncQueuedMsgs();
            long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
            long queueSize = stats.getAsyncQueueSize();
            String lastValue = "";
            long intialQueuedMsgs = queuedMsgs;
            long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
            try {
                while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6L) {
                    String value;
                    lastValue = value = "count=" + count;
                    r.put((Object)key, (Object)value);
                    ++count;
                    queueSize = stats.getAsyncQueueSize();
                    queuedMsgs = stats.getAsyncQueuedMsgs();
                    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
                    curQueuedMsgs = queuedMsgs - dequeuedMsgs;
                }
                SlowRecDUnitTest.getLogWriter().info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
            }
            finally {
                this.forceQueueFlush();
            }
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    return stats.getAsyncQueueSize() == 0L;
                }

                @Override
                public String description() {
                    return "Waiting for queues to empty";
                }
            };
            long start = System.currentTimeMillis();
            DistributedTestCase.waitForCriterion(ev, 30000L, 200L, true);
            long finish = System.currentTimeMillis();
            SlowRecDUnitTest.getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);
            this.checkLastValueInOtherVm(lastValue, null);
        }
    }

    protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        if (mirror) {
            factory.setDataPolicy(DataPolicy.REPLICATE);
        }
        if (conflate) {
            factory.setEnableAsyncConflation(true);
        }
        Region r = this.createRootRegion("AckRegion", factory.create());
        return r;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testNoAckConflation() throws CacheException {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        factory.setEnableAsyncConflation(true);
        Region r = this.createRootRegion("slowrec", factory.create());
        DMStats stats = this.getSystem().getDistributionManager().getStats();
        Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "1");
        this.doCreateOtherVm(p, false);
        this.forceQueuing(r);
        String key = "key";
        int count = 0;
        long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
        String lastValue = "";
        long intialDeQueuedMsgs = stats.getAsyncDequeuedMsgs();
        long start = 0L;
        try {
            while (stats.getAsyncConflatedMsgs() - initialConflatedMsgs < 1000L) {
                String value;
                lastValue = value = "count=" + count;
                r.put((Object)key, (Object)value);
                ++count;
            }
            start = System.currentTimeMillis();
        }
        finally {
            this.forceQueueFlush();
        }
        long finish = System.currentTimeMillis();
        SlowRecDUnitTest.getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + (stats.getAsyncDequeuedMsgs() - intialDeQueuedMsgs) + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The lastValue was " + lastValue);
        this.checkLastValueInOtherVm(lastValue, null);
    }

    public void testAckConflation() throws CacheException {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        factory.setEnableAsyncConflation(true);
        Region r = this.createRootRegion("slowrec", factory.create());
        final Region ar = this.createAckRegion(false, true);
        ar.create((Object)"ackKey", (Object)"ackValue");
        DMStats stats = this.getSystem().getDistributionManager().getStats();
        Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "2");
        this.doCreateOtherVm(p, false);
        this.forceQueuing(r);
        long startQueuedMsgs = stats.getAsyncQueuedMsgs();
        long startConflatedMsgs = stats.getAsyncConflatedMsgs();
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                ar.put((Object)"ackKey", (Object)"ackValue");
            }
        });
        t.start();
        Thread t2 = new Thread(new Runnable(){

            @Override
            public void run() {
                ar.put((Object)"ackKey", (Object)"ackValue");
            }
        });
        t2.start();
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException ignore) {
            SlowRecDUnitTest.fail((String)"interrupted");
        }
        this.forceQueueFlush();
        DistributedTestCase.join(t, 2000L, SlowRecDUnitTest.getLogWriter());
        DistributedTestCase.join(t2, 2000L, SlowRecDUnitTest.getLogWriter());
        long endQueuedMsgs = stats.getAsyncQueuedMsgs();
        long endConflatedMsgs = stats.getAsyncConflatedMsgs();
        SlowRecDUnitTest.assertEquals((long)startConflatedMsgs, (long)endConflatedMsgs);
        SlowRecDUnitTest.assertEquals((long)endQueuedMsgs, (long)stats.getAsyncDequeuedMsgs());
        SlowRecDUnitTest.assertEquals((long)(startQueuedMsgs + 2L), (long)endQueuedMsgs);
    }

    public void _disabled_testConflationSequence() throws CacheException {
        int count;
        String value;
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        factory.setEnableAsyncConflation(true);
        Region r = this.createRootRegion("slowrec", factory.create());
        factory.setEnableAsyncConflation(false);
        Region noConflate = this.createRootRegion("noConflate", factory.create());
        DMStats stats = this.getSystem().getDistributionManager().getStats();
        Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "1");
        this.doCreateOtherVm(p, false);
        VM vm = this.getOtherVm();
        vm.invoke(new CacheSerializableRunnable("create noConflate"){

            @Override
            public void run2() throws CacheException {
                AttributesFactory af = new AttributesFactory();
                af.setScope(Scope.DISTRIBUTED_NO_ACK);
                af.setDataPolicy(DataPolicy.REPLICATE);
                SlowRecDUnitTest.this.createRootRegion("noConflate", af.create());
            }
        });
        String key = "key";
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] about to force queuing");
        this.forceQueuing(r);
        String lastValue = value = "";
        String mylcb = null;
        long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
        int endCount = count + 60;
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] about to build up queue");
        long begin = System.currentTimeMillis();
        for (count = 0; count < endCount; ++count) {
            lastValue = value = "count=" + count;
            r.create((Object)key, (Object)value);
            lastValue = value = "count=" + ++count;
            r.put((Object)key, (Object)value);
            ++count;
            mylcb = value;
            r.destroy((Object)key, (Object)mylcb);
            lastValue = null;
            SlowRecDUnitTest.assertTrue((System.currentTimeMillis() < begin + 120000L ? 1 : 0) != 0);
        }
        SlowRecDUnitTest.assertEquals((long)initialConflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        this.forceQueueFlush();
        this.checkLastValueInOtherVm(lastValue, mylcb);
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] force queuing create-update-destroy");
        this.forceQueuing(r);
        initialConflatedMsgs = stats.getAsyncConflatedMsgs();
        endCount = count + 40;
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] create-update-destroy");
        begin = System.currentTimeMillis();
        while (count < endCount) {
            lastValue = value = "count=" + count;
            r.create((Object)key, (Object)value);
            lastValue = value = "count=" + ++count;
            r.put((Object)key, (Object)value);
            ++count;
            r.localDestroy((Object)key);
            SlowRecDUnitTest.assertTrue((System.currentTimeMillis() < begin + 120000L ? 1 : 0) != 0);
        }
        SlowRecDUnitTest.assertEquals((long)initialConflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        this.forceQueueFlush();
        this.checkLastValueInOtherVm(lastValue, null);
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
        this.forceQueuing(r);
        initialConflatedMsgs = stats.getAsyncConflatedMsgs();
        lastValue = value = "count=" + count;
        r.create((Object)key, (Object)value);
        endCount = ++count + 40;
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] update-invalidate");
        begin = System.currentTimeMillis();
        while (count < endCount) {
            lastValue = value = "count=" + count;
            r.put((Object)key, (Object)value);
            ++count;
            r.invalidate((Object)key);
            ++count;
            lastValue = CHECK_INVALID;
            SlowRecDUnitTest.assertTrue((System.currentTimeMillis() < begin + 120000L ? 1 : 0) != 0);
        }
        SlowRecDUnitTest.assertEquals((long)initialConflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        this.forceQueueFlush();
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] assert other vm");
        this.checkLastValueInOtherVm(lastValue, null);
        r.destroy((Object)key);
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
        this.forceQueuing(r);
        int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
        lastValue = value = "count=" + count;
        long conflatedMsgs = stats.getAsyncConflatedMsgs();
        long queuedMsgs = stats.getAsyncQueuedMsgs();
        r.create((Object)key, (Object)value);
        SlowRecDUnitTest.assertEquals((long)(++queuedMsgs), (long)stats.getAsyncQueuedMsgs());
        SlowRecDUnitTest.assertEquals((long)conflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        r.put((Object)key, (Object)value);
        SlowRecDUnitTest.assertEquals((long)(++queuedMsgs), (long)stats.getAsyncQueuedMsgs());
        SlowRecDUnitTest.assertEquals((long)conflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        noConflate.create((Object)key, (Object)value);
        SlowRecDUnitTest.assertEquals((long)(++queuedMsgs), (long)stats.getAsyncQueuedMsgs());
        SlowRecDUnitTest.assertEquals((long)conflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        noConflate.put((Object)key, (Object)value);
        SlowRecDUnitTest.assertEquals((long)(++queuedMsgs), (long)stats.getAsyncQueuedMsgs());
        SlowRecDUnitTest.assertEquals((long)conflatedMsgs, (long)stats.getAsyncConflatedMsgs());
        endCount = ++count + 80;
        begin = System.currentTimeMillis();
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence:DEBUG] count=" + count + " queuedMsgs=" + stats.getAsyncQueuedMsgs() + " conflatedMsgs=" + stats.getAsyncConflatedMsgs() + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs() + " asyncSocketWrites=" + stats.getAsyncSocketWrites());
        while (count < endCount) {
            SlowRecDUnitTest.assertEquals((int)1, (int)stats.getAsyncThreads());
            SlowRecDUnitTest.assertEquals((int)1, (int)stats.getAsyncQueues());
            SlowRecDUnitTest.assertTrue((stats.getAsyncQueueFlushesInProgress() > 0 ? 1 : 0) != 0);
            SlowRecDUnitTest.assertEquals((int)initialAsyncSocketWrites, (int)stats.getAsyncSocketWrites());
            lastValue = value = "count=" + count;
            r.put((Object)key, (Object)value);
            ++count;
            SlowRecDUnitTest.assertEquals((long)queuedMsgs, (long)stats.getAsyncQueuedMsgs());
            SlowRecDUnitTest.assertEquals((long)(++conflatedMsgs), (long)stats.getAsyncConflatedMsgs());
            noConflate.put((Object)key, (Object)value);
            SlowRecDUnitTest.assertEquals((long)(++queuedMsgs), (long)stats.getAsyncQueuedMsgs());
            SlowRecDUnitTest.assertEquals((long)conflatedMsgs, (long)stats.getAsyncConflatedMsgs());
            SlowRecDUnitTest.assertTrue((System.currentTimeMillis() < begin + 120000L ? 1 : 0) != 0);
        }
        this.forceQueueFlush();
        SlowRecDUnitTest.getLogWriter().info("[testConflationSequence] assert other vm");
        this.checkLastValueInOtherVm(lastValue, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testSizeDisconnect() throws CacheException {
        String expected = "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe";
        String addExpected = "<ExpectedException action=add>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>";
        String removeExpected = "<ExpectedException action=remove>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>";
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        Region r = this.createRootRegion("slowrec", factory.create());
        final DM dm = this.getSystem().getDistributionManager();
        final DMStats stats = dm.getStats();
        final Set others = dm.getOtherDistributionManagerIds();
        Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "5");
        p.setProperty("async-max-queue-size", "1");
        this.doCreateOtherVm(p, false);
        String key = "key";
        int VALUE_SIZE = 102400;
        byte[] value = new byte[102400];
        int count = 0;
        this.forceQueuing(r);
        long queuedMsgs = stats.getAsyncQueuedMsgs();
        long queueSize = stats.getAsyncQueueSize();
        this.getCache().getLogger().info("<ExpectedException action=add>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>");
        try {
            while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
                r.put((Object)key, (Object)value);
                ++count;
                if (stats.getAsyncQueueSize() > 0L) {
                    queuedMsgs = stats.getAsyncQueuedMsgs();
                    queueSize = stats.getAsyncQueueSize();
                }
                if (count <= 100) continue;
                SlowRecDUnitTest.fail((String)"should have exceeded max-queue-size by now");
            }
            SlowRecDUnitTest.getLogWriter().info("After " + count + " " + 102400 + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    return dm.getOtherDistributionManagerIds().size() <= others.size() && stats.getAsyncQueueSize() == 0L;
                }

                @Override
                public String description() {
                    return "waiting for connection loss";
                }
            };
            DistributedTestCase.waitForCriterion(ev, 30000L, 200L, true);
        }
        finally {
            this.forceQueueFlush();
            this.getCache().getLogger().info("<ExpectedException action=remove>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>");
        }
        SlowRecDUnitTest.assertEquals((Object)others, (Object)dm.getOtherDistributionManagerIds());
        SlowRecDUnitTest.assertEquals((long)0L, (long)stats.getAsyncQueueSize());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void donottestTimeoutDisconnect() throws CacheException {
        String expected = "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe";
        String addExpected = "<ExpectedException action=add>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>";
        String removeExpected = "<ExpectedException action=remove>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>";
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        Region r = this.createRootRegion("slowrec", factory.create());
        final DM dm = this.getSystem().getDistributionManager();
        final DMStats stats = dm.getStats();
        final Set others = dm.getOtherDistributionManagerIds();
        Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "5");
        p.setProperty("async-queue-timeout", "500");
        this.doCreateOtherVm(p, true);
        String key = "key";
        int VALUE_SIZE = 1024;
        byte[] value = new byte[1024];
        int count = 0;
        long queuedMsgs = stats.getAsyncQueuedMsgs();
        long queueSize = stats.getAsyncQueueSize();
        long timeoutLimit = System.currentTimeMillis() + 5000L;
        this.getCache().getLogger().info("<ExpectedException action=add>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>");
        try {
            while (stats.getAsyncQueueTimeouts() == 0) {
                r.put((Object)key, (Object)value);
                ++count;
                if (stats.getAsyncQueueSize() > 0L) {
                    queuedMsgs = stats.getAsyncQueuedMsgs();
                    queueSize = stats.getAsyncQueueSize();
                }
                if (System.currentTimeMillis() <= timeoutLimit) continue;
                SlowRecDUnitTest.fail((String)"should have exceeded async-queue-timeout by now");
            }
            SlowRecDUnitTest.getLogWriter().info("After " + count + " " + 1024 + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    if (dm.getOtherDistributionManagerIds().size() > others.size()) {
                        return false;
                    }
                    return stats.getAsyncQueueSize() == 0L;
                }

                @Override
                public String description() {
                    return "waiting for departure";
                }
            };
            DistributedTestCase.waitForCriterion(ev, 2000L, 200L, true);
        }
        finally {
            this.getCache().getLogger().info("<ExpectedException action=remove>com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to||java.io.IOException: Broken pipe</ExpectedException>");
        }
        SlowRecDUnitTest.assertEquals((Object)others, (Object)dm.getOtherDistributionManagerIds());
        SlowRecDUnitTest.assertEquals((long)0L, (long)stats.getAsyncQueueSize());
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void donottestMultipleRegionConflation() throws Throwable {
        try {
            this.doTestMultipleRegionConflation();
            this.getOtherVm().invoke(new SerializableRunnable("Wake up other vm"){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Object object = SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK;
                    synchronized (object) {
                        SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
                    }
                }
            });
            return;
        }
        catch (VirtualMachineError e) {
            try {
                SystemFailure.initiateFailure((Error)e);
                throw e;
                catch (Throwable t) {
                    SlowRecDUnitTest.getLogWriter().error("Encountered exception: ", t);
                    throw t;
                }
            }
            catch (Throwable throwable) {
                this.getOtherVm().invoke(new /* invalid duplicate definition of identical inner class */);
                throw throwable;
            }
        }
    }

    private void doTestMultipleRegionConflation() throws Exception {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        factory.setEnableAsyncConflation(true);
        Region r1 = this.createRootRegion("slowrec1", factory.create());
        Region r2 = this.createRootRegion("slowrec2", factory.create());
        SlowRecDUnitTest.assertTrue((boolean)this.getSystem().isConnected());
        SlowRecDUnitTest.assertNotNull((Object)r1);
        SlowRecDUnitTest.assertFalse((boolean)r1.isDestroyed());
        SlowRecDUnitTest.assertNotNull((Object)this.getCache());
        SlowRecDUnitTest.assertNotNull((Object)this.getCache().getRegion("slowrec1"));
        SlowRecDUnitTest.assertNotNull((Object)r2);
        SlowRecDUnitTest.assertFalse((boolean)r2.isDestroyed());
        SlowRecDUnitTest.assertNotNull((Object)this.getCache());
        SlowRecDUnitTest.assertNotNull((Object)this.getCache().getRegion("slowrec2"));
        DM dm = this.getSystem().getDistributionManager();
        InternalDistributedMember controllerVM = dm.getDistributionManagerId();
        DMStats stats = dm.getStats();
        int millisToWait = 300000;
        long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
        final Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "5");
        p.setProperty("async-queue-timeout", "86400000");
        p.setProperty("async-max-queue-size", "1024");
        this.getOtherVm().invoke(new CacheSerializableRunnable("Create other vm", (Serializable)controllerVM){
            final /* synthetic */ Serializable val$controllerVM;
            {
                this.val$controllerVM = serializable;
                super(x0);
            }

            @Override
            public void run2() throws CacheException {
                SlowRecDUnitTest.this.getSystem(p);
                DM dm = SlowRecDUnitTest.this.getSystem().getDistributionManager();
                TestCase.assertTrue((boolean)dm.getDistributionManagerIds().contains(this.val$controllerVM));
                AttributesFactory af = new AttributesFactory();
                af.setScope(Scope.DISTRIBUTED_NO_ACK);
                af.setDataPolicy(DataPolicy.REPLICATE);
                doTestMultipleRegionConflation_R1_Listener = new ControlListener();
                af.setCacheListener((CacheListener)doTestMultipleRegionConflation_R1_Listener);
                SlowRecDUnitTest.this.createRootRegion("slowrec1", af.create());
                doTestMultipleRegionConflation_R2_Listener = new ControlListener();
                af.setCacheListener((CacheListener)doTestMultipleRegionConflation_R2_Listener);
                SlowRecDUnitTest.this.createRootRegion("slowrec2", af.create());
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 into wait");
        r1.put((Object)KEY_WAIT, (Object)new Integer(300000));
        SlowRecDUnitTest.getLogWriter().info("[doTestMultipleRegionConflation] building up queue size...");
        String key = "key";
        int socketBufferSize = this.getSystem().getConfig().getSocketBufferSize();
        int VALUE_SIZE = socketBufferSize * 3;
        byte[] value = new byte[VALUE_SIZE];
        int count = 0;
        while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
            ++count;
            r1.put((Object)key, (Object)value);
        }
        SlowRecDUnitTest.getLogWriter().info("[doTestMultipleRegionConflation] After " + count + " puts of size " + VALUE_SIZE + " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
        String key1 = "key1";
        String key2 = "key2";
        String putKey = key1;
        boolean flag = true;
        for (int i = 0; i < 30; ++i) {
            if (i == 10) {
                putKey = key2;
            }
            if (flag) {
                if (i == 6) {
                    r1.invalidate((Object)putKey, (Object)new Integer(i));
                } else if (i == 24) {
                    r1.invalidateRegion((Object)new Integer(i));
                } else {
                    r1.put((Object)putKey, (Object)value, (Object)new Integer(i));
                }
            } else if (i == 15) {
                r2.destroy((Object)putKey, (Object)new Integer(i));
            } else {
                r2.put((Object)putKey, (Object)value, (Object)new Integer(i));
            }
            flag = !flag;
        }
        final int[] r1ExpectedArgs = new int[]{0, 4, 6, 8, 10, 24, 28};
        final int[] r1ExpectedTypes = new int[]{0, 1, 2, 1, 0, 4, 1};
        final int[] r2ExpectedArgs = new int[]{1, 9, 11, 13, 15, 17, 29};
        final int[] r2ExpectedTypes = new int[]{0, 1, 0, 1, 3, 0, 1};
        SlowRecDUnitTest.getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
        this.getOtherVm().invoke(new SerializableRunnable("Wake up other vm"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK;
                synchronized (object) {
                    SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
                }
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
        this.getOtherVm().invoke(new SerializableRunnable("Wait for other vm"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    Object object = SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK;
                    synchronized (object) {
                        while (SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
                            SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(300000L);
                        }
                    }
                    object = SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK;
                    synchronized (object) {
                        while (SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
                            SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(300000L);
                        }
                    }
                }
                catch (InterruptedException ignore) {
                    TestCase.fail((String)"interrupted");
                }
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[doTestMultipleRegionConflation] assert callback arguments");
        this.getOtherVm().invoke(new SerializableRunnable("Assert callback arguments"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                int i;
                Object object = SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK;
                synchronized (object) {
                    DistributedTestCase.getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackArguments);
                    DistributedTestCase.getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackTypes);
                    TestCase.assertEquals((int)SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(), (int)SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
                    i = 0;
                    for (CallbackWrapper wrapper : SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackArguments) {
                        TestCase.assertEquals((Object)new Integer(r1ExpectedArgs[i]), (Object)wrapper.callbackArgument);
                        TestCase.assertEquals((Object)new Integer(r1ExpectedTypes[i]), SlowRecDUnitTest.doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
                        ++i;
                    }
                }
                object = SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK;
                synchronized (object) {
                    DistributedTestCase.getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackArguments);
                    DistributedTestCase.getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackTypes);
                    TestCase.assertEquals((int)SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(), (int)SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
                    i = 0;
                    for (CallbackWrapper wrapper : SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackArguments) {
                        TestCase.assertEquals((Object)new Integer(r2ExpectedArgs[i]), (Object)wrapper.callbackArgument);
                        TestCase.assertEquals((Object)new Integer(r2ExpectedTypes[i]), SlowRecDUnitTest.doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
                        ++i;
                    }
                }
            }
        });
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void testDisconnectCleanup() throws Throwable {
        try {
            this.doTestDisconnectCleanup();
            this.getOtherVm().invoke(new SerializableRunnable("Wake up other vm"){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Object object = SlowRecDUnitTest.doTestDisconnectCleanup_Listener.CONTROL_LOCK;
                    synchronized (object) {
                        SlowRecDUnitTest.doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
                    }
                }
            });
            return;
        }
        catch (VirtualMachineError e) {
            try {
                SystemFailure.initiateFailure((Error)e);
                throw e;
                catch (Throwable t) {
                    SlowRecDUnitTest.getLogWriter().error("Encountered exception: ", t);
                    throw t;
                }
            }
            catch (Throwable throwable) {
                this.getOtherVm().invoke(new /* invalid duplicate definition of identical inner class */);
                throw throwable;
            }
        }
    }

    private void doTestDisconnectCleanup() throws Exception {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        Region r = this.createRootRegion("slowrec", factory.create());
        final DM dm = this.getSystem().getDistributionManager();
        final DMStats stats = dm.getStats();
        final Set others = dm.getOtherDistributionManagerIds();
        long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
        final int initialQueues = stats.getAsyncQueues();
        final Properties p = new Properties();
        p.setProperty("async-distribution-timeout", "5");
        p.setProperty("async-queue-timeout", "86400000");
        p.setProperty("async-max-queue-size", "1024");
        this.getOtherVm().invoke(new CacheSerializableRunnable("Create other vm"){

            @Override
            public void run2() throws CacheException {
                SlowRecDUnitTest.this.getSystem(p);
                AttributesFactory af = new AttributesFactory();
                af.setScope(Scope.DISTRIBUTED_NO_ACK);
                af.setDataPolicy(DataPolicy.REPLICATE);
                doTestDisconnectCleanup_Listener = new ControlListener();
                af.setCacheListener((CacheListener)doTestDisconnectCleanup_Listener);
                SlowRecDUnitTest.this.createRootRegion("slowrec", af.create());
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
        int millisToWait = 300000;
        r.put((Object)KEY_WAIT, (Object)new Integer(millisToWait));
        r.put((Object)KEY_DISCONNECT, (Object)KEY_DISCONNECT);
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] building up queue size...");
        String key = "key";
        int socketBufferSize = this.getSystem().getConfig().getSocketBufferSize();
        int VALUE_SIZE = socketBufferSize * 3;
        byte[] value = new byte[VALUE_SIZE];
        int count = 0;
        long abortMillis = System.currentTimeMillis() + (long)millisToWait;
        while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
            ++count;
            r.put((Object)key, (Object)value);
            SlowRecDUnitTest.assertFalse((System.currentTimeMillis() >= abortMillis ? 1 : 0) != 0);
        }
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] After " + count + " puts of size " + VALUE_SIZE + " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
        while (stats.getAsyncQueuedMsgs() < 10L || stats.getAsyncQueueSize() < (long)(VALUE_SIZE * 10)) {
            ++count;
            r.put((Object)key, (Object)value);
            SlowRecDUnitTest.assertFalse((System.currentTimeMillis() >= abortMillis ? 1 : 0) != 0);
        }
        SlowRecDUnitTest.assertTrue((stats.getAsyncQueuedMsgs() >= 10L ? 1 : 0) != 0);
        while (stats.getAsyncQueues() < 1) {
            SlowRecDUnitTest.pause(100);
            SlowRecDUnitTest.assertFalse((System.currentTimeMillis() >= abortMillis ? 1 : 0) != 0);
        }
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] After " + count + " puts of size " + VALUE_SIZE + " queue size has reached " + stats.getAsyncQueueSize() + " bytes and number of queues is " + stats.getAsyncQueues() + ".");
        SlowRecDUnitTest.assertTrue((stats.getAsyncQueueSize() >= (long)(VALUE_SIZE * 5) ? 1 : 0) != 0);
        SlowRecDUnitTest.assertEquals((int)(initialQueues + 1), (int)stats.getAsyncQueues());
        SlowRecDUnitTest.assertTrue((dm.getOtherDistributionManagerIds().size() > others.size() ? 1 : 0) != 0);
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] wake up vm0");
        this.getOtherVm().invoke(new SerializableRunnable("Wake up other vm"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = SlowRecDUnitTest.doTestDisconnectCleanup_Listener.CONTROL_LOCK;
                synchronized (object) {
                    SlowRecDUnitTest.doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
                }
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
        DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return dm.getOtherDistributionManagerIds().size() <= others.size();
            }

            @Override
            public String description() {
                return "waiting for disconnect";
            }
        };
        DistributedTestCase.waitForCriterion(ev, 2000L, 200L, true);
        SlowRecDUnitTest.assertEquals((Object)others, (Object)dm.getOtherDistributionManagerIds());
        SlowRecDUnitTest.getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
        ev = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                if (stats.getAsyncQueues() <= initialQueues) {
                    return true;
                }
                Runtime.getRuntime().gc();
                return false;
            }

            @Override
            public String description() {
                return "waiting for queue cleanup";
            }
        };
        DistributedTestCase.waitForCriterion(ev, 2000L, 200L, true);
        SlowRecDUnitTest.assertEquals((int)initialQueues, (int)stats.getAsyncQueues());
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void donottestPartialMessage() throws Throwable {
        try {
            this.doTestPartialMessage();
            this.getOtherVm().invoke(new SerializableRunnable("Wake up other vm"){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Object object = SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK;
                    synchronized (object) {
                        SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
                    }
                }
            });
            return;
        }
        catch (VirtualMachineError e) {
            try {
                SystemFailure.initiateFailure((Error)e);
                throw e;
                catch (Throwable t) {
                    SlowRecDUnitTest.getLogWriter().error("Encountered exception: ", t);
                    throw t;
                }
            }
            catch (Throwable throwable) {
                this.getOtherVm().invoke(new /* invalid duplicate definition of identical inner class */);
                throw throwable;
            }
        }
    }

    private void doTestPartialMessage() throws Exception {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_NO_ACK);
        factory.setEnableAsyncConflation(true);
        Region r = this.createRootRegion("slowrec", factory.create());
        DM dm = this.getSystem().getDistributionManager();
        DMStats stats = dm.getStats();
        long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
        final Properties p = new Properties();
        p.setProperty("async-distribution-timeout", String.valueOf(4000));
        p.setProperty("async-queue-timeout", "86400000");
        p.setProperty("async-max-queue-size", "1024");
        this.getOtherVm().invoke(new CacheSerializableRunnable("Create other vm"){

            @Override
            public void run2() throws CacheException {
                SlowRecDUnitTest.this.getSystem(p);
                AttributesFactory af = new AttributesFactory();
                af.setScope(Scope.DISTRIBUTED_NO_ACK);
                af.setDataPolicy(DataPolicy.REPLICATE);
                doTestPartialMessage_Listener = new ControlListener();
                af.setCacheListener((CacheListener)doTestPartialMessage_Listener);
                SlowRecDUnitTest.this.createRootRegion("slowrec", af.create());
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
        int millisToWait = 300000;
        r.put((Object)KEY_WAIT, (Object)new Integer(300000));
        SlowRecDUnitTest.getLogWriter().info("[testPartialMessage] building up queue size...");
        String key = "key";
        int socketBufferSize = this.getSystem().getConfig().getSocketBufferSize();
        int VALUE_SIZE = socketBufferSize * 3;
        byte[] value = new byte[VALUE_SIZE];
        int count = 0;
        while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
            r.put((Object)key, (Object)value, (Object)new Integer(++count));
        }
        final int partialId = count;
        SlowRecDUnitTest.assertEquals((long)0L, (long)stats.getAsyncConflatedMsgs());
        SlowRecDUnitTest.getLogWriter().info("[testPartialMessage] After " + count + " puts of size " + VALUE_SIZE + " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
        SlowRecDUnitTest.pause(2000);
        while (stats.getAsyncConflatedMsgs() < 10L) {
            r.put((Object)key, (Object)value, (Object)new Integer(++count));
            if (count == partialId + 1) {
                SlowRecDUnitTest.assertEquals((long)(initialQueuedMsgs + 2L), (long)stats.getAsyncQueuedMsgs());
                SlowRecDUnitTest.assertEquals((long)0L, (long)stats.getAsyncConflatedMsgs());
                continue;
            }
            if (count != partialId + 2) continue;
            SlowRecDUnitTest.assertEquals((long)(initialQueuedMsgs + 2L), (long)stats.getAsyncQueuedMsgs());
            SlowRecDUnitTest.assertEquals((long)1L, (long)stats.getAsyncConflatedMsgs());
        }
        final int conflateId = count;
        final int[] expectedArgs = new int[]{partialId, conflateId};
        SlowRecDUnitTest.getLogWriter().info("[testPartialMessage] wake up vm0");
        this.getOtherVm().invoke(new SerializableRunnable("Wake up other vm"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK;
                synchronized (object) {
                    SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK.notify();
                }
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[testPartialMessage] wait for vm0");
        this.getOtherVm().invoke(new SerializableRunnable("Wait for other vm"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    Object object = SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK;
                    synchronized (object) {
                        boolean done = false;
                        while (!done) {
                            if (SlowRecDUnitTest.doTestPartialMessage_Listener.callbackArguments.size() > 0) {
                                CallbackWrapper last = (CallbackWrapper)SlowRecDUnitTest.doTestPartialMessage_Listener.callbackArguments.getLast();
                                Integer lastId = (Integer)last.callbackArgument;
                                if (lastId == conflateId) {
                                    done = true;
                                    continue;
                                }
                                SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK.wait(300000L);
                                continue;
                            }
                            SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK.wait(300000L);
                        }
                    }
                }
                catch (InterruptedException ignore) {
                    TestCase.fail((String)"interrupted");
                }
            }
        });
        SlowRecDUnitTest.getLogWriter().info("[testPartialMessage] assert callback arguments");
        this.getOtherVm().invoke(new SerializableRunnable("Assert callback arguments"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = SlowRecDUnitTest.doTestPartialMessage_Listener.CONTROL_LOCK;
                synchronized (object) {
                    DistributedTestCase.getLogWriter().info("[testPartialMessage] doTestPartialMessage_Listener.callbackArguments=" + SlowRecDUnitTest.doTestPartialMessage_Listener.callbackArguments);
                    TestCase.assertEquals((int)SlowRecDUnitTest.doTestPartialMessage_Listener.callbackArguments.size(), (int)SlowRecDUnitTest.doTestPartialMessage_Listener.callbackTypes.size());
                    int i = 0;
                    Iterator argIter = SlowRecDUnitTest.doTestPartialMessage_Listener.callbackArguments.iterator();
                    Iterator typeIter = SlowRecDUnitTest.doTestPartialMessage_Listener.callbackTypes.iterator();
                    while (argIter.hasNext()) {
                        CallbackWrapper wrapper = (CallbackWrapper)argIter.next();
                        Integer arg = (Integer)wrapper.callbackArgument;
                        typeIter.next();
                        if (arg < partialId) continue;
                        TestCase.assertEquals((Object)new Integer(expectedArgs[i]), (Object)arg);
                        ++i;
                    }
                }
            }
        });
    }

    protected static class ControlListener
    extends CacheListenerAdapter {
        public final LinkedList callbackArguments = new LinkedList();
        public final LinkedList callbackTypes = new LinkedList();
        public final Object CONTROL_LOCK = new Object();

        protected ControlListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void afterCreate(EntryEvent event) {
            DistributedTestCase.getLogWriter().info(event.getRegion().getName() + " afterCreate " + event.getKey());
            Object object = this.CONTROL_LOCK;
            synchronized (object) {
                if (event.getCallbackArgument() != null) {
                    this.callbackArguments.add(new CallbackWrapper(event.getCallbackArgument(), 0));
                    this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
                    this.CONTROL_LOCK.notifyAll();
                }
            }
            this.processEvent(event);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void afterUpdate(EntryEvent event) {
            DistributedTestCase.getLogWriter().info(event.getRegion().getName() + " afterUpdate " + event.getKey());
            Object object = this.CONTROL_LOCK;
            synchronized (object) {
                if (event.getCallbackArgument() != null) {
                    this.callbackArguments.add(new CallbackWrapper(event.getCallbackArgument(), 1));
                    this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
                    this.CONTROL_LOCK.notifyAll();
                }
            }
            this.processEvent(event);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void afterInvalidate(EntryEvent event) {
            Object object = this.CONTROL_LOCK;
            synchronized (object) {
                if (event.getCallbackArgument() != null) {
                    this.callbackArguments.add(new CallbackWrapper(event.getCallbackArgument(), 2));
                    this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
                    this.CONTROL_LOCK.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void afterDestroy(EntryEvent event) {
            Object object = this.CONTROL_LOCK;
            synchronized (object) {
                if (event.getCallbackArgument() != null) {
                    this.callbackArguments.add(new CallbackWrapper(event.getCallbackArgument(), 3));
                    this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
                    this.CONTROL_LOCK.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void afterRegionInvalidate(RegionEvent event) {
            Object object = this.CONTROL_LOCK;
            synchronized (object) {
                if (event.getCallbackArgument() != null) {
                    this.callbackArguments.add(new CallbackWrapper(event.getCallbackArgument(), 4));
                    this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
                    this.CONTROL_LOCK.notifyAll();
                }
            }
        }

        private void processEvent(EntryEvent event) {
            if (event.getKey().equals(SlowRecDUnitTest.KEY_SLEEP)) {
                this.processSleep(event);
            } else if (event.getKey().equals(SlowRecDUnitTest.KEY_WAIT)) {
                this.processWait(event);
            } else if (event.getKey().equals(SlowRecDUnitTest.KEY_DISCONNECT)) {
                this.processDisconnect(event);
            }
        }

        private void processSleep(EntryEvent event) {
            int sleepMs = (Integer)event.getNewValue();
            DistributedTestCase.getLogWriter().info("[processSleep] sleeping for " + sleepMs);
            try {
                Thread.sleep(sleepMs);
            }
            catch (InterruptedException ignore) {
                TestCase.fail((String)"interrupted");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processWait(EntryEvent event) {
            int sleepMs = (Integer)event.getNewValue();
            DistributedTestCase.getLogWriter().info("[processWait] waiting for " + sleepMs);
            Object object = this.CONTROL_LOCK;
            synchronized (object) {
                try {
                    this.CONTROL_LOCK.wait(sleepMs);
                }
                catch (InterruptedException ignore) {
                    return;
                }
            }
        }

        private void processDisconnect(EntryEvent event) {
            DistributedTestCase.getLogWriter().info("[processDisconnect] disconnecting");
            CacheTestCase.disconnectFromDS();
        }
    }

    private static class CallbackWrapper {
        public final Object callbackArgument;
        public final int callbackType;

        public CallbackWrapper(Object callbackArgument, int callbackType) {
            this.callbackArgument = callbackArgument;
            this.callbackType = callbackType;
        }

        public String toString() {
            return "CallbackWrapper: " + this.callbackArgument.toString() + " of type " + this.callbackType;
        }
    }
}

