/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.client;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionStopStartTest
extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private ActiveMQServer server;
    private final SimpleString QUEUE = SimpleString.of((String)"ConsumerTestQueue");
    private ServerLocator locator;

    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.server = this.createServer(false);
        this.server.start();
        this.locator = this.createInVMNonHALocator();
    }

    @Test
    public void testStopStartConsumerSyncReceiveImmediate() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        for (int i = 0; i < 50; ++i) {
            ClientMessage cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.stop();
        ClientMessage cm = consumer.receiveImmediate();
        Assertions.assertNull((Object)cm);
        session.start();
        for (int i = 0; i < 50; ++i) {
            cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.close();
    }

    @Test
    public void testStopStartConsumerSyncReceive() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        for (int i = 0; i < 50; ++i) {
            ClientMessage cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.stop();
        long time = System.currentTimeMillis();
        ClientMessage cm = consumer.receive(1000L);
        long taken = System.currentTimeMillis() - time;
        Assertions.assertTrue((taken >= 1000L ? 1 : 0) != 0);
        Assertions.assertNull((Object)cm);
        session.start();
        for (int i = 0; i < 50; ++i) {
            cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.close();
    }

    @Test
    public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        final ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        final CountDownLatch latch = new CountDownLatch(10);
        class MyHandler
        implements MessageHandler {
            boolean failed;
            boolean started = true;
            int count = 0;

            MyHandler() {
            }

            public void onMessage(ClientMessage message) {
                try {
                    if (!this.started) {
                        this.failed = true;
                    }
                    ++this.count;
                    if (this.count == 10) {
                        message.acknowledge();
                        session.stop();
                        this.started = false;
                    }
                    latch.countDown();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        MyHandler handler = new MyHandler();
        consumer.setMessageHandler((MessageHandler)handler);
        SessionStopStartTest.waitForLatch(latch);
        Assertions.assertFalse((boolean)handler.failed);
        Assertions.assertNull((Object)consumer.getLastException());
        consumer.setMessageHandler(null);
        session.start();
        for (int i = 0; i < 90; ++i) {
            ClientMessage msg = consumer.receive(1000L);
            Assertions.assertNotNull((Object)msg, (String)("message " + i));
            msg.acknowledge();
        }
        Assertions.assertNull((Object)consumer.receiveImmediate());
        session.close();
    }

    @Test
    public void testStopStartConsumerAsyncSync() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        final ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        final CountDownLatch latch = new CountDownLatch(10);
        class MyHandler
        implements MessageHandler {
            boolean failed;
            boolean started = true;

            MyHandler() {
            }

            public void onMessage(ClientMessage message) {
                try {
                    if (!this.started) {
                        this.failed = true;
                    }
                    latch.countDown();
                    if (latch.getCount() == 0L) {
                        message.acknowledge();
                        this.started = false;
                        consumer.setMessageHandler(null);
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        MyHandler handler = new MyHandler();
        consumer.setMessageHandler((MessageHandler)handler);
        SessionStopStartTest.waitForLatch(latch);
        try {
            session.stop();
        }
        catch (Exception e) {
            logger.warn(e.getMessage(), (Throwable)e);
            throw e;
        }
        Assertions.assertFalse((boolean)handler.failed);
        Assertions.assertNull((Object)consumer.getLastException());
        consumer.setMessageHandler(null);
        session.start();
        for (int i = 0; i < 90; ++i) {
            ClientMessage msg = consumer.receive(1000L);
            Assertions.assertNotNull((Object)msg, (String)("message " + i));
            msg.acknowledge();
        }
        Assertions.assertNull((Object)consumer.receiveImmediate());
        session.close();
    }

    @Test
    public void testStopStartConsumerAsyncASyncStoppeeByHandler() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        final ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        CountDownLatch latch = new CountDownLatch(10);
        class MyHandler
        implements MessageHandler {
            int messageReceived = 0;
            boolean failed;
            boolean started = true;
            private final CountDownLatch latch;
            private boolean stop = true;

            MyHandler(CountDownLatch latch) {
                this.latch = latch;
            }

            MyHandler(CountDownLatch latch, boolean stop) {
                this(latch);
                this.stop = stop;
            }

            public void onMessage(ClientMessage message) {
                try {
                    if (!this.started) {
                        this.failed = true;
                    }
                    ++this.messageReceived;
                    this.latch.countDown();
                    if (this.stop && this.latch.getCount() == 0L) {
                        message.acknowledge();
                        session.stop();
                        this.started = false;
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        MyHandler handler = new MyHandler(latch);
        consumer.setMessageHandler((MessageHandler)handler);
        SessionStopStartTest.waitForLatch(latch);
        Thread.sleep(100L);
        Assertions.assertFalse((boolean)handler.failed);
        Assertions.assertNull((Object)consumer.getLastException());
        latch = new CountDownLatch(90);
        handler = new MyHandler(latch, false);
        consumer.setMessageHandler((MessageHandler)handler);
        session.start();
        Assertions.assertTrue((boolean)latch.await(5L, TimeUnit.SECONDS), (String)("message received " + handler.messageReceived));
        Thread.sleep(100L);
        Assertions.assertFalse((boolean)handler.failed);
        Assertions.assertNull((Object)consumer.getLastException());
        session.close();
    }

    @Test
    public void testStopStartConsumerAsyncASync() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        final ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        CountDownLatch latch = new CountDownLatch(10);
        class MyHandler
        implements MessageHandler {
            int messageReceived = 0;
            boolean failed;
            boolean started = true;
            private final CountDownLatch latch;
            private boolean stop = true;

            MyHandler(CountDownLatch latch) {
                this.latch = latch;
            }

            MyHandler(CountDownLatch latch, boolean stop) {
                this(latch);
                this.stop = stop;
            }

            public void onMessage(ClientMessage message) {
                try {
                    if (!this.started) {
                        this.failed = true;
                    }
                    ++this.messageReceived;
                    this.latch.countDown();
                    if (this.stop && this.latch.getCount() == 0L) {
                        message.acknowledge();
                        consumer.setMessageHandler(null);
                        this.started = false;
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        MyHandler handler = new MyHandler(latch);
        consumer.setMessageHandler((MessageHandler)handler);
        SessionStopStartTest.waitForLatch(latch);
        Thread.sleep(100L);
        Assertions.assertFalse((boolean)handler.failed);
        Assertions.assertNull((Object)consumer.getLastException());
        latch = new CountDownLatch(90);
        handler = new MyHandler(latch, false);
        consumer.setMessageHandler((MessageHandler)handler);
        session.start();
        Assertions.assertTrue((boolean)latch.await(5L, TimeUnit.SECONDS), (String)("message received " + handler.messageReceived));
        Thread.sleep(100L);
        Assertions.assertFalse((boolean)handler.failed);
        Assertions.assertNull((Object)consumer.getLastException());
        session.close();
    }

    private int getMessageEncodeSize(SimpleString address) throws Exception {
        ServerLocator locator = this.createInVMNonHALocator();
        ClientSessionFactory cf = this.createSessionFactory(locator);
        ClientSession session = cf.createSession(false, true, true);
        ClientMessage message = session.createMessage(false);
        message.setAddress(address);
        int encodeSize = message.getEncodeSize();
        session.close();
        cf.close();
        return encodeSize;
    }

    @Test
    public void testStopStartMultipleConsumers() throws Exception {
        this.locator.setConsumerWindowSize(this.getMessageEncodeSize(this.QUEUE) * 33);
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        ClientConsumer consumer2 = session.createConsumer(this.QUEUE);
        ClientConsumer consumer3 = session.createConsumer(this.QUEUE);
        session.start();
        ClientMessage cm = consumer.receive(5000L);
        Assertions.assertNotNull((Object)cm);
        cm.acknowledge();
        cm = consumer2.receive(5000L);
        Assertions.assertNotNull((Object)cm);
        cm.acknowledge();
        cm = consumer3.receive(5000L);
        Assertions.assertNotNull((Object)cm);
        cm.acknowledge();
        session.stop();
        cm = consumer.receiveImmediate();
        Assertions.assertNull((Object)cm);
        cm = consumer2.receiveImmediate();
        Assertions.assertNull((Object)cm);
        cm = consumer3.receiveImmediate();
        Assertions.assertNull((Object)cm);
        session.start();
        cm = consumer.receive(5000L);
        Assertions.assertNotNull((Object)cm);
        cm = consumer2.receive(5000L);
        Assertions.assertNotNull((Object)cm);
        cm = consumer3.receive(5000L);
        Assertions.assertNotNull((Object)cm);
        session.close();
    }

    @Test
    public void testStopStartAlreadyStartedSession() throws Exception {
        ClientMessage cm;
        int i;
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i2 = 0; i2 < 100; ++i2) {
            ClientMessage message = this.createTextMessage(session, "m" + i2);
            message.putIntProperty(SimpleString.of((String)"i"), i2);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        for (i = 0; i < 50; ++i) {
            cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.start();
        for (i = 0; i < 50; ++i) {
            cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.close();
    }

    @Test
    public void testStopAlreadyStoppedSession() throws Exception {
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of((SimpleString)this.QUEUE).setDurable(Boolean.valueOf(false)));
        ClientProducer producer = session.createProducer(this.QUEUE);
        int numMessages = 100;
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = this.createTextMessage(session, "m" + i);
            message.putIntProperty(SimpleString.of((String)"i"), i);
            producer.send((Message)message);
        }
        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        for (int i = 0; i < 50; ++i) {
            ClientMessage cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.stop();
        ClientMessage cm = consumer.receiveImmediate();
        Assertions.assertNull((Object)cm);
        session.stop();
        cm = consumer.receiveImmediate();
        Assertions.assertNull((Object)cm);
        session.start();
        for (int i = 0; i < 50; ++i) {
            cm = consumer.receive(5000L);
            Assertions.assertNotNull((Object)cm);
            cm.acknowledge();
        }
        session.close();
    }
}

