/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.JMSClientTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JMSMessageConsumerTest
extends JMSClientTestSupport {
    /** Logger for this test class; the name is taken from the class returned by {@code MethodHandles.lookup().lookupClass()}. */
    protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Runs the {@code color = 'RED'} selector scenario of {@code doTestSelector} against a topic.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(60)
    public void testSelectorOnTopic() throws Exception {
        final boolean useTopic = true;
        doTestSelector(useTopic);
    }

    /**
     * Runs the {@code color = 'RED'} selector scenario of {@code doTestSelector} against a queue.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(60)
    public void testSelectorOnQueue() throws Exception {
        final boolean useTopic = false;
        doTestSelector(useTopic);
    }

    /**
     * Verifies that a consumer created with the selector {@code color = 'RED'} receives the
     * message that carries that property.
     *
     * <p>Two text messages are sent: {@code msg:0} without any property and {@code msg:1} with
     * {@code color=RED}. The first message handed to the selecting consumer must be
     * {@code msg:1}.
     *
     * @param topic {@code true} to use the topic named by {@code getTopicName()}, {@code false}
     *              to use the queue named by {@code getQueueName()}
     * @throws Exception if any JMS operation fails
     */
    private void doTestSelector(boolean topic) throws Exception {
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = topic
                ? session.createTopic(getTopicName())
                : session.createQueue(getQueueName());

            MessageProducer producer = session.createProducer(destination);
            // The consumer is created before sending so a topic subscription exists in time.
            MessageConsumer redOnly = session.createConsumer(destination, "color = 'RED'");

            TextMessage unmatched = session.createTextMessage();
            unmatched.setText("msg:0");
            producer.send(unmatched);

            TextMessage matched = session.createTextMessage();
            matched.setText("msg:1");
            matched.setStringProperty("color", "RED");
            producer.send(matched);

            connection.start();

            TextMessage received = (TextMessage) redOnly.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("msg:1", received.getText());
            // Expected value goes first so a failure reports expected/actual the right way round.
            Assertions.assertEquals("RED", received.getStringProperty("color"));
        }
    }

    /**
     * Runs the {@code JMSType} selector scenario of {@code doTestSelectorsWithJMSType} against a topic.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSTypeOnTopic() throws Exception {
        final boolean useTopic = true;
        doTestSelectorsWithJMSType(useTopic);
    }

    /**
     * Runs the {@code JMSType} selector scenario of {@code doTestSelectorsWithJMSType} against a queue.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSTypeOnQueue() throws Exception {
        final boolean useTopic = false;
        doTestSelectorsWithJMSType(useTopic);
    }

    /**
     * Checks that a {@code JMSType = 'myJMSType'} selector delivers the message whose JMSType
     * header was set to that value.
     *
     * <p>A message without a JMSType and a message with it are both sent non-persistently; the
     * first message received by the selecting consumer must be the typed one.
     *
     * @param topic {@code true} to use the topic named by {@code getTopicName()}, {@code false}
     *              to use the queue named by {@code getQueueName()}
     * @throws Exception if any JMS operation fails
     */
    private void doTestSelectorsWithJMSType(boolean topic) throws Exception {
        final String type = "myJMSType";
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination;
            if (topic) {
                destination = session.createTopic(getTopicName());
            } else {
                destination = session.createQueue(getQueueName());
            }

            MessageProducer producer = session.createProducer(destination);
            MessageConsumer consumer = session.createConsumer(destination, "JMSType = '" + type + "'");

            TextMessage untyped = session.createTextMessage();
            untyped.setText("text");
            producer.send(untyped, jakarta.jms.DeliveryMode.NON_PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            TextMessage typed = session.createTextMessage();
            typed.setJMSType(type);
            typed.setText("text + type");
            producer.send(typed, jakarta.jms.DeliveryMode.NON_PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            connection.start();

            Message received = consumer.receive(2000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals(type, received.getJMSType(), "Unexpected JMSType value");
            Assertions.assertEquals("text + type", ((TextMessage) received).getText(), "Unexpected message content");
        }
    }

    /**
     * Checks that a {@code JMSCorrelationID} selector on a queue delivers the message whose
     * correlation id matches a freshly generated UUID.
     *
     * <p>Two messages are sent (one without a correlation id, one with it). A browser first
     * confirms that both are on the queue, then a selecting consumer must receive the
     * correlated message.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSCorrelationID() throws Exception {
        final String correlationID = UUID.randomUUID().toString();
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);

            TextMessage plain = session.createTextMessage();
            plain.setText("text");
            producer.send(plain);

            TextMessage correlated = session.createTextMessage();
            correlated.setJMSCorrelationID(correlationID);
            correlated.setText("JMSCorrelationID");
            producer.send(correlated);

            // Browse first: both messages must be present before the selector is applied.
            QueueBrowser browser = session.createBrowser(queue);
            int browsed = 0;
            for (Enumeration<?> pending = browser.getEnumeration(); pending.hasMoreElements(); browsed++) {
                Message browsedMessage = (Message) pending.nextElement();
                Assertions.assertTrue(browsedMessage instanceof TextMessage);
            }
            Assertions.assertEquals(2, browsed);

            String selector = "JMSCorrelationID = '" + correlationID + "'";
            MessageConsumer consumer = session.createConsumer(queue, selector);
            Message received = consumer.receive(2000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals(correlationID, received.getJMSCorrelationID(), "Unexpected JMSCorrelationID value");
            Assertions.assertEquals("JMSCorrelationID", ((TextMessage) received).getText(), "Unexpected message content");
        }
    }

    /**
     * Checks that the selector {@code JMSPriority > 8} on a queue delivers the priority-9
     * message rather than the priority-5 one.
     *
     * <p>Both messages are sent persistently; a browser first confirms that two messages are
     * on the queue before the selecting consumer is created.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSPriority() throws Exception {
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);

            TextMessage lowPriority = session.createTextMessage();
            lowPriority.setText("hello");
            producer.send(lowPriority, jakarta.jms.DeliveryMode.PERSISTENT, 5, Message.DEFAULT_TIME_TO_LIVE);

            TextMessage highPriority = session.createTextMessage();
            highPriority.setText("hello + 9");
            producer.send(highPriority, jakarta.jms.DeliveryMode.PERSISTENT, 9, Message.DEFAULT_TIME_TO_LIVE);

            QueueBrowser browser = session.createBrowser(queue);
            int browsed = 0;
            for (Enumeration<?> pending = browser.getEnumeration(); pending.hasMoreElements(); browsed++) {
                Message browsedMessage = (Message) pending.nextElement();
                Assertions.assertTrue(browsedMessage instanceof TextMessage);
            }
            Assertions.assertEquals(2, browsed);

            MessageConsumer consumer = session.createConsumer(queue, "JMSPriority > 8");
            Message received = consumer.receive(2000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("hello + 9", ((TextMessage) received).getText());
        }
    }

    /**
     * Runs the {@code JMSXGroupID} selector scenario of {@code doTestSelectorsWithJMSXGroupID}
     * against a topic.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSXGroupIDOnTopic() throws Exception {
        final boolean useTopic = true;
        doTestSelectorsWithJMSXGroupID(useTopic);
    }

    /**
     * Runs the {@code JMSXGroupID} selector scenario of {@code doTestSelectorsWithJMSXGroupID}
     * against a queue.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSXGroupIDOnQueue() throws Exception {
        final boolean useTopic = false;
        doTestSelectorsWithJMSXGroupID(useTopic);
    }

    /**
     * Checks that the selector {@code JMSXGroupID = '1'} delivers the two group-1 messages in
     * send order.
     *
     * <p>Three messages are sent: group 1 (sequence 1), group 2, and group 1 (sequence -1).
     * The selecting consumer must receive {@code "group 1 - 1"} followed by
     * {@code "group 1 - 2"}.
     *
     * @param topic {@code true} to use the topic named by {@code getTopicName()}, {@code false}
     *              to use the queue named by {@code getQueueName()}
     * @throws Exception if any JMS operation fails
     */
    private void doTestSelectorsWithJMSXGroupID(boolean topic) throws Exception {
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination;
            if (topic) {
                destination = session.createTopic(getTopicName());
            } else {
                destination = session.createQueue(getQueueName());
            }

            MessageProducer producer = session.createProducer(destination);
            MessageConsumer consumer = session.createConsumer(destination, "JMSXGroupID = '1'");

            TextMessage firstOfGroupOne = session.createTextMessage();
            firstOfGroupOne.setText("group 1 - 1");
            firstOfGroupOne.setStringProperty("JMSXGroupID", "1");
            firstOfGroupOne.setIntProperty("JMSXGroupSeq", 1);
            producer.send(firstOfGroupOne);

            TextMessage groupTwo = session.createTextMessage();
            groupTwo.setText("group 2");
            groupTwo.setStringProperty("JMSXGroupID", "2");
            producer.send(groupTwo);

            TextMessage lastOfGroupOne = session.createTextMessage();
            lastOfGroupOne.setText("group 1 - 2");
            lastOfGroupOne.setStringProperty("JMSXGroupID", "1");
            lastOfGroupOne.setIntProperty("JMSXGroupSeq", -1);
            producer.send(lastOfGroupOne);

            connection.start();

            Message first = consumer.receive(2000L);
            Assertions.assertNotNull(first);
            Assertions.assertTrue(first instanceof TextMessage);
            Assertions.assertEquals("group 1 - 1", ((TextMessage) first).getText());

            Message second = consumer.receive(2000L);
            Assertions.assertNotNull(second);
            Assertions.assertTrue(second instanceof TextMessage);
            Assertions.assertEquals("group 1 - 2", ((TextMessage) second).getText());
        }
    }

    /**
     * Checks that the selector {@code JMSDeliveryMode = 'PERSISTENT'} on a queue delivers the
     * persistently sent message rather than the non-persistent one.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSDeliveryOnQueue() throws Exception {
        final String selector = "JMSDeliveryMode = 'PERSISTENT'";
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue destination = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(destination);
            MessageConsumer consumer = session.createConsumer(destination, selector);

            TextMessage nonPersistent = session.createTextMessage();
            nonPersistent.setText("non-persistent");
            producer.send(nonPersistent, jakarta.jms.DeliveryMode.NON_PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            TextMessage persistent = session.createTextMessage();
            persistent.setText("persistent");
            producer.send(persistent, jakarta.jms.DeliveryMode.PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            connection.start();

            Message received = consumer.receive(2000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals(jakarta.jms.DeliveryMode.PERSISTENT, received.getJMSDeliveryMode(),
                "Unexpected JMSDeliveryMode value");
            Assertions.assertEquals("persistent", ((TextMessage) received).getText(), "Unexpected message content");
        }
    }

    /**
     * Checks that a {@code JMSTimestamp = <value>} selector on a queue delivers the message
     * whose timestamp equals the one recorded on the second sent message.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSTimestampOnQueue() throws Exception {
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue destination = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(destination);

            TextMessage filtered = session.createTextMessage();
            filtered.setText("filtered");
            producer.send(filtered, jakarta.jms.DeliveryMode.PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            // Short pause between the sends; presumably so the two timestamps differ.
            Thread.sleep(2L);

            TextMessage expected = session.createTextMessage();
            expected.setText("expected");
            producer.send(expected, jakarta.jms.DeliveryMode.PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            String selector = "JMSTimestamp = " + expected.getJMSTimestamp();
            MessageConsumer consumer = session.createConsumer(destination, selector);
            connection.start();

            Message received = consumer.receive(2000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals(expected.getJMSTimestamp(), received.getJMSTimestamp(),
                "Unexpected JMSTimestamp value");
            Assertions.assertEquals("expected", ((TextMessage) received).getText(), "Unexpected message content");
        }
    }

    /**
     * Checks that a {@code JMSExpiration = <value>} selector on a queue delivers the message
     * that was sent with a 60 second time-to-live, not the one sent without expiry.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(30)
    public void testSelectorsWithJMSExpirationOnQueue() throws Exception {
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue destination = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(destination);

            TextMessage filtered = session.createTextMessage();
            filtered.setText("filtered");
            producer.send(filtered, jakarta.jms.DeliveryMode.PERSISTENT,
                Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            TextMessage expected = session.createTextMessage();
            expected.setText("expected");
            producer.send(expected, jakarta.jms.DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 60000L);

            String selector = "JMSExpiration = " + expected.getJMSExpiration();
            MessageConsumer consumer = session.createConsumer(destination, selector);
            connection.start();

            Message received = consumer.receive(2000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals(expected.getJMSExpiration(), received.getJMSExpiration(),
                "Unexpected JMSExpiration value");
            Assertions.assertEquals("expected", ((TextMessage) received).getText(), "Unexpected message content");
        }
    }

    /**
     * Checks that a {@code JMSMessageID = '<id>'} selector delivers exactly the message with
     * that id and nothing else.
     *
     * <p>NOTE(review): despite the {@code OnTopic} suffix in the method name, the body creates
     * a queue via {@code getQueueName()}.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(60)
    public void testJMSSelectorFiltersJMSMessageIDOnTopic() throws Exception {
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);

            TextMessage wanted = session.createTextMessage();
            producer.send(wanted);
            producer.send(session.createTextMessage());

            connection.start();

            String selector = "JMSMessageID = '" + wanted.getJMSMessageID() + "'";
            MessageConsumer messageConsumer = session.createConsumer(queue, selector);

            TextMessage received = (TextMessage) messageConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals(wanted.getJMSMessageID(), received.getJMSMessageID());
            // The second message has a different id and must not pass the selector.
            Assertions.assertNull(messageConsumer.receive(1000L));
        }
    }

    /**
     * Verifies that with the prefetch policy set to zero, two consumers on the same queue each
     * pull one of the two queued messages.
     *
     * <p>{@code consumer1} must receive {@code Msg1}, {@code consumer2} must receive
     * {@code Msg2}, and a subsequent {@code receiveNoWait()} on {@code consumer2} must return
     * {@code null}.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(60)
    public void testZeroPrefetchWithTwoConsumers() throws Exception {
        // try-with-resources so the connection is closed even when an assertion fails
        try (JmsConnection connection = (JmsConnection) createConnection()) {
            ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(getQueueName());

            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage("Msg1"));
            producer.send(session.createTextMessage("Msg2"));

            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);

            TextMessage answer = (TextMessage) consumer1.receive(5000L);
            Assertions.assertNotNull(answer);
            // Expected value first so failure output labels expected/actual correctly.
            Assertions.assertEquals("Msg1", answer.getText(), "Should have received a message!");

            answer = (TextMessage) consumer2.receive(5000L);
            Assertions.assertNotNull(answer);
            Assertions.assertEquals("Msg2", answer.getText(), "Should have received a message!");

            answer = (TextMessage) consumer2.receiveNoWait();
            Assertions.assertNull(answer, "Should have not received a message!");
        }
    }

    /**
     * Runs {@code doTestProduceAndConsumeLargeNumbersOfMessages} on a topic with a
     * client-acknowledge session.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.CLIENT_ACKNOWLEDGE);
    }

    /**
     * Runs {@code doTestProduceAndConsumeLargeNumbersOfMessages} on a queue with a
     * client-acknowledge session.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.CLIENT_ACKNOWLEDGE);
    }

    /**
     * Runs {@code doTestProduceAndConsumeLargeNumbersOfMessages} on a topic with an
     * auto-acknowledge session.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Runs {@code doTestProduceAndConsumeLargeNumbersOfMessages} on a queue with an
     * auto-acknowledge session.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Sends 1000 text messages with forced asynchronous sends and verifies that a message
     * listener on the same session receives all of them within 15 seconds.
     *
     * <p>The listener acknowledges each message and counts a latch down; a message whose
     * acknowledgement throws is logged at debug level and not counted, so the final latch
     * assertion fails in that case.
     *
     * @param topic   {@code true} to use the topic named by {@code getTopicName()},
     *                {@code false} to use the queue named by {@code getQueueName()}
     * @param ackMode acknowledgement mode passed to {@code Connection.createSession}
     * @throws Exception if any JMS operation fails or the wait is interrupted
     */
    public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception {
        final int messageCount = 1000;
        final CountDownLatch done = new CountDownLatch(messageCount);

        // try-with-resources so the connection is released even when the final assertion fails
        try (JmsConnection connection = (JmsConnection) createConnection()) {
            connection.setForceAsyncSend(true);
            connection.start();

            Session session = connection.createSession(false, ackMode);
            Destination destination = topic
                ? session.createTopic(getTopicName())
                : session.createQueue(getQueueName());

            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    message.acknowledge();
                    done.countDown();
                } catch (JMSException ex) {
                    logger.debug("Caught exception.", ex);
                }
            });

            MessageProducer producer = session.createProducer(destination);
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("messageText");
            for (int i = 0; i < messageCount; i++) {
                producer.send(textMessage);
            }

            Assertions.assertTrue(done.await(15L, TimeUnit.SECONDS),
                "Did not receive all messages: " + messageCount);
        }
    }

    /**
     * Verifies that messages are still on the queue after a consumer is opened and closed
     * without ever calling {@code receive}.
     *
     * <p>Ten messages are sent on one connection, which is then closed. A second connection
     * creates a consumer, waits briefly, and closes it again; the queue must still report ten
     * messages afterwards.
     *
     * @throws Exception if any JMS operation fails or the sleep is interrupted
     */
    @Test
    @Timeout(60)
    public void testPrefetchedMessagesAreNotConsumedOnConsumerClose() throws Exception {
        final int messageCount = 10;
        final jakarta.jms.Queue queue;

        // Each connection gets its own try-with-resources block: a resource variable is
        // implicitly final and therefore cannot be re-pointed at a second connection.
        try (Connection producerConnection = createConnection()) {
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);
            for (int i = 0; i < messageCount; i++) {
                TextMessage message = session.createTextMessage();
                message.setText("msg:" + i);
                producer.send(message);
            }
        }

        Queue queueView = getProxyToQueue(getQueueName());
        Wait.assertEquals((long) messageCount, () -> queueView.getMessageCount());

        try (Connection consumerConnection = createConnection()) {
            Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);
            // Presumably gives the client time to prefetch before the consumer is closed.
            Thread.sleep(100L);
            consumer.close();
        }

        Wait.assertEquals((long) messageCount, () -> queueView.getMessageCount());
    }

    /**
     * Sends 50000 non-persistent bytes messages while a second thread consumes them on its own
     * connection, then verifies that the queue ends up empty.
     *
     * <p>Any failure on the consumer thread (a JMS error or a receive that times out) is
     * recorded and rethrown from the test thread after the consumer thread has finished.
     * Throughput is logged at debug level.
     *
     * @throws Throwable the first failure recorded by the consumer thread, or any failure on
     *                   the producing side
     */
    @Test
    @Timeout(60)
    public void testMessagesReceivedInParallel() throws Throwable {
        final int numMessages = 50000;
        final long time = System.currentTimeMillis();
        // Written only by the consumer thread and read only after join(), so no extra
        // synchronization is needed.
        final ArrayList<Throwable> exceptions = new ArrayList<>();

        Thread consumerThread = new Thread(() -> {
            Connection connectionConsumer = null;
            try {
                connectionConsumer = createConnection();
                connectionConsumer.start();
                Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
                jakarta.jms.Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
                MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);

                long attempts = 0L;
                for (int count = numMessages; count > 0; count--) {
                    if (++attempts % 1000L == 0L) {
                        logger.debug("received {} messages", attempts);
                    }
                    Message m = consumer.receive(5000L);
                    Assertions.assertNotNull(m, "Could not receive message count=" + count + " on consumer");
                }
            } catch (Throwable e) {
                // Includes JMSException from receive(): record it so the test fails with the
                // real cause instead of a later, unrelated message-count mismatch.
                exceptions.add(e);
                logger.warn("Consumer thread failed", e);
            } finally {
                if (connectionConsumer != null) {
                    try {
                        connectionConsumer.close();
                    } catch (JMSException | RuntimeException ignored) {
                        // Best-effort cleanup: a close failure must not mask the receive outcome.
                        logger.debug("Ignoring failure while closing the consumer connection", ignored);
                    }
                }
            }
        });

        final Queue queueView;
        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(getQueueName());

            consumerThread.start();

            MessageProducer p = session.createProducer(queue);
            p.setDeliveryMode(jakarta.jms.DeliveryMode.NON_PERSISTENT);
            for (int i = 0; i < numMessages; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeUTF("Hello world!!!!" + i);
                message.setIntProperty("count", i);
                p.send(message);
            }

            consumerThread.join();
            if (!exceptions.isEmpty()) {
                throw exceptions.get(0);
            }
            queueView = getProxyToQueue(getQueueName());
        }

        Wait.assertEquals(0L, () -> queueView.getMessageCount());

        long taken = System.currentTimeMillis() - time;
        logger.debug("Microbenchamrk ran in {} milliseconds, sending/receiving {}", taken, numMessages);
        double messagesPerSecond = (double) numMessages / (double) taken * 1000.0;
        logger.debug("{} messages per second", (int) messagesPerSecond);
    }

    /**
     * Verifies that messages consumed and individually acknowledged on a client-acknowledge
     * session are removed from the queue.
     *
     * <p>Ten messages {@code msg:0} .. {@code msg:9} are sent on one connection, which is then
     * closed. A second connection receives them in order, acknowledging each one; afterwards
     * the queue must report zero messages.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test
    @Timeout(60)
    public void testClientAckMessages() throws Exception {
        final int numMessages = 10;
        final jakarta.jms.Queue queue;

        // Each connection gets its own try-with-resources block: a resource variable is
        // implicitly final and therefore cannot be re-pointed at a second connection.
        try (Connection producerConnection = createConnection()) {
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);
            for (int i = 0; i < numMessages; i++) {
                TextMessage message = session.createTextMessage();
                message.setText("msg:" + i);
                producer.send(message);
            }
        }

        Queue queueView = getProxyToQueue(getQueueName());
        Wait.assertEquals((long) numMessages, () -> queueView.getMessageCount());

        try (Connection consumerConnection = createConnection()) {
            Session session = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);
            for (int i = 0; i < numMessages; i++) {
                Message msg = consumer.receive(5000L);
                Assertions.assertNotNull(msg, "" + i);
                Assertions.assertTrue(msg instanceof TextMessage, "" + msg);
                // Expected value first so failure output labels expected/actual correctly.
                Assertions.assertEquals("msg:" + i, ((TextMessage) msg).getText());
                msg.acknowledge();
            }
            consumer.close();
        }

        Wait.assertEquals(0L, () -> queueView.getMessageCount());
    }

    /**
     * Sends 40 messages to {@code exampleQueue1}, consumes each one through its own
     * short-lived session, and finally checks that a fresh consumer finds nothing left.
     *
     * <p>Queue auto-deletion is switched off for all addresses ({@code "#"}) before the
     * connection is created.
     *
     * @throws Throwable if any JMS operation or assertion fails
     */
    @Test
    @Timeout(30)
    public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
        final String name = "exampleQueue1";
        AddressSettings keepQueues = new AddressSettings().setAutoDeleteQueues(false);
        server.getAddressSettingsRepository().addMatch("#", keepQueues);
        final int numMessages = 40;

        try (Connection connection = createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(name);
            MessageProducer producer = session.createProducer(queue);
            for (int sent = 0; sent < numMessages; sent++) {
                TextMessage message = session.createTextMessage();
                message.setText("Message temporary");
                producer.send(message);
            }
            producer.close();
            session.close();

            // One session and one consumer per message.
            for (int consumed = 0; consumed < numMessages; consumed++) {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                queue = session.createQueue(name);
                MessageConsumer oneShot = session.createConsumer(queue);
                Assertions.assertNotNull(oneShot.receive(1000L));
                session.close();
            }

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = session.createQueue(name);
            MessageConsumer leftovers = session.createConsumer(queue);
            for (int probe = 0; probe < numMessages; probe++) {
                Assertions.assertNull(leftovers.receive(1L));
            }
            producer.close();
            session.close();
        }
    }

    /**
     * Opens 20 connections concurrently, each creating a shared consumer named {@code MY_SUB}
     * on the topic {@code topics.foo}, and verifies that none of them fails.
     *
     * <p>Each task keeps its connection open for 100 ms before closing it. The test fails if
     * any task throws or if the tasks have not all finished within 30 seconds.
     *
     * @throws Exception if waiting for the tasks is interrupted
     */
    @Test
    public void testConcurrentSharedConsumerConnections() throws Exception {
        final int concurrentConnections = 20;
        final ExecutorService executorService = Executors.newFixedThreadPool(concurrentConnections);
        final AtomicBoolean failedToSubscribe = new AtomicBoolean(false);

        try {
            // Start at 0 so that all concurrentConnections tasks are submitted (the loop
            // previously started at 1 and created only 19).
            for (int i = 0; i < concurrentConnections; i++) {
                executorService.submit(() -> {
                    try (Connection connection = createConnection()) {
                        connection.start();
                        Session session = connection.createSession();
                        Topic topic = session.createTopic("topics.foo");
                        session.createSharedConsumer(topic, "MY_SUB");
                        Thread.sleep(100L);
                    } catch (Exception ex) {
                        logger.warn("Failed to create shared consumer", ex);
                        failedToSubscribe.set(true);
                    }
                });
            }

            executorService.shutdown();
            // A timeout here means tasks are still running, so their outcome is unknown.
            Assertions.assertTrue(executorService.awaitTermination(30L, TimeUnit.SECONDS),
                "Shared consumer tasks did not finish within 30 seconds");
        } finally {
            // No-op after a clean shutdown; stops leftover tasks when the test is failing.
            executorService.shutdownNow();
        }

        Assertions.assertFalse(failedToSubscribe.get());
    }

    /**
     * Runs the broker-restart scenario of {@code testBrokerRestart} with two connections
     * obtained from {@code createFailoverConnection()}, the first used for producing and the
     * second for consuming.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(30)
    public void testBrokerRestartAMQPProducerAMQPConsumer() throws Exception {
        final Connection producerSide = createFailoverConnection();
        final Connection consumerSide = createFailoverConnection();
        testBrokerRestart(producerSide, consumerSide);
    }

    /**
     * Verifies that a producer and a consumer on separate connections keep working across a
     * broker stop/start cycle.
     *
     * <p>A persistent text message is sent on {@code connection1} and received on
     * {@code connection2}; the server is then stopped and started again, and the same
     * send/receive round trip is repeated on the existing producer and consumer. Both
     * connections are closed when the method returns, whether or not it succeeded.
     *
     * @param connection1 connection used for the producing session; closed by this method
     * @param connection2 connection used for the consuming session; closed by this method
     * @throws Exception if any JMS or server operation fails
     */
    private void testBrokerRestart(Connection connection1, Connection connection2) throws Exception {
        try {
            Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue1 = session1.createQueue(getQueueName());
            jakarta.jms.Queue queue2 = session2.createQueue(getQueueName());

            MessageConsumer consumer2 = session2.createConsumer(queue2);
            MessageProducer producer = session1.createProducer(queue1);
            producer.setDeliveryMode(jakarta.jms.DeliveryMode.PERSISTENT);
            connection1.start();

            TextMessage message = session1.createTextMessage();
            message.setText("hello");
            producer.send(message);

            Message received = consumer2.receive(100L);
            Assertions.assertNotNull(received, "Should have received a message by now.");
            Assertions.assertTrue(received instanceof TextMessage, "Should be an instance of TextMessage");
            Assertions.assertEquals(jakarta.jms.DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());

            server.stop();
            Wait.waitFor(() -> !server.isStarted(), 1000L);
            server.start();

            TextMessage message2 = session1.createTextMessage();
            message2.setText("hello");
            producer.send(message2);

            Message received2 = consumer2.receive(100L);
            Assertions.assertNotNull(received2, "Should have received a message by now.");
            Assertions.assertTrue(received2 instanceof TextMessage, "Should be an instance of TextMessage");
            Assertions.assertEquals(jakarta.jms.DeliveryMode.PERSISTENT, received2.getJMSDeliveryMode());
        } finally {
            // Nested finally: connection2 is closed even when closing connection1 throws.
            try {
                connection1.close();
            } finally {
                connection2.close();
            }
        }
    }
}

