/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.topic;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.proxy.ClientReliableTopicProxy;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.topic.impl.reliable.DurableSubscriptionTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastParallelClassRunner.class)
@Category(value={QuickTest.class, ParallelTest.class})
public class ClientReliableTopicOnClusterRestartTest {
    private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory();

    @After
    public void tearDown() {
        this.hazelcastFactory.terminateAll();
    }

    @Test
    public void serverRestartWhenReliableTopicListenerRegistered() {
        HazelcastInstance server = this.hazelcastFactory.newHazelcastInstance();
        String topicName = "topic";
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().setConnectionAttemptLimit(Integer.MAX_VALUE);
        HazelcastInstance hazelcastClient = this.hazelcastFactory.newHazelcastClient(clientConfig);
        HazelcastInstance hazelcastClient2 = this.hazelcastFactory.newHazelcastClient(clientConfig);
        ITopic topic = hazelcastClient.getReliableTopic(topicName);
        ITopic topic2 = hazelcastClient2.getReliableTopic(topicName);
        final CountDownLatch listenerLatch = new CountDownLatch(1);
        topic.addMessageListener((MessageListener)new MessageListener<Integer>(){

            public void onMessage(Message<Integer> message) {
                listenerLatch.countDown();
            }
        });
        server.getLifecycleService().terminate();
        this.hazelcastFactory.newHazelcastInstance();
        topic2.publish((Object)5);
        HazelcastTestSupport.assertOpenEventually((CountDownLatch)listenerLatch);
    }

    @Test
    public void shouldContinue_OnClusterRestart_afterInvocationTimeout() throws InterruptedException {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().setConnectionAttemptLimit(100);
        int invocationTimeoutSeconds = 2;
        clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(), String.valueOf(invocationTimeoutSeconds));
        HazelcastInstance client = this.hazelcastFactory.newHazelcastClient(clientConfig);
        final CountDownLatch messageArrived = new CountDownLatch(1);
        String topicName = "topic";
        ITopic topic = client.getReliableTopic(topicName);
        String registrationId = topic.addMessageListener((MessageListener)new DurableSubscriptionTest.DurableMessageListener<String>(){

            public void onMessage(Message<String> message) {
                messageArrived.countDown();
            }

            public boolean isLossTolerant() {
                return true;
            }
        });
        member.shutdown();
        Thread.sleep(TimeUnit.SECONDS.toMillis(invocationTimeoutSeconds));
        member = this.hazelcastFactory.newHazelcastInstance();
        member.getReliableTopic(topicName).publish((Object)"message");
        HazelcastTestSupport.assertOpenEventually((CountDownLatch)messageArrived);
        ClientReliableTopicProxy proxy = (ClientReliableTopicProxy)topic;
        Assert.assertFalse((boolean)proxy.isListenerCancelled(registrationId));
    }

    @Test
    public void shouldContinue_OnClusterRestart_whenDataLoss_LossTolerant_afterInvocationTimeout() throws InterruptedException {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().setConnectionAttemptLimit(100);
        int invocationTimeoutSeconds = 2;
        clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(), String.valueOf(invocationTimeoutSeconds));
        HazelcastInstance client = this.hazelcastFactory.newHazelcastClient(clientConfig);
        final AtomicLong messageCount = new AtomicLong();
        final CountDownLatch messageArrived = new CountDownLatch(1);
        String topicName = "topic";
        member.getReliableTopic(topicName).publish((Object)"message");
        member.getReliableTopic(topicName).publish((Object)"message");
        ITopic topic = client.getReliableTopic(topicName);
        String registrationId = topic.addMessageListener((MessageListener)new DurableSubscriptionTest.DurableMessageListener<String>(){

            public void onMessage(Message<String> message) {
                messageCount.incrementAndGet();
                messageArrived.countDown();
            }

            public boolean isLossTolerant() {
                return true;
            }
        });
        member.shutdown();
        Thread.sleep(TimeUnit.SECONDS.toMillis(invocationTimeoutSeconds));
        member = this.hazelcastFactory.newHazelcastInstance();
        member.getReliableTopic(topicName).publish((Object)"message");
        HazelcastTestSupport.assertOpenEventually((CountDownLatch)messageArrived);
        ClientReliableTopicProxy proxy = (ClientReliableTopicProxy)topic;
        Assert.assertFalse((boolean)proxy.isListenerCancelled(registrationId));
        TestCase.assertEquals((long)1L, (long)messageCount.get());
    }

    @Test
    public void shouldFail_OnClusterRestart_whenDataLoss_notLossTolerant() {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().setConnectionAttemptLimit(100);
        HazelcastInstance client = this.hazelcastFactory.newHazelcastClient(clientConfig);
        final AtomicLong messageCount = new AtomicLong();
        String topicName = "topic";
        member.getReliableTopic(topicName).publish((Object)"message");
        member.getReliableTopic(topicName).publish((Object)"message");
        final ITopic topic = client.getReliableTopic(topicName);
        final String registrationId = topic.addMessageListener((MessageListener)new DurableSubscriptionTest.DurableMessageListener<String>(){

            public void onMessage(Message<String> message) {
                messageCount.incrementAndGet();
            }

            public boolean isLossTolerant() {
                return false;
            }
        });
        member.shutdown();
        this.hazelcastFactory.newHazelcastInstance();
        HazelcastTestSupport.assertTrueEventually((AssertTask)new AssertTask(){

            public void run() {
                ClientReliableTopicProxy proxy = (ClientReliableTopicProxy)topic;
                TestCase.assertTrue((boolean)proxy.isListenerCancelled(registrationId));
            }
        });
        TestCase.assertEquals((long)0L, (long)messageCount.get());
    }
}

