/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.ha;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.HARegion;
import com.gemstone.gemfire.internal.cache.ha.ConflatableObject;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueueAttributes;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import junit.framework.Assert;
import junit.framework.TestCase;
import util.TestException;

public class HARegionQueueDUnitTest
extends DistributedTestCase {
    VM vm0 = null;
    VM vm1 = null;
    VM vm3 = null;
    VM vm2 = null;
    protected static Cache cache = null;
    protected static HARegionQueue hrq = null;
    protected static volatile boolean toCnt = true;
    protected static Thread[] opThreads;
    protected static volatile Thread createQueuesThread;

    /**
     * Creates the test case; the name is handed unchanged to the superclass constructor.
     *
     * @param name name passed on to {@code DistributedTestCase}
     */
    public HARegionQueueDUnitTest(String name) {
        super(name);
    }

    /**
     * Runs the superclass set-up, then stores VMs 0 through 3 of host 0 in the
     * {@code vm0}..{@code vm3} fields used by the tests.
     *
     * @throws Exception if the superclass set-up throws
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        final Host firstHost = Host.getHost(0);
        this.vm0 = firstHost.getVM(0);
        this.vm1 = firstHost.getVM(1);
        this.vm2 = firstHost.getVM(2);
        this.vm3 = firstHost.getVM(3);
    }

    /**
     * Runs the superclass tear-down and then invokes {@link #closeCache()} in
     * vm0, vm1, vm2 and vm3, in that order.
     *
     * @throws Exception if the superclass tear-down or a remote invocation throws
     */
    @Override
    public void tearDown2() throws Exception {
        super.tearDown2();
        final VM[] members = {this.vm0, this.vm1, this.vm2, this.vm3};
        for (final VM member : members) {
            member.invoke(HARegionQueueDUnitTest.class, "closeCache");
        }
    }

    /**
     * Creates a cache on a distributed system obtained through {@code getSystem}.
     * The system is obtained once, disconnected, and obtained again before the
     * cache is created on it.
     *
     * @return the cache returned by {@code CacheFactory.create}; never {@code null}
     * @throws CacheException if {@code CacheFactory.create} returns {@code null}
     */
    protected Cache createCache() throws CacheException {
        final Properties config = new Properties();
        // First connection is dropped right away; the cache is built on the second one.
        this.getSystem(config).disconnect();
        final InternalDistributedSystem system = this.getSystem(config);
        final Cache created = CacheFactory.create((DistributedSystem)system);
        if (created != null) {
            return created;
        }
        throw new CacheException("CacheFactory.create() returned null "){};
    }

    /**
     * Creates the region in vm0 and vm1, puts {@code value-1} in vm0 and checks that
     * vm1 still reads {@code null} for the key. vm1 then puts {@code value-2}; vm0 is
     * expected to keep reading {@code value-1} while vm1 reads {@code value-2}.
     */
    public void testLocalPut() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegion");
        this.vm1.invoke(helpers, "createRegion");
        // A put in one VM must not show up in the other.
        this.vm0.invoke(helpers, "putValue1");
        this.vm1.invoke(helpers, "getNull");
        this.vm1.invoke(helpers, "putValue2");
        this.vm0.invoke(helpers, "getValue1");
        this.vm1.invoke(helpers, "getValue2");
    }

    /**
     * Repeats the put sequence of {@link #testLocalPut()} and then destroys the key
     * in vm0. Afterwards vm0 is expected to read {@code null} while vm1 still reads
     * {@code value-2}.
     */
    public void testLocalDestroy() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegion");
        this.vm1.invoke(helpers, "createRegion");
        this.vm0.invoke(helpers, "putValue1");
        this.vm1.invoke(helpers, "getNull");
        this.vm1.invoke(helpers, "putValue2");
        this.vm0.invoke(helpers, "getValue1");
        this.vm1.invoke(helpers, "getValue2");
        // Destroy in vm0 only; vm1 must keep its own value.
        this.vm0.invoke(helpers, "destroy");
        this.vm0.invoke(helpers, "getNull");
        this.vm1.invoke(helpers, "getValue2");
    }

    /**
     * Creates the region in vm0 and puts {@code value-1} there before vm1 creates its
     * region. vm1 is then expected to read {@code value-1} without having put it,
     * and to read {@code value-2} after putting it.
     */
    public void testGII() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegion");
        this.vm0.invoke(helpers, "putValue1");
        this.vm0.invoke(helpers, "getValue1");
        // vm1 joins late and must already see the value put by vm0.
        this.vm1.invoke(helpers, "createRegion");
        this.vm1.invoke(helpers, "getValue1");
        this.vm1.invoke(helpers, "putValue2");
        this.vm1.invoke(helpers, "getValue2");
    }

    /**
     * Creates a queue in vm0 via {@link #createRegionQueue2()} and fills its region
     * with {@link #putConflatables()}, then creates the queue in vm1, clears the
     * region in vm0 and runs {@link #verifyMapsAndData()} in vm1.
     */
    public void testGIIAndMapUpdates() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegionQueue2");
        this.vm0.invoke(helpers, "putConflatables");
        this.vm1.invoke(helpers, "createRegionQueue2");
        this.vm0.invoke(helpers, "clearRegion");
        this.vm1.invoke(helpers, "verifyMapsAndData");
    }

    /**
     * Creates the queue in vm0 and vm1, records a dispatched message in vm0 via
     * {@link #verifyAddingDispatchMesgs()} and then runs
     * {@link #verifyDispatchedMessagesRemoved()} in vm1.
     */
    public void testQRM() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegionQueue");
        this.vm1.invoke(helpers, "createRegionQueue");
        this.vm0.invoke(helpers, "verifyAddingDispatchMesgs");
        this.vm1.invoke(helpers, "verifyDispatchedMessagesRemoved");
    }

    /**
     * Creates a queue named {@code testregion1} holding one entry in vm0 and vm1,
     * takes that entry in vm0, and then waits up to 60 seconds in vm1 for the
     * queue size there to reach zero.
     *
     * <p>NOTE(review): the method name does not start with {@code test}, so it is
     * presumably not picked up by the JUnit 3 runner - confirm this is intentional.
     *
     * @throws Exception if a remote invocation throws
     */
    public void _testBugNo35988() throws Exception {
        final CacheSerializableRunnable queueSetup = new CacheSerializableRunnable("CreateCache, HARegionQueue and start thread"){

            @Override
            public void run2() throws CacheException {
                cache = new HARegionQueueDUnitTest("region1").createCache();
                cache.setMessageSyncInterval(1);
                final HARegionQueueAttributes attrs = new HARegionQueueAttributes();
                attrs.setExpiryTime(300);
                try {
                    hrq = HARegionQueue.getHARegionQueueInstance("testregion1", cache, attrs, 2, false);
                    final ConflatableObject single = new ConflatableObject(new Long(1L), new Long(1L), new EventID(new byte[]{0}, 1L, 1L), false, "dummy");
                    hrq.put(single);
                }
                catch (Exception cause) {
                    throw new CacheException(cause){};
                }
            }
        };
        this.vm0.invoke(queueSetup);
        this.vm1.invoke(queueSetup);

        final CacheSerializableRunnable takeOne = new CacheSerializableRunnable("takeFromVm0"){

            @Override
            public void run2() throws CacheException {
                try {
                    final Conflatable taken = (Conflatable)hrq.take();
                    TestCase.assertNotNull(taken);
                }
                catch (Exception cause) {
                    throw new CacheException(cause){};
                }
            }
        };
        this.vm0.invoke(takeOne);

        final CacheSerializableRunnable awaitEmpty = new CacheSerializableRunnable("checkInVm1"){

            @Override
            public void run2() throws CacheException {
                // Poll every 200 ms, for at most 60 s, until the local queue is empty.
                final DistributedTestCase.WaitCriterion queueDrained = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        Thread.yield();
                        return hrq.size() == 0;
                    }

                    @Override
                    public String description() {
                        return null;
                    }
                };
                DistributedTestCase.waitForCriterion(queueDrained, 60000L, 200L, true);
            }
        };
        this.vm1.invoke(awaitEmpty);
    }

    /**
     * Creates a cache (stored in the static {@code cache} field) and an
     * {@code HARegion} named {@code HARegionQueueDUnitTest_region} with
     * DISTRIBUTED_ACK scope and REPLICATE data policy.
     *
     * @throws Exception if cache or region creation throws
     */
    public static void createRegion() throws Exception {
        final String regionName = "HARegionQueueDUnitTest_region";
        cache = new HARegionQueueDUnitTest(regionName).createCache();
        final AttributesFactory attributes = new AttributesFactory();
        attributes.setScope(Scope.DISTRIBUTED_ACK);
        attributes.setDataPolicy(DataPolicy.REPLICATE);
        final RegionAttributes regionAttributes = attributes.create();
        HARegion.getInstance(regionName, (GemFireCacheImpl)cache, null, regionAttributes);
    }

    /**
     * Creates a cache and a queue named {@code HARegionQueueDUnitTest_region}
     * (stored in the static {@code cache} and {@code hrq} fields) and puts two
     * conflatable objects into the queue, with keys/values "1" and "2" and event
     * sequence ids 1 and 2.
     *
     * @throws Exception if cache creation, queue creation or a put throws
     */
    public static void createRegionQueue() throws Exception {
        final String queueName = "HARegionQueueDUnitTest_region";
        cache = new HARegionQueueDUnitTest(queueName).createCache();
        hrq = HARegionQueue.getHARegionQueueInstance(queueName, cache, 2, false);
        final EventID firstId = new EventID(new byte[]{1}, 1L, 1L);
        final EventID secondId = new EventID(new byte[]{1}, 1L, 2L);
        hrq.put(new ConflatableObject("1", "1", firstId, false, queueName));
        hrq.put(new ConflatableObject("2", "2", secondId, false, queueName));
    }

    /**
     * Creates a cache and an empty queue named {@code HARegionQueueDUnitTest_region}
     * whose attributes carry an expiry time of 3; both are stored in the static
     * {@code cache} and {@code hrq} fields.
     *
     * @throws Exception if cache or queue creation throws
     */
    public static void createRegionQueue2() throws Exception {
        final String queueName = "HARegionQueueDUnitTest_region";
        cache = new HARegionQueueDUnitTest(queueName).createCache();
        final HARegionQueueAttributes queueAttributes = new HARegionQueueAttributes();
        queueAttributes.setExpiryTime(3);
        hrq = HARegionQueue.getHARegionQueueInstance(queueName, cache, queueAttributes, 2, false);
    }

    /**
     * Locally destroys every key currently returned by the queue region's
     * {@code keys()} set. Any exception fails the test, and the exception is
     * attached to the failure so its stack trace is not lost.
     */
    public static void clearRegion() {
        try {
            final HARegion region = hrq.getRegion();
            Iterator iterator = region.keys().iterator();
            while (iterator.hasNext()) {
                region.localDestroy(iterator.next());
            }
        }
        catch (Exception e) {
            // Pass the exception along, as the other helpers in this class do.
            HARegionQueueDUnitTest.fail("Exception occured while trying to destroy region", e);
        }
    }

    /**
     * Asserts that the dispatched-messages map is empty, records a dispatched
     * message on the queue, and asserts that the map is no longer empty.
     */
    public static void verifyAddingDispatchMesgs() {
        Assert.assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
        final ThreadIdentifier dispatcher = new ThreadIdentifier(new byte[1], 1L);
        hrq.addDispatchedMessage(dispatcher, 1L);
        Assert.assertFalse(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
    }

    /**
     * Waits up to 60 seconds (polling every 200 ms) for the entry under key
     * {@code 0L} to disappear from the queue region, then fails the test if the
     * entry under key {@code 1L} is missing as well. Any exception fails the test
     * with the exception attached as the cause.
     */
    public static void verifyDispatchedMessagesRemoved() {
        try {
            // Declared final so the anonymous criterion can capture it directly.
            final HARegion region = hrq.getRegion();
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    Thread.yield();
                    return region.get(Long.valueOf(0L)) == null;
                }

                @Override
                public String description() {
                    return "entry with key 0 to be removed from the HARegion";
                }
            };
            DistributedTestCase.waitForCriterion(ev, 60000L, 200L, true);
            if (region.get(Long.valueOf(1L)) == null) {
                HARegionQueueDUnitTest.fail("Expected message not to have been deleted but it is deleted");
            }
        }
        catch (Exception e) {
            HARegionQueueDUnitTest.fail("test failed due to an exception :  " + e, e);
        }
    }

    /**
     * Closes the static {@code cache} and disconnects its distributed system.
     * Does nothing when the cache is {@code null} or already closed.
     */
    public static void closeCache() {
        if (cache == null || cache.isClosed()) {
            return;
        }
        cache.close();
        cache.getDistributedSystem().disconnect();
    }

    /**
     * Puts {@code key-1 -> value-1} into {@code /HARegionQueueDUnitTest_region};
     * any exception is printed and fails the test.
     */
    public static void putValue1() {
        try {
            final Region target = cache.getRegion("/HARegionQueueDUnitTest_region");
            target.put("key-1", "value-1");
        }
        catch (Exception putFailure) {
            putFailure.printStackTrace();
            fail("failed while region.put()", putFailure);
        }
    }

    /**
     * Puts ten entries straight into the queue's region: for {@code i} from 1 to 10
     * the key is {@code Long i} and the value is a conflatable object with key
     * {@code "key" + i}, value {@code "value" + i} and event sequence id {@code i}.
     * Any exception is printed and fails the test.
     */
    public static void putConflatables() {
        try {
            final HARegion queueRegion = hrq.getRegion();
            int index = 1;
            while (index <= 10) {
                final EventID eventId = new EventID(new byte[]{1}, 1L, (long)index);
                final ConflatableObject entry = new ConflatableObject("key" + index, "value" + index, eventId, true, "HARegionQueueDUnitTest_region");
                queueRegion.put(new Long(index), entry);
                index++;
            }
        }
        catch (Exception putFailure) {
            putFailure.printStackTrace();
            fail("failed while region.put()", putFailure);
        }
    }

    /**
     * Verifies the queue state in this VM in two phases.
     *
     * <p>Phase one checks that region entries for keys 1..10 exist, that the
     * conflation map holds an inner map for {@code HARegionQueueDUnitTest_region}
     * mapping {@code "key" + i} to {@code Long i}, that the events map has an entry
     * for the thread identifier used by the test data, that the counter set holds
     * 1..10 in iteration order, and that the last dispatched sequence id is -1.
     *
     * <p>Phase two sleeps 7500 ms and then expects the region entries, the inner
     * conflation map, the events map and the counter set to be empty.
     *
     * <p>Any exception is printed and fails the test with the exception attached.
     */
    public static void verifyMapsAndData() {
        try {
            // Check the queue itself before dereferencing it.
            Assert.assertNotNull(" Did not expect the HARegionQueue to be null but it is", hrq);
            HARegion r1 = hrq.getRegion();
            Assert.assertNotNull(" Did not expect the HARegion to be null but it is", r1);
            for (int i = 1; i < 11; ++i) {
                Assert.assertNotNull(" Did not expect the entry to be null but it is", r1.get(Long.valueOf(i)));
            }

            Map conflationMap = hrq.getConflationMapForTesting();
            Assert.assertTrue(" Did not expect the conflationMap size to be 0 but it is", conflationMap.size() > 0);
            Map internalMap = (Map)conflationMap.get("HARegionQueueDUnitTest_region");
            Assert.assertNotNull(" Did not expect the internalMap to be null but it is", internalMap);
            for (int i = 1; i < 11; ++i) {
                Assert.assertTrue(" Did not expect the entry not to be equal but it is", internalMap.get("key" + i).equals(Long.valueOf(i)));
            }

            Map eventMap = hrq.getEventsMapForTesting();
            Assert.assertNotNull(" Did not expect the result (DACE object) to be null but it is", eventMap.get(new ThreadIdentifier(new byte[]{1}, 1L)));
            Set counterSet = hrq.getCurrentCounterSet(new EventID(new byte[]{1}, 1L, 1L));
            Assert.assertTrue(" excpected the counter set size to be 10 but it is not so", counterSet.size() == 10);
            long expected = 1L;
            Iterator iterator = counterSet.iterator();
            while (iterator.hasNext()) {
                Assert.assertTrue(((Long)iterator.next()).longValue() == expected);
                ++expected;
            }
            Assert.assertTrue(hrq.getLastDispatchedSequenceId(new EventID(new byte[]{1}, 1L, 1L)) == -1L);

            // Give the entries time to expire before checking that everything is gone.
            Thread.sleep(7500L);
            for (int j = 1; j < 11; ++j) {
                Assert.assertNull("expected the entry to be null since expiry time exceeded but it is not so", r1.get(Long.valueOf(j)));
            }
            internalMap = (Map)hrq.getConflationMapForTesting().get("HARegionQueueDUnitTest_region");
            Assert.assertNotNull(" Did not expect the internalMap to be null but it is", internalMap);
            Assert.assertTrue("internalMap (conflation) should have been emptry since expiry of all entries has been exceeded but it is not so", internalMap.isEmpty());
            Assert.assertTrue("eventMap should have been emptry since expiry of all entries has been exceeded but it is not so", eventMap.isEmpty());
            Assert.assertTrue("counter set should have been emptry since expiry of all entries has been exceeded but it is not so", counterSet.isEmpty());
        }
        catch (Exception ex) {
            ex.printStackTrace();
            // This method verifies state; it performs no region.put().
            HARegionQueueDUnitTest.fail("failed while verifying maps and data", ex);
        }
    }

    /**
     * Puts {@code key-1 -> value-2} into {@code /HARegionQueueDUnitTest_region};
     * any exception is printed and fails the test.
     */
    public static void putValue2() {
        try {
            final Region target = cache.getRegion("/HARegionQueueDUnitTest_region");
            target.put("key-1", "value-2");
        }
        catch (Exception putFailure) {
            putFailure.printStackTrace();
            fail("failed while region.put()", putFailure);
        }
    }

    /**
     * Fails the test unless {@code key-1} in {@code /HARegionQueueDUnitTest_region}
     * maps to {@code value-1}. A missing entry is reported through the same
     * assertion message rather than surfacing as a {@code NullPointerException}.
     * Any exception from the lookup is printed and fails the test.
     */
    public static void getValue1() {
        try {
            Region r = cache.getRegion("/HARegionQueueDUnitTest_region");
            Object actual = r.get("key-1");
            // Constant-first comparison also covers a null (absent) value.
            if (!"value-1".equals(actual)) {
                HARegionQueueDUnitTest.fail("expected value to be value-1 but it is not so");
            }
        }
        catch (Exception ex) {
            ex.printStackTrace();
            HARegionQueueDUnitTest.fail("failed while region.get()", ex);
        }
    }

    /**
     * Fails the test if {@code key-1} in {@code /HARegionQueueDUnitTest_region}
     * has a non-null value. Any exception from the lookup is printed and fails
     * the test.
     */
    public static void getNull() {
        try {
            final Region source = cache.getRegion("/HARegionQueueDUnitTest_region");
            final Object found = source.get("key-1");
            if (found != null) {
                fail("expected value to be null but it is not so");
            }
        }
        catch (Exception getFailure) {
            getFailure.printStackTrace();
            fail("failed while region.get()", getFailure);
        }
    }

    /**
     * Fails the test unless {@code key-1} in {@code /HARegionQueueDUnitTest_region}
     * maps to {@code value-2}. A missing entry is reported through the same
     * assertion message rather than surfacing as a {@code NullPointerException}.
     * Any exception from the lookup is printed and fails the test.
     */
    public static void getValue2() {
        try {
            Region r = cache.getRegion("/HARegionQueueDUnitTest_region");
            Object actual = r.get("key-1");
            // Constant-first comparison also covers a null (absent) value.
            if (!"value-2".equals(actual)) {
                HARegionQueueDUnitTest.fail("expected value to be value-2 but it is not so");
            }
        }
        catch (Exception ex) {
            ex.printStackTrace();
            HARegionQueueDUnitTest.fail("failed while region.get()", ex);
        }
    }

    /**
     * Locally destroys {@code key-1} in {@code /HARegionQueueDUnitTest_region}.
     * Any exception is printed and fails the test with the exception attached
     * as the cause.
     */
    public static void destroy() {
        try {
            Region region1 = cache.getRegion("/HARegionQueueDUnitTest_region");
            region1.localDestroy("key-1");
        }
        catch (Exception e) {
            e.printStackTrace();
            // Keep the cause, as the put/get helpers in this class do.
            HARegionQueueDUnitTest.fail("test failed due to exception in destroy ", e);
        }
    }

    /**
     * Runs {@code concurrentOperationsDunitTest} with {@code createBlockingQueue = false}
     * and {@code Scope.DISTRIBUTED_ACK}.
     */
    public void testConcurrentOperationsDunitTestOnNonBlockingQueue() {
        this.concurrentOperationsDunitTest(false, Scope.DISTRIBUTED_ACK);
    }

    /**
     * Runs {@code concurrentOperationsDunitTest} with {@code createBlockingQueue = false}
     * and {@code Scope.DISTRIBUTED_NO_ACK}.
     */
    public void testConcurrentOperationsDunitTestOnNonBlockingQueueWithDNoAckRegion() {
        this.concurrentOperationsDunitTest(false, Scope.DISTRIBUTED_NO_ACK);
    }

    /**
     * Runs {@code concurrentOperationsDunitTest} with {@code createBlockingQueue = true}
     * and {@code Scope.DISTRIBUTED_ACK}.
     */
    public void testConcurrentOperationsDunitTestOnBlockingQueue() {
        this.concurrentOperationsDunitTest(true, Scope.DISTRIBUTED_ACK);
    }

    /**
     * Shared scenario for the concurrent-operation tests. In each of the four VMs it
     * <ol>
     * <li>creates a cache, a queue named {@code testregion1} and a replicated region
     *     {@code test_region} whose listener puts every create/update event into the
     *     queue;</li>
     * <li>starts ten {@code RunOp} threads: 0-3 put, 4-5 peek, 6-7 take and 8-9
     *     batch-peek;</li>
     * <li>after two seconds clears the {@code toCnt} flag so the threads stop (and,
     *     for the blocking variant, puts 100 extra entries into the queue);</li>
     * <li>joins the threads, waiting up to 30 seconds for each.</li>
     * </ol>
     *
     * @param createBlockingQueue {@code true} to request queue type 1, {@code false}
     *        for queue type 2, as passed to {@code getHARegionQueueInstance}
     * @param rscope scope applied to {@code test_region}
     */
    private void concurrentOperationsDunitTest(final boolean createBlockingQueue, final Scope rscope) {
        CacheSerializableRunnable createRgnsAndQueues = new CacheSerializableRunnable("CreateCache, mirrored Region & HARegionQueue with a CacheListener"){

            @Override
            public void run2() throws CacheException {
                // The flag is static and is cleared at the end of every run; re-arm it
                // here (a synchronous step) so a later run in the same VM still
                // performs operations.
                toCnt = true;
                HARegionQueueDUnitTest test = new HARegionQueueDUnitTest("HARegionQueueDUnitTest_region");
                System.getProperties().put("QueueRemovalThreadWaitTime", "2000");
                cache = test.createCache();
                AttributesFactory factory = new AttributesFactory();
                factory.setScope(rscope);
                factory.setDataPolicy(DataPolicy.REPLICATE);
                HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
                hrqa.setExpiryTime(5);
                final int queueType = createBlockingQueue ? 1 : 2;
                try {
                    hrq = HARegionQueue.getHARegionQueueInstance("testregion1", cache, hrqa, queueType, false);
                }
                catch (Exception e) {
                    throw new CacheException(e){};
                }
                factory.addCacheListener(new CacheListenerAdapter(){

                    public void afterCreate(EntryEvent event) {
                        this.enqueue(event, false);
                    }

                    public void afterUpdate(EntryEvent event) {
                        this.enqueue(event, true);
                    }

                    /** Wraps the event in a conflatable object and puts it into the queue. */
                    private void enqueue(EntryEvent event, boolean conflate) {
                        ConflatableObject conflatable = new ConflatableObject(event.getKey(), event.getNewValue(), ((EntryEventImpl)event).getEventId(), conflate, event.getRegion().getFullPath());
                        try {
                            hrq.put(conflatable);
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            TestCase.fail("The put operation in queue did not succeed due to exception =" + e);
                        }
                    }
                });
                cache.createRegion("test_region", factory.create());
            }
        };
        this.vm0.invoke(createRgnsAndQueues);
        this.vm1.invoke(createRgnsAndQueues);
        this.vm2.invoke(createRgnsAndQueues);
        this.vm3.invoke(createRgnsAndQueues);
        CacheSerializableRunnable spawnThreadsAndperformOps = new CacheSerializableRunnable("Spawn multipe threads which do various operations"){

            @Override
            public void run2() throws CacheException {
                opThreads = new Thread[10];
                for (int i = 0; i < opThreads.length; ++i) {
                    // Thread ids 0-3 put, 4-5 peek, 6-7 take, 8-9 batch-peek. The op
                    // type given to RunOp always matches the one in the thread name.
                    final int op;
                    if (i < 4) {
                        op = RunOp.PUT;
                    } else if (i < 6) {
                        op = RunOp.PEEK;
                    } else if (i < 8) {
                        op = RunOp.TAKE;
                    } else {
                        op = RunOp.BATCH_PEEK;
                    }
                    HARegionQueueDUnitTest.opThreads[i] = new Thread((Runnable)new RunOp(op, i), "ID=" + i + ",Op=" + op);
                }
                for (int i = 0; i < opThreads.length; ++i) {
                    opThreads[i].start();
                }
            }
        };
        this.vm0.invokeAsync(spawnThreadsAndperformOps);
        this.vm1.invokeAsync(spawnThreadsAndperformOps);
        this.vm2.invokeAsync(spawnThreadsAndperformOps);
        this.vm3.invokeAsync(spawnThreadsAndperformOps);
        try {
            // Let the operation threads run for a while before stopping them.
            Thread.sleep(2000L);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            HARegionQueueDUnitTest.fail("Test failed as the test thread encoutered exception in sleep");
        }
        CacheSerializableRunnable toggleFlag = new CacheSerializableRunnable("Toggle the flag to signal end of threads"){

            @Override
            public void run2() throws CacheException {
                toCnt = false;
                if (createBlockingQueue) {
                    try {
                        // Extra entries for the blocking variant; presumably these let
                        // consumers blocked on an empty queue return and see the flag.
                        for (int i = 0; i < 100; ++i) {
                            hrq.put(new ConflatableObject("1", "1", new EventID(new byte[]{1}, 100L, (long)i), false, "/x"));
                        }
                    }
                    catch (Exception e) {
                        throw new CacheException(e){};
                    }
                }
            }
        };
        this.vm0.invokeAsync(toggleFlag);
        this.vm1.invokeAsync(toggleFlag);
        this.vm2.invokeAsync(toggleFlag);
        this.vm3.invokeAsync(toggleFlag);
        CacheSerializableRunnable joinWithThreads = new CacheSerializableRunnable("Join with the threads"){

            @Override
            public void run2() throws CacheException {
                for (int i = 0; i < opThreads.length; ++i) {
                    // RunOp interrupts its own thread when an operation throws.
                    if (opThreads[i].isInterrupted()) {
                        TestCase.fail("Test failed because  thread encountered exception");
                    }
                    DistributedTestCase.join(opThreads[i], 30000L, DistributedTestCase.getLogWriter());
                }
            }
        };
        this.vm0.invoke(joinWithThreads);
        this.vm1.invoke(joinWithThreads);
        this.vm2.invoke(joinWithThreads);
        this.vm3.invoke(joinWithThreads);
        // NOTE(review): this runs in the JVM executing the test method, whereas the
        // property was set inside the runnable above - confirm whether the remote
        // VMs also need it cleared.
        System.getProperties().remove("QueueRemovalThreadWaitTime");
    }

    /**
     * vm0 creates a queue, puts 200 entries into it and starts a thread that takes
     * from the queue 200 times. While that runs, vm1 asynchronously creates a queue
     * with the same name. The test then joins the taker thread in vm0 and waits in
     * vm1 for the queue-creating thread to be recorded and to finish.
     */
    public void testNPEDueToHARegionQueueEscapeInConstructor() {
        final int expiryTime = 30;
        final int opCount = 200;

        final CacheSerializableRunnable createQueuesAndThread = new CacheSerializableRunnable("CreateCache, HARegionQueue and start thread"){

            @Override
            public void run2() throws CacheException {
                cache = new HARegionQueueDUnitTest("region1").createCache();
                cache.setMessageSyncInterval(1);
                final HARegionQueueAttributes attrs = new HARegionQueueAttributes();
                attrs.setExpiryTime(expiryTime);
                try {
                    hrq = HARegionQueue.getHARegionQueueInstance("testNPEDueToHARegionQueueEscapeInConstructor", cache, attrs, 2, false);
                    for (int seq = 0; seq < opCount; ++seq) {
                        final EventID eventId = new EventID(new byte[]{0}, 1L, (long)seq);
                        hrq.put(new ConflatableObject(new Long(seq), new Long(seq), eventId, false, "dummy"));
                    }
                    final Runnable taker = new Runnable(){

                        @Override
                        public void run() {
                            for (int attempt = 0; attempt < opCount; ++attempt) {
                                try {
                                    // Back off briefly when nothing was available.
                                    if (hrq.take() == null) {
                                        Thread.sleep(50L);
                                    }
                                }
                                catch (InterruptedException e) {
                                    TestCase.fail("interrupted");
                                }
                            }
                        }
                    };
                    opThreads = new Thread[1];
                    HARegionQueueDUnitTest.opThreads[0] = new Thread(taker);
                    opThreads[0].start();
                }
                catch (Exception cause) {
                    throw new CacheException(cause){};
                }
            }
        };

        final CacheSerializableRunnable createQueues = new CacheSerializableRunnable("CreateCache, HARegionQueue "){

            @Override
            public void run2() throws CacheException {
                // Recorded first so the waiting runnable can join this thread.
                createQueuesThread = Thread.currentThread();
                cache = new HARegionQueueDUnitTest("region1").createCache();
                cache.setMessageSyncInterval(expiryTime);
                final HARegionQueueAttributes attrs = new HARegionQueueAttributes();
                attrs.setExpiryTime(expiryTime);
                try {
                    hrq = HARegionQueue.getHARegionQueueInstance("testNPEDueToHARegionQueueEscapeInConstructor", cache, attrs, 2, false);
                }
                catch (Exception cause) {
                    throw new CacheException(cause){};
                }
            }
        };

        final CacheSerializableRunnable waitForCreateQueuesThread = new CacheSerializableRunnable("joinCreateCache"){

            @Override
            public void run2() throws TestException {
                final DistributedTestCase.WaitCriterion threadRecorded = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        return createQueuesThread != null;
                    }

                    @Override
                    public String description() {
                        return null;
                    }
                };
                DistributedTestCase.waitForCriterion(threadRecorded, 30000L, 200L, true);
                DistributedTestCase.join(createQueuesThread, 300000L, DistributedTestCase.getLogWriter());
            }
        };

        final CacheSerializableRunnable joinWithThread = new CacheSerializableRunnable("CreateCache, HARegionQueue join with thread"){

            @Override
            public void run2() throws CacheException {
                final Thread taker = opThreads[0];
                if (taker.isInterrupted()) {
                    TestCase.fail("The test has failed as it encountered interrupts in puts & takes");
                }
                DistributedTestCase.join(taker, 30000L, DistributedTestCase.getLogWriter());
            }
        };

        this.vm0.invoke(createQueuesAndThread);
        this.vm1.invokeAsync(createQueues);
        this.vm0.invoke(joinWithThread);
        this.vm1.invoke(waitForCreateQueuesThread);
    }

    /**
     * Creates a populated queue in vm0 via {@link #createRegionQueue()} and then runs
     * {@link #createHARegionQueueandCheckExpiration()} in vm1.
     */
    public void testBugNo35989() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegionQueue");
        this.vm1.invoke(helpers, "createHARegionQueueandCheckExpiration");
    }

    /**
     * Creates a cache and a queue named {@code HARegionQueueDUnitTest_region} with an
     * expiry time of 1, then waits up to 60 seconds (polling every 200 ms) for the
     * queue's available-id collection to become empty.
     *
     * @throws Exception if cache or queue creation throws
     */
    public static void createHARegionQueueandCheckExpiration() throws Exception {
        final String queueName = "HARegionQueueDUnitTest_region";
        cache = new HARegionQueueDUnitTest(queueName).createCache();
        final HARegionQueueAttributes queueAttributes = new HARegionQueueAttributes();
        queueAttributes.setExpiryTime(1);
        hrq = HARegionQueue.getHARegionQueueInstance(queueName, cache, queueAttributes, 2, false);
        final DistributedTestCase.WaitCriterion idsGone = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                final int remaining = hrq.getAvalaibleIds().size();
                return remaining == 0;
            }

            @Override
            public String description() {
                return null;
            }
        };
        DistributedTestCase.waitForCriterion(idsGone, 60000L, 200L, true);
    }

    /**
     * Creates a populated queue in vm0 via {@link #createRegionQueue()} and then runs
     * {@link #createRegionQueueandCheckDuplicates()} in vm1.
     */
    public void testForDuplicateEvents() {
        final Class<HARegionQueueDUnitTest> helpers = HARegionQueueDUnitTest.class;
        this.vm0.invoke(helpers, "createRegionQueue");
        this.vm1.invoke(helpers, "createRegionQueueandCheckDuplicates");
    }

    /**
     * Creates a cache and a queue named {@code HARegionQueueDUnitTest_region}, asserts
     * that the queue already has size 2, puts two conflatable objects with event
     * sequence ids 1 and 2 (the same ids {@link #createRegionQueue()} uses) and
     * asserts that the size is still 2.
     *
     * @throws Exception if cache creation, queue creation or a put throws
     */
    public static void createRegionQueueandCheckDuplicates() throws Exception {
        final String queueName = "HARegionQueueDUnitTest_region";
        cache = new HARegionQueueDUnitTest(queueName).createCache();
        hrq = HARegionQueue.getHARegionQueueInstance(queueName, cache, 2, false);
        assertEquals(2, (int)hrq.size());
        final EventID firstId = new EventID(new byte[]{1}, 1L, 1L);
        final EventID secondId = new EventID(new byte[]{1}, 1L, 2L);
        hrq.put(new ConflatableObject("1", "1", firstId, false, queueName));
        hrq.put(new ConflatableObject("2", "2", secondId, false, queueName));
        // Re-putting events with already-seen ids must not grow the queue.
        assertEquals(2, (int)hrq.size());
    }

    /**
     * Worker that repeats a single queue or region operation every 20 ms for as long
     * as the static {@code toCnt} flag is {@code true}.
     *
     * <p>The operation is chosen by {@code opType}: {@link #PUT} puts into
     * {@code test_region}; {@link #TAKE} takes from the queue; {@link #PEEK} peeks and
     * then removes; {@link #BATCH_PEEK} peeks a batch and then removes. Any other
     * value makes the loop idle.
     *
     * <p>If an operation throws, the exception is logged, the current thread's
     * interrupt flag is set and the loop ends.
     */
    class RunOp
    implements Runnable {
        int opType;
        int threadID;
        public static final int PUT = 1;
        public static final int TAKE = 2;
        public static final int PEEK = 3;
        public static final int BATCH_PEEK = 4;

        /**
         * @param opType one of {@link #PUT}, {@link #TAKE}, {@link #PEEK}, {@link #BATCH_PEEK}
         * @param id identifier used to build the region key for {@link #PUT}
         */
        public RunOp(int opType, int id) {
            this.opType = opType;
            this.threadID = id;
        }

        @Override
        public void run() {
            Region rgn = cache.getRegion("test_region");
            int counter = 0;
            LogWriter logger = cache.getLogger();
            try {
                while (toCnt) {
                    Thread.sleep(20L);
                    switch (this.opType) {
                        case PUT: {
                            rgn.put("key" + this.threadID, "val" + counter++);
                            // Cycle the value suffix through 0..9.
                            if (counter == 10) {
                                counter = 0;
                            }
                            break;
                        }
                        case TAKE: {
                            Conflatable cnf = (Conflatable)hrq.take();
                            if (logger.fineEnabled() && cnf != null) {
                                logger.fine("Object retrieved  by take has key =" + cnf.getKeyToConflate() + " and value as" + cnf.getValueToConflate());
                            }
                            break;
                        }
                        case PEEK: {
                            Conflatable cnf = (Conflatable)hrq.peek();
                            if (logger.fineEnabled() && cnf != null) {
                                logger.fine("Object retrieved  by peek has key =" + cnf.getKeyToConflate() + " and value as" + cnf.getValueToConflate());
                            }
                            hrq.remove();
                            break;
                        }
                        case BATCH_PEEK: {
                            List confList = hrq.peek(3, 2000);
                            if (logger.fineEnabled() && confList != null) {
                                logger.fine("Object retrieved  by  batch peek are =" + confList);
                            }
                            hrq.remove();
                            break;
                        }
                        default: {
                            // Unknown op type: nothing to do this iteration.
                            break;
                        }
                    }
                }
            }
            catch (Exception e) {
                // Record what went wrong instead of dropping it silently; the
                // interrupt flag remains the signal checked by the joining code.
                logger.error("Operation thread " + this.threadID + " (op type " + this.opType + ") stopped due to an exception", e);
                Thread.currentThread().interrupt();
            }
        }
    }
}

