/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.tier.sockets;

import com.gemstone.gemfire.DeltaTestImpl;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache.util.CqListenerAdapter;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Distributed test that registers continuous queries (CQs) on a client region
 * while another client puts values (plain strings or {@link DeltaTestImpl}
 * instances) through a cache server, then checks the CQ listener invocation
 * counts and the server-side "delta full messages sent" statistic.
 *
 * <p>The cache, pool and event counters are static because the helper methods
 * are invoked reflectively by name in remote VMs as well as directly in the
 * controller VM; each VM therefore has its own copy of this state.
 */
public class DeltaPropagationWithCQDUnitTest
extends DistributedTestCase {
    private static GemFireCache cache = null;
    private static Pool pool = null;
    private static final String regionName = "CQWithInterestDUnitTest_region";
    protected VM server1 = null;
    protected VM server2 = null;
    protected VM client1 = null;
    protected VM client2 = null;
    private static final String CQ1 = "SELECT * FROM /" + regionName;

    // The counters are bumped from listener callbacks and polled by the wait
    // criteria, so they use AtomicLong to make increments atomic and visible
    // regardless of which thread delivers the callback.
    private static final AtomicLong totalEvents = new AtomicLong();
    private static final AtomicLong cqEvents = new AtomicLong();
    private static final AtomicLong cqErrors = new AtomicLong();

    public DeltaPropagationWithCQDUnitTest(String name) {
        super(name);
    }

    /** Binds the four VMs of host 0 to the server/client roles used by the tests. */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        Host host = Host.getHost(0);
        this.server1 = host.getVM(0);
        this.server2 = host.getVM(1);
        this.client1 = host.getVM(2);
        this.client2 = host.getVM(3);
    }

    /** Closes the cache and resets the counters in every VM, then in this VM. */
    @Override
    public void tearDown2() throws Exception {
        super.tearDown2();
        VM[] vms = new VM[]{this.server1, this.server2, this.client1, this.client2};
        for (VM vm : vms) {
            vm.invoke(DeltaPropagationWithCQDUnitTest.class, "close");
        }
        close();
    }

    /**
     * Closes this VM's cache if it is open and resets the static test state
     * (pool reference and event counters).
     */
    public static void close() throws Exception {
        if (cache != null && !cache.isClosed()) {
            cache.close();
        }
        // Drop the reference so a later test cannot pick up a pool that
        // belonged to the cache closed above.
        pool = null;
        totalEvents.set(0L);
        cqEvents.set(0L);
        cqErrors.set(0L);
    }

    /**
     * A client with both registered interest and a CQ must see exactly two CQ
     * events (and no CQ errors) for two puts on the same key, and end up with
     * the latest value locally.
     */
    public void testCqWithRI() throws Exception {
        int port = (Integer)this.server1.invoke(DeltaPropagationWithCQDUnitTest.class, "createCacheServer");
        String serverHost = getServerHostName(this.server1.getHost());
        this.client1.invoke(DeltaPropagationWithCQDUnitTest.class, "createClientCache", new Object[]{serverHost, port, Boolean.TRUE});
        createClientCache(serverHost, port, true);
        registerCQs(1, "CQWithInterestDUnitTest_cq");
        this.client1.invoke(DeltaPropagationWithCQDUnitTest.class, "doPut", new Object[]{"SAMPLE_KEY", "SAMPLE_VALUE"});
        this.client1.invoke(DeltaPropagationWithCQDUnitTest.class, "doPut", new Object[]{"SAMPLE_KEY", "NEW_VALUE"});
        DistributedTestCase.WaitCriterion wc = new DistributedTestCase.WaitCriterion(){

            @Override
            public boolean done() {
                return cqEvents.get() == 2L && cqErrors.get() == 0L;
            }

            @Override
            public String description() {
                return "Expected 2 cqEvents and 0 cqErrors, but found " + cqEvents.get() + " cqEvents and " + cqErrors.get() + " cqErrors";
            }
        };
        DistributedTestCase.waitForCriterion(wc, 30000L, 100L, true);
        assertEquals("Latest value: ", (Object)"NEW_VALUE", (Object)cache.getRegion(regionName).get((Object)"SAMPLE_KEY"));
    }

    /**
     * A client with CQs but no registered interest: after its own delta puts the
     * server must report zero full-value messages; after a second client repeats
     * the puts, the server must report one full-value message per key.
     */
    public void testFullValueRequestsWithCqWithoutRI() throws Exception {
        final int numOfListeners = 5;
        final int numOfKeys = 10;
        final int numOfCQs = 3;
        final int eventsPerRound = numOfListeners * numOfKeys * numOfCQs;
        int port = (Integer)this.server1.invoke(DeltaPropagationWithCQDUnitTest.class, "createCacheServer");
        String serverHost = getServerHostName(this.server1.getHost());
        this.client1.invoke(DeltaPropagationWithCQDUnitTest.class, "createClientCache", new Object[]{serverHost, port, Boolean.TRUE});
        createClientCache(serverHost, port, false);
        for (int i = 0; i < numOfCQs; ++i) {
            registerCQs(numOfListeners, "Query_" + i);
        }
        doPuts(numOfKeys, true);
        verifyCqListeners(eventsPerRound);
        this.server1.invoke(DeltaPropagationWithCQDUnitTest.class, "verifyFullValueRequestsFromClients", new Object[]{0L});
        this.client1.invoke(DeltaPropagationWithCQDUnitTest.class, "doPuts", new Object[]{numOfKeys, true});
        verifyCqListeners(eventsPerRound * 2);
        // Expected count is tied to the number of keys rather than a literal 10.
        this.server1.invoke(DeltaPropagationWithCQDUnitTest.class, "verifyFullValueRequestsFromClients", new Object[]{Long.valueOf(numOfKeys)});
    }

    /**
     * Waits (up to 10 seconds, polling every 100 ms) until the total number of
     * CQ listener invocations in this VM, events plus errors, equals {@code events}.
     *
     * @param events expected number of listener invocations
     */
    public static void verifyCqListeners(final Integer events) throws Exception {
        DistributedTestCase.WaitCriterion wc = new DistributedTestCase.WaitCriterion(){

            @Override
            public String description() {
                return "Expected " + events + " listener invocations but found " + (cqEvents.get() + cqErrors.get());
            }

            @Override
            public boolean done() {
                return cqEvents.get() + cqErrors.get() == events.longValue();
            }
        };
        DistributedTestCase.waitForCriterion(wc, 10000L, 100L, true);
    }

    /**
     * Asserts the "delta full messages sent" statistic of the client proxies on
     * this VM's first cache server. The first proxy reporting a non-zero count is
     * compared with {@code expected}; if every proxy reports zero, zero is
     * compared.
     *
     * @param expected expected number of full-value messages
     */
    public static void verifyFullValueRequestsFromClients(Long expected) throws Exception {
        Object[] proxies = ((BridgeServerImpl)((GemFireCacheImpl)cache).getCacheServers().get(0)).getAcceptor().getCacheClientNotifier().getClientProxies().toArray();
        if (proxies.length == 0) {
            // Previously this surfaced as an ArrayIndexOutOfBoundsException.
            fail("Full value requests, no client proxies found on the server");
        }
        long fullValueRequests = 0L;
        for (Object proxy : proxies) {
            fullValueRequests = ((CacheClientProxy)proxy).getStatistics().getDeltaFullMessagesSent();
            if (fullValueRequests != 0L) {
                break;
            }
        }
        assertEquals("Full value requests, ", expected.longValue(), fullValueRequests);
    }

    /** Puts a single entry into the test region of this VM's cache. */
    public static void doPut(Object key, Object value) throws Exception {
        Region region = cache.getRegion(regionName);
        region.put(key, value);
    }

    /**
     * Puts {@code num} entries keyed {@code KEY_0 .. KEY_(num-1)} into the test
     * region.
     *
     * @param num      number of entries to put
     * @param useDelta when true the values are {@link DeltaTestImpl} instances
     *                 whose int field has been set after construction; otherwise
     *                 the value is the loop index itself
     */
    public static void doPuts(Integer num, Boolean useDelta) throws Exception {
        Region region = cache.getRegion(regionName);
        final int count = num.intValue();
        final boolean deltaValues = useDelta.booleanValue();
        for (int i = 0; i < count; ++i) {
            Object value;
            if (deltaValues) {
                DeltaTestImpl delta = new DeltaTestImpl(i, "VALUE_" + i);
                delta.setIntVar(i);
                value = delta;
            } else {
                value = Integer.valueOf(i);
            }
            region.put((Object)("KEY_" + i), value);
        }
    }

    /**
     * Creates a cache with a REPLICATE test region and starts a cache server on a
     * randomly chosen available port.
     *
     * @return the port the cache server was started on
     */
    public static Integer createCacheServer() throws Exception {
        InternalDistributedSystem ds = connectFreshSystem(new Properties());
        cache = CacheFactory.create(ds);
        assertNotNull(cache);
        RegionFactory rf = ((Cache)cache).createRegionFactory(RegionShortcut.REPLICATE);
        rf.create(regionName);
        CacheServer server = ((Cache)cache).addCacheServer();
        // NOTE(review): 0 is presumably the TCP socket protocol selector of AvailablePort - confirm.
        server.setPort(AvailablePort.getRandomAvailablePort(0));
        server.start();
        return server.getPort();
    }

    /**
     * Creates a client cache with a LOCAL-scoped test region backed by a
     * connection pool to the given server, and optionally registers interest in
     * all keys.
     *
     * @param host server host name to connect to; {@code "localhost"} is used when null
     * @param port server port
     * @param doRI whether to register interest in {@code ALL_KEYS}
     */
    public static void createClientCache(String host, Integer port, Boolean doRI) throws Exception {
        Properties props = new Properties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "");
        InternalDistributedSystem ds = connectFreshSystem(props);
        cache = CacheFactory.create(ds);
        assertNotNull(cache);
        AttributesFactory factory = new AttributesFactory();
        // Honour the caller-supplied host; it used to be ignored in favour of a
        // hard-coded "localhost".
        String serverHost = host != null ? host : "localhost";
        pool = BridgeTestCase.configureConnectionPool(factory, serverHost, new int[]{port}, true, 1, 2, null, 1000, 250, false, -2);
        factory.setScope(Scope.LOCAL);
        factory.addCacheListener((CacheListener)new CacheListenerAdapter<Object, Object>(){

            @Override
            public void afterCreate(EntryEvent<Object, Object> event) {
                totalEvents.incrementAndGet();
            }

            @Override
            public void afterUpdate(EntryEvent<Object, Object> event) {
                totalEvents.incrementAndGet();
            }

            @Override
            public void afterDestroy(EntryEvent<Object, Object> event) {
                totalEvents.incrementAndGet();
            }

            @Override
            public void afterInvalidate(EntryEvent<Object, Object> event) {
                totalEvents.incrementAndGet();
            }
        });
        RegionAttributes attr = factory.create();
        Region region = ((Cache)cache).createRegion(regionName, attr);
        if (doRI.booleanValue()) {
            region.registerInterest((Object)"ALL_KEYS");
        }
    }

    /**
     * Registers and executes one CQ named {@code name} (query: all entries of the
     * test region) with {@code numOfListeners} counting listeners attached, and
     * fails if the CQ cannot be looked up afterwards.
     *
     * @param numOfListeners number of listeners to attach to the CQ
     * @param name           CQ name
     */
    public static void registerCQs(Integer numOfListeners, String name) throws Exception {
        QueryService qs = pool.getQueryService();
        CqAttributesFactory caf = new CqAttributesFactory();
        final int listenerCount = numOfListeners.intValue();
        for (int i = 0; i < listenerCount; ++i) {
            caf.addCqListener((CqListener)new CqListenerAdapter(){

                @Override
                public void onEvent(CqEvent event) {
                    // The new value is read before counting, as in the original test;
                    // presumably this forces the value to be materialised - confirm.
                    event.getNewValue();
                    cqEvents.incrementAndGet();
                }

                @Override
                public void onError(CqEvent event) {
                    event.getNewValue();
                    cqErrors.incrementAndGet();
                }
            });
        }
        CqQuery cQuery = qs.newCq(name, CQ1, caf.create());
        cQuery.execute();
        if (qs.getCq(name) == null) {
            fail("Failed to get CQ " + name);
        }
    }

    /**
     * Obtains a distributed system for {@code props}, disconnects it and connects
     * again, so that the returned system is a freshly created connection.
     */
    private static InternalDistributedSystem connectFreshSystem(Properties props) throws Exception {
        DeltaPropagationWithCQDUnitTest instance = new DeltaPropagationWithCQDUnitTest("temp");
        InternalDistributedSystem ds = instance.getSystem(props);
        ds.disconnect();
        ds = instance.getSystem(props);
        assertNotNull(ds);
        return ds;
    }
}

