/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ1936Test
extends TestCase {
    private static final Logger logger = LoggerFactory.getLogger(AMQ1936Test.class);
    private static final String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
    private static final long TEST_MESSAGE_COUNT = 6000L;
    private static final int CONSUMER_COUNT = 2;
    private static final boolean TRANSACTED_RECEIVE = true;
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[2];
    private BrokerService broker = null;
    static QueueConnectionFactory connectionFactory = null;

    protected void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        this.broker.getSystemUsage().getMemoryUsage().setLimit(0x500000L);
        this.broker.setBrokerName("test");
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.start();
        connectionFactory = new ActiveMQConnectionFactory("vm://test");
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.threadPool != null) {
            for (ThreadedMessageReceiver receiver : this.receivers) {
                receiver.setShouldStop(true);
            }
            logger.info("Waiting for receivers to shutdown..");
            if (!this.threadPool.awaitTermination(10L, TimeUnit.SECONDS)) {
                logger.warn("Not all receivers completed shutdown.");
            } else {
                logger.info("All receivers shutdown successfully..");
            }
        }
        logger.debug("Stoping the broker.");
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendTextMessage(String queueName, int i) throws JMSException, NamingException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
        QueueConnection queueConnection = null;
        QueueSession session = null;
        QueueSender sender = null;
        Queue queue = null;
        TextMessage message = null;
        try {
            queueConnection = connectionFactory.createQueueConnection();
            session = queueConnection.createQueueSession(false, 1);
            queue = session.createQueue(TEST_QUEUE_NAME);
            sender = session.createSender(queue);
            sender.setDeliveryMode(2);
            message = session.createTextMessage(String.valueOf(i));
            sender.send((Message)message);
            if (session.getTransacted()) {
                session.commit();
            }
            if (i % 1000 == 0) {
                logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:" + message.getText());
            }
        }
        finally {
            if (sender != null) {
                sender.close();
            }
            if (session != null) {
                session.close();
            }
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }

    public void testForDuplicateMessages() throws Exception {
        final ConcurrentHashMap messages = new ConcurrentHashMap();
        final Object lock = new Object();
        final CountDownLatch duplicateSignal = new CountDownLatch(1);
        final AtomicInteger messageCount = new AtomicInteger(0);
        int i = 0;
        while ((long)i < 3000L) {
            if (duplicateSignal.getCount() == 0L) {
                AMQ1936Test.fail((String)"Duplicate message id detected");
            }
            this.sendTextMessage(TEST_QUEUE_NAME, i);
            ++i;
        }
        for (i = 0; i < 2; ++i) {
            this.receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onMessage(Message message) throws Exception {
                    Object object = lock;
                    synchronized (object) {
                        int current = messageCount.incrementAndGet();
                        if (current % 1000 == 0) {
                            logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage)message).getText());
                        }
                        if (messages.containsKey(message.getJMSMessageID())) {
                            duplicateSignal.countDown();
                            logger.error("duplicate message id detected:" + message.getJMSMessageID());
                            TestCase.fail((String)("Duplicate message id detected:" + message.getJMSMessageID()));
                        } else {
                            messages.put(message.getJMSMessageID(), message.getJMSMessageID());
                        }
                    }
                }
            });
            this.threadPool.submit(this.receivers[i]);
        }
        i = 0;
        while ((long)i < 3000L) {
            if (duplicateSignal.getCount() == 0L) {
                AMQ1936Test.fail((String)"Duplicate message id detected");
            }
            this.sendTextMessage(TEST_QUEUE_NAME, i);
            ++i;
        }
        logger.info("sent all 6000 messages");
        boolean ok = Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 6000L == (long)messages.size();
            }
        }, (long)TimeUnit.MINUTES.toMillis(7L));
        if (!ok) {
            AutoFailTestSupport.dumpAllThreads((String)"--STUCK?--");
        }
        AMQ1936Test.assertEquals((String)"Number of messages received does not match the number sent", (long)6000L, (long)messages.size());
        AMQ1936Test.assertEquals((long)6000L, (long)messageCount.get());
    }

    public static interface IMessageHandler {
        public void onMessage(Message var1) throws Exception;
    }

    private static final class ThreadedMessageReceiver
    implements Runnable {
        private IMessageHandler handler = null;
        private final AtomicBoolean shouldStop = new AtomicBoolean(false);

        public ThreadedMessageReceiver(String queueName, IMessageHandler handler) {
            this.handler = handler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            QueueConnection queueConnection = null;
            QueueSession session = null;
            QueueReceiver receiver = null;
            Queue queue = null;
            Message message = null;
            try {
                try {
                    queueConnection = connectionFactory.createQueueConnection();
                    session = queueConnection.createQueueSession(true, 1);
                    queue = session.createQueue(AMQ1936Test.TEST_QUEUE_NAME);
                    receiver = session.createReceiver(queue);
                    queueConnection.start();
                    logger.info("Receiver " + Thread.currentThread().getName() + " connected.");
                    while (!this.shouldStop.get() && !Thread.currentThread().isInterrupted()) {
                        block22: {
                            try {
                                message = receiver.receive(200L);
                            }
                            catch (Exception e) {
                                if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) break block22;
                                throw e;
                            }
                        }
                        if (message != null && this.handler != null) {
                            this.handler.onMessage(message);
                        }
                        if (!session.getTransacted()) continue;
                        session.commit();
                    }
                    logger.info("Receiver " + Thread.currentThread().getName() + " shutting down.");
                }
                finally {
                    if (receiver != null) {
                        try {
                            receiver.close();
                        }
                        catch (JMSException e) {
                            logger.warn(e.getMessage(), (Throwable)e);
                        }
                    }
                    if (session != null) {
                        try {
                            session.close();
                        }
                        catch (JMSException e) {
                            logger.warn(e.getMessage(), (Throwable)e);
                        }
                    }
                    if (queueConnection != null) {
                        queueConnection.close();
                    }
                }
            }
            catch (JMSException e) {
                logger.error(e.getMessage(), (Throwable)e);
                e.printStackTrace();
            }
            catch (NamingException e) {
                logger.error(e.getMessage(), (Throwable)e);
            }
            catch (Exception e) {
                logger.error(e.getMessage(), (Throwable)e);
                e.printStackTrace();
            }
        }

        public void setShouldStop(Boolean shouldStop) {
            this.shouldStop.set(shouldStop);
        }
    }
}

