/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.ha;

import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryTestListener;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.HARegion;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
import com.gemstone.gemfire.internal.cache.tier.sockets.ConflationDUnitTest;
import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import hydra.Log;
import java.util.Iterator;
import java.util.Properties;
import junit.framework.TestCase;
import util.AbstractListener;

public class HADispatcherDUnitTest
extends DistributedTestCase {
    VM server1 = null;
    VM server2 = null;
    VM client1 = null;
    VM client2 = null;
    public static int PORT1;
    public static int PORT2;
    private static final String REGION_NAME = "HADispatcherDUnitTest_region";
    protected static Cache cache;
    public static final Object dummyObj;
    static volatile boolean isObjectPresent;
    static final String KEY1 = "KEY1";
    static final String VALUE1 = "VALUE1";
    static final String KEY2 = "KEY2";
    static final String VALUE2 = "VALUE2";
    static volatile boolean waitFlag;

    public HADispatcherDUnitTest(String name) {
        super(name);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        Host host = Host.getHost(0);
        this.server1 = host.getVM(0);
        this.server2 = host.getVM(1);
        this.client1 = host.getVM(2);
        this.client2 = host.getVM(3);
        PORT1 = (Integer)this.server1.invoke(HADispatcherDUnitTest.class, "createServerCache", new Object[]{new Boolean(false)});
        this.server1.invoke(ConflationDUnitTest.class, "setIsSlowStart");
        this.server1.invoke(HADispatcherDUnitTest.class, "makeDispatcherSlow");
        this.server1.invoke(HADispatcherDUnitTest.class, "setQRMslow");
        PORT2 = (Integer)this.server2.invoke(HADispatcherDUnitTest.class, "createServerCache", new Object[]{new Boolean(true)});
        this.client1.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
        this.client2.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
        this.client1.invoke(HADispatcherDUnitTest.class, "createClientCache", new Object[]{HADispatcherDUnitTest.getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), new Boolean(false)});
        this.client2.invoke(HADispatcherDUnitTest.class, "createClientCache", new Object[]{HADispatcherDUnitTest.getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), new Boolean(true)});
    }

    @Override
    public void tearDown2() throws Exception {
        super.tearDown2();
        this.client1.invoke(HADispatcherDUnitTest.class, "closeCache");
        this.client2.invoke(HADispatcherDUnitTest.class, "closeCache");
        this.server1.invoke(HADispatcherDUnitTest.class, "resetQRMslow");
        this.server1.invoke(HADispatcherDUnitTest.class, "closeCache");
        this.server2.invoke(HADispatcherDUnitTest.class, "closeCache");
    }

    public static void closeCache() {
        if (cache != null && !cache.isClosed()) {
            cache.close();
            cache.getDistributedSystem().disconnect();
        }
    }

    public static void setQRMslow() {
        int oldMessageSyncInterval = cache.getMessageSyncInterval();
        cache.setMessageSyncInterval(6);
        try {
            Thread.sleep((oldMessageSyncInterval + 1) * 1000);
        }
        catch (InterruptedException e) {
            HADispatcherDUnitTest.fail((String)"Unexcepted InterruptedException Occurred");
        }
    }

    public static void resetQRMslow() {
        cache.setMessageSyncInterval(1);
    }

    public static void makeDispatcherSlow() {
        System.setProperty("slowStartTimeForTesting", "5000");
    }

    private void clientPut(VM vm, final Object key, final Object value) {
        vm.invoke(new CacheSerializableRunnable("putFromClient"){

            @Override
            public void run2() throws CacheException {
                Region region = cache.getRegion("/HADispatcherDUnitTest_region");
                TestCase.assertNotNull((Object)region);
                region.put(key, value);
            }
        });
    }

    private void checkFromClient(VM vm) {
        vm.invoke(new CacheSerializableRunnable("checkFromClient"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run2() throws CacheException {
                Region region = cache.getRegion("/HADispatcherDUnitTest_region");
                TestCase.assertNotNull((Object)region);
                cache.getLogger().fine("starting the wait");
                Object object = dummyObj;
                synchronized (object) {
                    while (waitFlag) {
                        try {
                            dummyObj.wait(30000L);
                        }
                        catch (InterruptedException e) {
                            TestCase.fail((String)"interrupted");
                        }
                    }
                }
                cache.getLogger().fine("wait over...waitFlag=" + waitFlag);
                if (waitFlag) {
                    TestCase.fail((String)"test failed");
                }
            }
        });
    }

    private void checkFromServer(VM vm, Object key) {
        vm.invoke(new CacheSerializableRunnable("checkFromServer"){

            @Override
            public void run2() throws CacheException {
                Iterator iter = cache.getBridgeServers().iterator();
                BridgeServerImpl server = (BridgeServerImpl)iter.next();
                Iterator iter_prox = server.getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
                isObjectPresent = false;
                while (iter_prox.hasNext()) {
                    final CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
                    HARegion region = (HARegion)proxy.getHARegion();
                    TestCase.assertNotNull((Object)region);
                    final HARegionQueue regionQueue = region.getOwner();
                    DistributedTestCase.WaitCriterion wc = new DistributedTestCase.WaitCriterion(){

                        @Override
                        public boolean done() {
                            int sz = regionQueue.size();
                            cache.getLogger().fine("regionQueue.size()::" + sz);
                            return sz == 0 || !proxy.isConnected();
                        }

                        @Override
                        public String description() {
                            int size = regionQueue.size();
                            try {
                                return "regionQueue not empty with entries " + regionQueue.peek(size) + " for proxy " + proxy;
                            }
                            catch (InterruptedException ie) {
                                return "regionQueue not empty with size " + size + " for proxy " + proxy;
                            }
                        }
                    };
                    DistributedTestCase.waitForCriterion(wc, 60000L, 1000L, true);
                    cache.getLogger().fine("processed a proxy");
                }
            }
        });
    }

    public void testDispatcher() throws Exception {
        this.clientPut(this.client1, KEY1, VALUE1);
        this.checkFromClient(this.client2);
        this.checkFromServer(this.server2, KEY1);
        this.clientPut(this.client1, KEY2, VALUE2);
        this.checkFromClient(this.client2);
        this.checkFromServer(this.server2, KEY2);
        Log.getLogWriter().info("testDispatcher() completed successfully");
    }

    public void ClientUpdateMessageSerialization() throws Exception {
        this.clientPut(this.client1, KEY1, VALUE1);
        Log.getLogWriter().fine(">>>>>>>> after clientPut(c1, k1, v1)");
        this.checkFromClient(this.client2);
        Log.getLogWriter().fine("after checkFromClient(c2)");
        this.checkFromServer(this.server2, KEY1);
        Log.getLogWriter().fine("after checkFromServer(s2, k1)");
        this.clientPut(this.client1, KEY1, VALUE1);
        Log.getLogWriter().fine("after clientPut 2 (c1, k1, v1)");
        this.checkFromClient(this.client2);
        Log.getLogWriter().fine("after checkFromClient 2 (c2)");
        this.checkFromServer(this.server2, KEY1);
        Log.getLogWriter().fine("after checkFromServer 2 (s2, k1)");
        Log.getLogWriter().info("testClientUpdateMessageSerialization() completed successfully");
    }

    private void createCache(Properties props) throws Exception {
        InternalDistributedSystem ds = this.getSystem(props);
        HADispatcherDUnitTest.assertNotNull((Object)ds);
        ds.disconnect();
        ds = this.getSystem(props);
        cache = CacheFactory.create((DistributedSystem)ds);
        HADispatcherDUnitTest.assertNotNull((Object)cache);
    }

    public static Integer createServerCache(Boolean isListenerPresent) throws Exception {
        new HADispatcherDUnitTest("temp").createCache(new Properties());
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.REPLICATE);
        if (isListenerPresent.booleanValue()) {
            HAServerListener serverListener = new HAServerListener();
            factory.setCacheListener((CacheListener)serverListener);
        }
        RegionAttributes attrs = factory.create();
        cache.createRegion(REGION_NAME, attrs);
        BridgeServerImpl server = (BridgeServerImpl)cache.addBridgeServer();
        HADispatcherDUnitTest.assertNotNull((Object)server);
        int port = AvailablePort.getRandomAvailablePort((int)0);
        server.setPort(port);
        server.setNotifyBySubscription(true);
        server.start();
        return new Integer(server.getPort());
    }

    public static void createClientCache(String hostName, Integer port1, Integer port2, Boolean isListenerPresent) throws Exception {
        PORT1 = port1;
        PORT2 = port2;
        Properties props = new Properties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "");
        new HADispatcherDUnitTest("temp").createCache(props);
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        BridgeTestCase.configureConnectionPool(factory, hostName, new int[]{PORT1, PORT2}, true, -1, 2, null);
        if (isListenerPresent.booleanValue()) {
            HAClientListener clientListener = new HAClientListener();
            factory.setCacheListener((CacheListener)clientListener);
        }
        RegionAttributes attrs = factory.create();
        cache.createRegion(REGION_NAME, attrs);
        Region region = cache.getRegion("/HADispatcherDUnitTest_region");
        HADispatcherDUnitTest.assertNotNull((Object)region);
        LocalRegion lr = (LocalRegion)region;
        final PoolImpl pool = (PoolImpl)lr.getServerProxy().getPool();
        DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return pool.getPrimary() != null;
            }

            @Override
            public String description() {
                return null;
            }
        };
        DistributedTestCase.waitForCriterion(ev, 30000L, 200L, true);
        ev = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return pool.getRedundants().size() >= 1;
            }

            @Override
            public String description() {
                return null;
            }
        };
        DistributedTestCase.waitForCriterion(ev, 30000L, 200L, true);
        HADispatcherDUnitTest.assertNotNull((Object)pool.getPrimary());
        HADispatcherDUnitTest.assertTrue((String)("backups=" + pool.getRedundants() + " expected=" + 1), (pool.getRedundants().size() >= 1 ? 1 : 0) != 0);
        HADispatcherDUnitTest.assertEquals((int)PORT1, (int)pool.getPrimaryPort());
        region.registerInterest((Object)KEY1);
        HADispatcherDUnitTest.createCQ();
    }

    private static void createCQ() {
        QueryService cqService = null;
        try {
            cqService = cache.getQueryService();
        }
        catch (Exception cqe) {
            cqe.printStackTrace();
            HADispatcherDUnitTest.fail((String)"Failed to getCQService.");
        }
        CqAttributesFactory cqf = new CqAttributesFactory();
        CqListener[] cqListeners = new CqListener[]{new CqQueryTestListener(HADispatcherDUnitTest.getLogWriter())};
        cqf.initCqListeners(cqListeners);
        CqAttributes cqa = cqf.create();
        String cqName = "CQForHARegionQueueTest";
        String queryStr = "Select * from /HADispatcherDUnitTest_region";
        try {
            CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
            cq1.execute();
        }
        catch (Exception ex) {
            HADispatcherDUnitTest.getLogWriter().info("CQService is :" + cqService);
            ex.printStackTrace();
            AssertionError err = new AssertionError((Object)("Failed to create/execute CQ " + cqName + " . "));
            ((Throwable)((Object)err)).initCause(ex);
            throw err;
        }
    }

    static {
        cache = null;
        dummyObj = "dummyObject";
        isObjectPresent = false;
        waitFlag = true;
    }

    protected static class HAServerListener
    extends CacheListenerAdapter {
        protected HAServerListener() {
        }

        public void afterCreate(EntryEvent event) {
            Cache cache = event.getRegion().getCache();
            Iterator iter = cache.getBridgeServers().iterator();
            BridgeServerImpl server = (BridgeServerImpl)iter.next();
            isObjectPresent = false;
            while (true) {
                for (CacheClientProxy proxy : server.getAcceptor().getCacheClientNotifier().getClientProxies()) {
                    HARegion regionForQueue = (HARegion)proxy.getHARegion();
                    for (Object obj : regionForQueue.values()) {
                        Conflatable confObj;
                        if (!(obj instanceof HAEventWrapper) || !HADispatcherDUnitTest.KEY1.equals((confObj = (Conflatable)obj).getKeyToConflate()) && !HADispatcherDUnitTest.KEY2.equals(confObj.getKeyToConflate())) continue;
                        isObjectPresent = true;
                    }
                }
                if (isObjectPresent) break;
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException ex) {
                    TestCase.fail((String)"interrupted");
                }
            }
        }
    }

    protected static class HAClientListener
    extends AbstractListener
    implements CacheListener,
    Declarable {
        protected HAClientListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void afterCreate(EntryEvent event) {
            Object object = dummyObj;
            synchronized (object) {
                try {
                    Object value = event.getNewValue();
                    if (value.equals(HADispatcherDUnitTest.VALUE1)) {
                        waitFlag = false;
                        dummyObj.notifyAll();
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public void afterUpdate(EntryEvent event) {
        }

        public void afterInvalidate(EntryEvent event) {
        }

        public void afterDestroy(EntryEvent event) {
        }

        public void afterRegionInvalidate(RegionEvent event) {
        }

        public void afterRegionDestroy(RegionEvent event) {
        }

        public void close() {
        }

        public void init(Properties props) {
        }

        public void afterRegionCreate(RegionEvent event) {
        }

        public void afterRegionClear(RegionEvent event) {
        }

        public void afterRegionLive(RegionEvent event) {
        }
    }
}

