/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.map.impl.querycache;

import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.querycache.ClientQueryCacheContext;
import com.hazelcast.client.impl.querycache.subscriber.ClientQueryCacheEventService;
import com.hazelcast.client.impl.querycache.subscriber.QueryCacheToListenerMapper;
import com.hazelcast.client.proxy.ClientMapProxy;
import com.hazelcast.client.spi.ClientContext;
import com.hazelcast.client.spi.ProxyManager;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.QueryCache;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.querycache.QueryCacheContext;
import com.hazelcast.map.impl.querycache.QueryCacheEventService;
import com.hazelcast.map.impl.querycache.accumulator.DefaultAccumulatorInfoSupplier;
import com.hazelcast.map.impl.querycache.publisher.MapListenerRegistry;
import com.hazelcast.map.impl.querycache.publisher.MapPublisherRegistry;
import com.hazelcast.map.impl.querycache.publisher.PublisherContext;
import com.hazelcast.map.impl.querycache.publisher.PublisherRegistry;
import com.hazelcast.map.impl.querycache.publisher.QueryCacheListenerRegistry;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheEndToEndProvider;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheFactory;
import com.hazelcast.map.impl.querycache.subscriber.SubscriberContext;
import com.hazelcast.map.listener.EntryAddedListener;
import com.hazelcast.map.listener.MapListener;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.TruePredicate;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceImpl;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceSegment;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.util.RandomPicker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastParallelClassRunner.class)
@Category(value={QuickTest.class, ParallelTest.class})
public class ClientQueryCacheMemoryLeakTest
extends HazelcastTestSupport {
    private static final int STRESS_TEST_RUN_SECONDS = 3;
    private static final int STRESS_TEST_THREAD_COUNT = 4;
    private TestHazelcastFactory factory = new TestHazelcastFactory();

    @After
    public void tearDown() throws Exception {
        this.factory.shutdownAll();
    }

    @Test
    public void stress_user_listener_removal_upon_query_cache_destroy() throws InterruptedException {
        final String[] mapNames = new String[]{"mapA", "mapB", "mapC", "mapD"};
        Config config = this.getConfig();
        HazelcastInstance node1 = this.factory.newHazelcastInstance(config);
        HazelcastInstance node2 = this.factory.newHazelcastInstance(config);
        HazelcastInstance node3 = this.factory.newHazelcastInstance(config);
        final HazelcastInstance client = this.factory.newHazelcastClient();
        final AtomicBoolean stop = new AtomicBoolean(false);
        ArrayList<1> threads = new ArrayList<1>();
        for (int i = 0; i < 4; ++i) {
            Thread thread = new Thread(){

                @Override
                public void run() {
                    while (!stop.get()) {
                        String name = mapNames[RandomPicker.getInt((int)0, (int)4)];
                        IMap map = client.getMap(name);
                        int key = RandomPicker.getInt((int)0, (int)Integer.MAX_VALUE);
                        map.put((Object)key, (Object)1);
                        QueryCache queryCache = map.getQueryCache(name, (Predicate)TruePredicate.INSTANCE, true);
                        queryCache.get((Object)key);
                        queryCache.addEntryListener((MapListener)new EntryAddedListener<Integer, Integer>(){

                            public void entryAdded(EntryEvent<Integer, Integer> event) {
                            }
                        }, true);
                        queryCache.destroy();
                        map.destroy();
                    }
                }
            };
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.start();
        }
        ClientQueryCacheMemoryLeakTest.sleepSeconds((int)3);
        stop.set(true);
        for (Thread thread : threads) {
            thread.join();
        }
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node1);
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node2);
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node3);
        for (String mapName : mapNames) {
            ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(mapName, client);
        }
        Collection instances = this.factory.getAllHazelcastInstances();
        for (HazelcastInstance instance : instances) {
            ClientQueryCacheMemoryLeakTest.assertServerSideEventServiceCleared(instance);
        }
    }

    @Test
    public void event_service_is_empty_after_queryCache_destroy() {
        String mapName = "test";
        HazelcastInstance node1 = this.factory.newHazelcastInstance();
        HazelcastInstance node2 = this.factory.newHazelcastInstance();
        HazelcastInstance client = this.factory.newHazelcastClient();
        IMap map = client.getMap(mapName);
        QueryCache queryCache = map.getQueryCache(mapName, (Predicate)TruePredicate.INSTANCE, true);
        queryCache.destroy();
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node1);
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node2);
    }

    @Test
    public void event_service_is_empty_after_queryCache_concurrent_destroy() throws InterruptedException {
        HazelcastInstance node1 = this.factory.newHazelcastInstance();
        HazelcastInstance node2 = this.factory.newHazelcastInstance();
        String mapName = "test";
        HazelcastInstance client = this.factory.newHazelcastClient();
        final IMap map = client.getMap(mapName);
        ClientQueryCacheMemoryLeakTest.populateMap((IMap<Integer, Integer>)map);
        final AtomicBoolean stop = new AtomicBoolean(false);
        ArrayList<2> threads = new ArrayList<2>();
        for (int i = 0; i < 4; ++i) {
            Thread thread = new Thread(){

                @Override
                public void run() {
                    while (!stop.get()) {
                        QueryCache queryCache = map.getQueryCache("a", (Predicate)TruePredicate.INSTANCE, true);
                        queryCache.addEntryListener((MapListener)new EntryAddedListener<Integer, Integer>(){

                            public void entryAdded(EntryEvent<Integer, Integer> event) {
                            }
                        }, true);
                        queryCache.destroy();
                    }
                }
            };
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.start();
        }
        ClientQueryCacheMemoryLeakTest.sleepSeconds((int)3);
        stop.set(true);
        for (Thread thread : threads) {
            thread.join();
        }
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node1);
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(node2);
        ClientQueryCacheMemoryLeakTest.assertNoUserListenerLeft(mapName, client);
    }

    @Test
    public void removes_internal_query_caches_upon_map_destroy() {
        this.factory.newHazelcastInstance();
        HazelcastInstance client = this.factory.newHazelcastClient();
        String mapName = "test";
        IMap map = client.getMap(mapName);
        ClientQueryCacheMemoryLeakTest.populateMap((IMap<Integer, Integer>)map);
        for (int j = 0; j < 10; ++j) {
            map.getQueryCache(j + "-test-QC", (Predicate)TruePredicate.INSTANCE, true);
        }
        map.destroy();
        ClientQueryCacheContext queryCacheContext = ((ClientMapProxy)map).getQueryCacheContext();
        SubscriberContext subscriberContext = queryCacheContext.getSubscriberContext();
        QueryCacheEndToEndProvider provider = subscriberContext.getEndToEndQueryCacheProvider();
        QueryCacheFactory queryCacheFactory = subscriberContext.getQueryCacheFactory();
        Assert.assertEquals((long)0L, (long)provider.getQueryCacheCount(mapName));
        Assert.assertEquals((long)0L, (long)queryCacheFactory.getQueryCacheCount());
    }

    @Test
    public void no_query_cache_left_after_creating_and_destroying_same_map_concurrently() throws Exception {
        HazelcastInstance node = this.factory.newHazelcastInstance();
        final HazelcastInstance client = this.factory.newHazelcastClient();
        String mapName = "test";
        ExecutorService pool = Executors.newFixedThreadPool(4);
        final AtomicBoolean stop = new AtomicBoolean(false);
        for (int i = 0; i < 1000; ++i) {
            Runnable runnable = new Runnable(){

                @Override
                public void run() {
                    while (!stop.get()) {
                        IMap map = client.getMap("test");
                        try {
                            ClientQueryCacheMemoryLeakTest.populateMap((IMap<Integer, Integer>)map);
                            for (int j = 0; j < 10; ++j) {
                                map.getQueryCache(j + "-test-QC", (Predicate)TruePredicate.INSTANCE, true);
                            }
                        }
                        finally {
                            map.destroy();
                        }
                    }
                }
            };
            pool.submit(runnable);
        }
        ClientQueryCacheMemoryLeakTest.sleepSeconds((int)3);
        stop.set(true);
        pool.shutdown();
        pool.awaitTermination(120L, TimeUnit.SECONDS);
        SubscriberContext subscriberContext = ClientQueryCacheMemoryLeakTest.getSubscriberContext(client, "test");
        final QueryCacheEndToEndProvider provider = subscriberContext.getEndToEndQueryCacheProvider();
        final QueryCacheFactory queryCacheFactory = subscriberContext.getQueryCacheFactory();
        ClientQueryCacheMemoryLeakTest.assertTrueEventually((AssertTask)new AssertTask(){

            public void run() {
                Assert.assertEquals((long)0L, (long)provider.getQueryCacheCount("test"));
            }
        });
        ClientQueryCacheMemoryLeakTest.assertTrueEventually((AssertTask)new AssertTask(){

            public void run() {
                Assert.assertEquals((long)0L, (long)queryCacheFactory.getQueryCacheCount());
            }
        });
        ClientQueryCacheMemoryLeakTest.assertNoListenerLeftOnEventService(node);
        ClientQueryCacheMemoryLeakTest.assertNoRegisteredListenerLeft(node, "test");
        ClientQueryCacheMemoryLeakTest.assertNoAccumulatorInfoSupplierLeft(node, "test");
        ClientQueryCacheMemoryLeakTest.assertNoPartitionAccumulatorRegistryLeft(node, "test");
    }

    private static void assertNoAccumulatorInfoSupplierLeft(HazelcastInstance node, String mapName) {
        PublisherContext publisherContext = ClientQueryCacheMemoryLeakTest.getPublisherContext(node);
        DefaultAccumulatorInfoSupplier accumulatorInfoSupplier = (DefaultAccumulatorInfoSupplier)publisherContext.getAccumulatorInfoSupplier();
        int accumulatorInfoCountOfMap = accumulatorInfoSupplier.accumulatorInfoCountOfMap(mapName);
        Assert.assertEquals((long)0L, (long)accumulatorInfoCountOfMap);
    }

    private static void assertNoRegisteredListenerLeft(HazelcastInstance node, String mapName) {
        PublisherContext publisherContext = ClientQueryCacheMemoryLeakTest.getPublisherContext(node);
        MapListenerRegistry mapListenerRegistry = publisherContext.getMapListenerRegistry();
        QueryCacheListenerRegistry registry = mapListenerRegistry.getOrNull(mapName);
        if (registry != null) {
            Map registeredListeners = registry.getAll();
            Assert.assertTrue((boolean)registeredListeners.isEmpty());
        }
    }

    private static void assertNoPartitionAccumulatorRegistryLeft(HazelcastInstance node, String mapName) {
        PublisherContext publisherContext = ClientQueryCacheMemoryLeakTest.getPublisherContext(node);
        MapPublisherRegistry mapPublisherRegistry = publisherContext.getMapPublisherRegistry();
        PublisherRegistry registry = mapPublisherRegistry.getOrCreate(mapName);
        if (registry == null) {
            return;
        }
        Map accumulatorRegistryMap = registry.getAll();
        Assert.assertTrue((boolean)accumulatorRegistryMap.isEmpty());
    }

    private static void assertNoListenerLeftOnEventService(HazelcastInstance node) {
        NodeEngineImpl nodeEngineImpl = ClientQueryCacheMemoryLeakTest.getNodeEngineImpl((HazelcastInstance)node);
        EventServiceImpl eventService = (EventServiceImpl)nodeEngineImpl.getEventService();
        EventServiceSegment segment = eventService.getSegment("hz:impl:mapService", false);
        ConcurrentMap registrationIdMap = segment.getRegistrationIdMap();
        Assert.assertEquals((String)registrationIdMap.toString(), (long)0L, (long)registrationIdMap.size());
    }

    private static void populateMap(IMap<Integer, Integer> map) {
        for (int i = 0; i < 10; ++i) {
            map.put((Object)i, (Object)i);
        }
    }

    private static SubscriberContext getSubscriberContext(HazelcastInstance client, String mapName) {
        IMap map = client.getMap(mapName);
        return ((ClientMapProxy)map).getQueryCacheContext().getSubscriberContext();
    }

    private static PublisherContext getPublisherContext(HazelcastInstance node) {
        MapService mapService = (MapService)ClientQueryCacheMemoryLeakTest.getNodeEngineImpl((HazelcastInstance)node).getService("hz:impl:mapService");
        MapServiceContext mapServiceContext = mapService.getMapServiceContext();
        QueryCacheContext queryCacheContext = mapServiceContext.getQueryCacheContext();
        return queryCacheContext.getPublisherContext();
    }

    private static void assertNoUserListenerLeft(HazelcastInstance node) {
        NodeEngineImpl nodeEngineImpl = ClientQueryCacheMemoryLeakTest.getNodeEngineImpl((HazelcastInstance)node);
        EventServiceImpl eventServiceImpl = (EventServiceImpl)nodeEngineImpl.getEventService();
        EventServiceSegment segment = eventServiceImpl.getSegment("hz:impl:mapService", false);
        ConcurrentMap registrationIdMap = segment.getRegistrationIdMap();
        Assert.assertTrue((String)registrationIdMap.toString(), (boolean)registrationIdMap.isEmpty());
    }

    private static void assertServerSideEventServiceCleared(HazelcastInstance node) {
        NodeEngineImpl nodeEngineImpl = ClientQueryCacheMemoryLeakTest.getNodeEngineImpl((HazelcastInstance)node);
        EventServiceImpl eventServiceImpl = (EventServiceImpl)nodeEngineImpl.getEventService();
        EventServiceSegment segment = eventServiceImpl.getSegment("hz:impl:mapService", false);
        ConcurrentMap registrationIdMap = segment.getRegistrationIdMap();
        Assert.assertTrue((String)registrationIdMap.toString(), (boolean)registrationIdMap.isEmpty());
    }

    private static void assertNoUserListenerLeft(String mapName, HazelcastInstance client) {
        ProxyManager proxyManager = ((HazelcastClientProxy)client).client.getProxyManager();
        ClientContext context = proxyManager.getContext();
        ClientQueryCacheContext queryCacheContext = context.getQueryCacheContext();
        SubscriberContext subscriberContext = queryCacheContext.getSubscriberContext();
        QueryCacheEventService eventService = subscriberContext.getEventService();
        Assert.assertFalse((boolean)ClientQueryCacheMemoryLeakTest.hasAnyListenerRegistered(eventService, mapName));
    }

    private static boolean hasAnyListenerRegistered(QueryCacheEventService eventService, String mapName) {
        ConcurrentMap registrations = ((ClientQueryCacheEventService)eventService).getRegistrations();
        QueryCacheToListenerMapper queryCacheToListenerMapper = (QueryCacheToListenerMapper)registrations.get(mapName);
        return queryCacheToListenerMapper != null && queryCacheToListenerMapper.hasAnyQueryCacheRegistered();
    }
}

