/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.ha;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeObserver;
import com.gemstone.gemfire.internal.cache.BridgeObserverAdapter;
import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import java.net.SocketException;
import java.util.Properties;

public class HASlowReceiverDUnitTest
extends DistributedTestCase {
    protected static Cache cache = null;
    private static VM serverVM1 = null;
    protected static VM serverVM2 = null;
    protected static VM clientVM = null;
    private static int PORT0;
    private static int PORT1;
    private static int PORT2;
    private static final String regionName = "HASlowReceiverDUnitTest";
    protected static LogWriter logger;
    static PoolImpl pool;
    private static boolean isUnresponsiveClientRemoved;

    public HASlowReceiverDUnitTest(String name) {
        super(name);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        Host host = Host.getHost(0);
        serverVM1 = host.getVM(1);
        serverVM2 = host.getVM(2);
        clientVM = host.getVM(3);
        PORT0 = HASlowReceiverDUnitTest.createServerCache();
        PORT1 = (Integer)serverVM1.invoke(HASlowReceiverDUnitTest.class, "createServerCache");
        PORT2 = (Integer)serverVM2.invoke(HASlowReceiverDUnitTest.class, "createServerCache");
    }

    @Override
    public void tearDown2() throws Exception {
        super.tearDown2();
        clientVM.invoke(HASlowReceiverDUnitTest.class, "closeCache");
        HASlowReceiverDUnitTest.closeCache();
        serverVM1.invoke(HASlowReceiverDUnitTest.class, "closeCache");
        serverVM2.invoke(HASlowReceiverDUnitTest.class, "closeCache");
        HASlowReceiverDUnitTest.disconnectAllFromDS();
    }

    private void createCache(Properties props) throws Exception {
        InternalDistributedSystem ds = this.getSystem(props);
        ds.disconnect();
        ds = this.getSystem(props);
        HASlowReceiverDUnitTest.assertNotNull((Object)ds);
        cache = CacheFactory.create((DistributedSystem)ds);
        HASlowReceiverDUnitTest.assertNotNull((Object)cache);
    }

    public static Integer createServerCache() throws Exception {
        return HASlowReceiverDUnitTest.createServerCache(null);
    }

    public static Integer createServerCache(String ePolicy) throws Exception {
        return HASlowReceiverDUnitTest.createServerCache(ePolicy, new Integer(1));
    }

    public static Integer createServerCache(String ePolicy, Integer cap) throws Exception {
        Properties prop = new Properties();
        prop.setProperty("remove-unresponsive-client", "true");
        new HASlowReceiverDUnitTest("temp").createCache(prop);
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.REPLICATE);
        RegionAttributes attrs = factory.create();
        cache.createRegion(regionName, attrs);
        logger = cache.getLogger();
        int port = AvailablePort.getRandomAvailablePort((int)0);
        BridgeServer server1 = cache.addBridgeServer();
        server1.setPort(port);
        server1.setNotifyBySubscription(true);
        server1.setMaximumMessageCount(200);
        if (ePolicy != null) {
            server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy);
            server1.getClientSubscriptionConfig().setCapacity(cap.intValue());
        }
        server1.start();
        return new Integer(server1.getPort());
    }

    public static void createClientCache(String host, Integer port1, Integer port2, Integer port3, Integer rLevel, Boolean addListener) throws Exception {
        CacheServerTestUtil.disableShufflingOfEndpoints();
        Properties props = new Properties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "");
        new HASlowReceiverDUnitTest("temp").createCache(props);
        AttributesFactory factory = new AttributesFactory();
        PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer("localhost", port1.intValue()).addServer("localhost", port2.intValue()).addServer("localhost", port3.intValue()).setSubscriptionEnabled(true).setSubscriptionRedundancy(rLevel.intValue()).setThreadLocalConnections(true).setMinConnections(6).setReadTimeout(20000).setPingInterval(1000L).setRetryAttempts(5).create("HASlowRecieverDUnitTestPool");
        factory.setScope(Scope.LOCAL);
        factory.setPoolName(p.getName());
        if (addListener.booleanValue()) {
            factory.addCacheListener((CacheListener)new CacheListenerAdapter(){

                public void afterUpdate(EntryEvent event) {
                    if (event.getNewValue().equals("v20")) {
                        try {
                            Thread.sleep(120000L);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        RegionAttributes attrs = factory.create();
        cache.createRegion(regionName, attrs);
        pool = p;
    }

    public static void createClientCache(String host, Integer port1, Integer port2, Integer port3, Integer rLevel) throws Exception {
        HASlowReceiverDUnitTest.createClientCache(host, port1, port2, port3, rLevel, Boolean.TRUE);
    }

    public static void registerInterest() {
        try {
            Region r = cache.getRegion("/HASlowReceiverDUnitTest");
            HASlowReceiverDUnitTest.assertNotNull((Object)r);
            r.registerInterest((Object)"ALL_KEYS");
        }
        catch (Exception ex) {
            HASlowReceiverDUnitTest.fail("failed in registerInterestListAll", ex);
        }
    }

    public static void putEntries() {
        try {
            Region r = cache.getRegion("/HASlowReceiverDUnitTest");
            HASlowReceiverDUnitTest.assertNotNull((Object)r);
            for (long i = 0L; i < 300L; ++i) {
                r.put((Object)("k" + i % 10L), (Object)("v" + i));
                r.put((Object)("k" + i % 10L), (Object)new byte[1000]);
            }
        }
        catch (Exception ex) {
            HASlowReceiverDUnitTest.fail("failed in putEntries()", ex);
        }
    }

    public static void createEntries(Long num) {
        try {
            Region r = cache.getRegion("/HASlowReceiverDUnitTest");
            HASlowReceiverDUnitTest.assertNotNull((Object)r);
            for (long i = 0L; i < num; ++i) {
                r.create((Object)("k" + i), (Object)("v" + i));
            }
        }
        catch (Exception ex) {
            HASlowReceiverDUnitTest.fail("failed in createEntries(Long)", ex);
        }
    }

    public static void checkRedundancyLevel(final Integer redundantServers) {
        DistributedTestCase.WaitCriterion wc = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return pool.getRedundantNames().size() == redundantServers.intValue();
            }

            @Override
            public String description() {
                return "Expected redundant count (" + pool.getRedundantNames().size() + ") to become " + redundantServers;
            }
        };
        DistributedTestCase.waitForCriterion(wc, 200000L, 1000L, true);
    }

    public void testSlowClient() throws Exception {
        HASlowReceiverDUnitTest.setBridgeObeserverForAfterQueueDestroyMessage();
        clientVM.invoke(HASlowReceiverDUnitTest.class, "createClientCache", new Object[]{HASlowReceiverDUnitTest.getServerHostName(Host.getHost(0)), new Integer(PORT0), new Integer(PORT1), new Integer(PORT2), new Integer(2)});
        clientVM.invoke(HASlowReceiverDUnitTest.class, "registerInterest");
        DistributedTestCase.ExpectedException ex1 = HASlowReceiverDUnitTest.addExpectedException(SocketException.class.getName());
        DistributedTestCase.ExpectedException ex2 = HASlowReceiverDUnitTest.addExpectedException(InterruptedException.class.getName());
        HASlowReceiverDUnitTest.putEntries();
        Thread.sleep(20000L);
        clientVM.invoke(HASlowReceiverDUnitTest.class, "checkRedundancyLevel", new Object[]{new Integer(2)});
        HASlowReceiverDUnitTest.assertTrue((String)"isUnresponsiveClientRemoved is false, but should be true after 20 seconds", (boolean)isUnresponsiveClientRemoved);
        ex1.remove();
        ex2.remove();
    }

    public static void setBridgeObeserverForAfterQueueDestroyMessage() throws Exception {
        PoolImpl.AFTER_QUEUE_DESTROY_MESSAGE_FLAG = true;
        BridgeObserverHolder.setInstance((BridgeObserver)new BridgeObserverAdapter(){

            public void afterQueueDestroyMessage() {
                clientVM.invoke(HASlowReceiverDUnitTest.class, "checkRedundancyLevel", new Object[]{new Integer(0)});
                isUnresponsiveClientRemoved = true;
                PoolImpl.AFTER_QUEUE_DESTROY_MESSAGE_FLAG = false;
            }
        });
    }

    public static void closeCache() {
        if (cache != null && !cache.isClosed()) {
            cache.close();
            cache.getDistributedSystem().disconnect();
        }
    }

    static {
        logger = null;
        pool = null;
        isUnresponsiveClientRemoved = false;
    }
}

