/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueResendDuringShutdownTest {
    private static final Logger LOG = LoggerFactory.getLogger(QueueResendDuringShutdownTest.class);
    public static final int NUM_CONNECTION_TO_TEST = 8;
    private static boolean iterationFoundFailure = false;
    private BrokerService broker;
    private ActiveMQConnectionFactory factory;
    private Connection[] connections;
    private Connection producerConnection;
    private Queue queue;
    private Object messageReceiveSync = new Object();
    private int receiveCount;

    @Before
    public void setUp() throws Exception {
        this.receiveCount = 0;
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.factory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        this.queue = new ActiveMQQueue("TESTQUEUE");
        this.connections = new Connection[8];
        for (int iter = 0; iter < 8; ++iter) {
            this.connections[iter] = this.factory.createConnection();
        }
        this.producerConnection = this.factory.createConnection();
        this.producerConnection.start();
    }

    @After
    public void cleanup() throws Exception {
        for (Connection oneConnection : this.connections) {
            if (oneConnection == null) continue;
            this.closeConnection(oneConnection);
        }
        this.connections = null;
        if (this.producerConnection != null) {
            this.closeConnection(this.producerConnection);
            this.producerConnection = null;
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test(timeout=3000L)
    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter1() throws Throwable {
        this.runTestIteration();
    }

    @Test(timeout=3000L)
    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter2() throws Throwable {
        this.runTestIteration();
    }

    @Test(timeout=3000L)
    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter3() throws Throwable {
        this.runTestIteration();
    }

    protected void runTestIteration() throws Throwable {
        if (iterationFoundFailure) {
            LOG.info("skipping test iteration; failure previously detected");
            return;
        }
        try {
            this.testRedeliverAtBrokerShutdownAutoAckMsgListener();
        }
        catch (Throwable thrown) {
            iterationFoundFailure = true;
            throw thrown;
        }
    }

    protected void testRedeliverAtBrokerShutdownAutoAckMsgListener() throws Exception {
        for (Connection oneConnection : this.connections) {
            MessageConsumer consumer = this.startupConsumer(oneConnection, false, 1);
            this.configureMessageListener(consumer);
            oneConnection.start();
        }
        this.sendMessage();
        this.waitForMessage(1000L);
        Assert.assertEquals((long)1L, (long)this.receiveCount);
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.delay(100L, "give queue time flush");
        Assert.assertEquals((long)1L, (long)this.receiveCount);
    }

    protected MessageConsumer startupConsumer(Connection conn, boolean transInd, int ackMode) throws JMSException {
        Session sess = conn.createSession(transInd, ackMode);
        MessageConsumer consumer = sess.createConsumer((Destination)this.queue);
        return consumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void messageReceived() {
        QueueResendDuringShutdownTest queueResendDuringShutdownTest = this;
        synchronized (queueResendDuringShutdownTest) {
            ++this.receiveCount;
            Object object = this.messageReceiveSync;
            synchronized (object) {
                this.messageReceiveSync.notifyAll();
            }
        }
    }

    protected void configureMessageListener(MessageConsumer consumer) throws JMSException {
        final MessageConsumer fConsumer = consumer;
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message msg) {
                LOG.debug("got a message on consumer {}", (Object)fConsumer);
                QueueResendDuringShutdownTest.this.messageReceived();
                QueueResendDuringShutdownTest.this.delay(3000L, "pause so connection shutdown leads to unacked message redelivery");
            }
        });
    }

    protected void sendMessage() throws JMSException {
        Session sess = this.producerConnection.createSession(false, 1);
        MessageProducer prod = sess.createProducer((Destination)this.queue);
        prod.send((Message)sess.createTextMessage("X-TEST-MSG-X"));
        prod.close();
        sess.close();
    }

    protected void closeConnection(Connection conn) {
        try {
            conn.close();
        }
        catch (JMSException jmsExc) {
            LOG.info("failed to cleanup connection", (Throwable)jmsExc);
        }
    }

    protected void delay(long delayMs, String desc) {
        try {
            Thread.sleep(delayMs);
        }
        catch (InterruptedException intExc) {
            LOG.warn("sleep interrupted: " + desc, (Throwable)intExc);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForMessage(long delayMs) {
        try {
            Object object = this.messageReceiveSync;
            synchronized (object) {
                if (this.receiveCount == 0) {
                    this.messageReceiveSync.wait(delayMs);
                }
            }
        }
        catch (InterruptedException intExc) {
            LOG.warn("sleep interrupted: wait for message to arrive");
        }
    }
}

