/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a queue redelivers messages in their original order to a
 * transacted consumer after the consuming session is rolled back, including
 * when consumers are closed and re-created within or across transactions.
 *
 * <p>Each test runs against an embedded broker configured in
 * {@link #startBroker()} with queue prefetch 0 and strict-order dispatch.
 * Published messages carry an int property {@code "Order"} that the consuming
 * side asserts against.
 */
public class QueueOrderSingleTransactedConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(QueueOrderSingleTransactedConsumerTest.class);

    /** Name of the int message property that carries the publish sequence number. */
    private static final String ORDER_PROPERTY = "Order";

    /** How long a single {@code receive} call waits for a message. */
    private static final long RECEIVE_TIMEOUT_MS = 4000L;

    /**
     * Upper bound on back-to-back empty receives before the test fails. An empty
     * receive is still retried (as before), but no longer retried forever.
     */
    private static final int MAX_CONSECUTIVE_EMPTY_RECEIVES = 10;

    BrokerService broker = null;
    ActiveMQQueue dest = new ActiveMQQueue("Queue");

    /**
     * Publishes 35 ordered messages to the test queue, interleaved with sends to
     * an unrelated queue, then consumes 20, 10 and 5 messages on fresh
     * connections, rolling back each time. Every pass must start again at
     * order 0.
     */
    @Test
    public void testSingleConsumerTxRepeat() throws Exception {
        ActiveMQQueue dummyDest = new ActiveMQQueue("AnotherQueue");
        this.publishMessagesWithOrderProperty(10, 0, this.dest);
        this.publishMessagesWithOrderProperty(1, 0, dummyDest);
        this.publishMessagesWithOrderProperty(10, 10, this.dest);
        this.publishMessagesWithOrderProperty(1, 0, dummyDest);
        this.publishMessagesWithOrderProperty(10, 20, this.dest);
        this.publishMessagesWithOrderProperty(1, 0, dummyDest);
        this.publishMessagesWithOrderProperty(5, 30, this.dest);
        this.consumeVerifyOrderRollback(20);
        this.consumeVerifyOrderRollback(10);
        this.consumeVerifyOrderRollback(5);
    }

    /**
     * Uses one transacted session with a new consumer per pass: consume 20 and
     * roll back, consume 10 and roll back, consume 5 and commit. The consumer is
     * closed before each rollback/commit.
     */
    @Test
    public void testSingleSessionXConsumerTxRepeat() throws Exception {
        this.publishMessagesWithOrderProperty(50);
        Connection connection = this.createStartedConnection();
        try {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer messageConsumer = this.consumeVerifyOrder(session, 20);
            messageConsumer.close();
            session.rollback();
            messageConsumer = this.consumeVerifyOrder(session, 10);
            messageConsumer.close();
            session.rollback();
            messageConsumer = this.consumeVerifyOrder(session, 5);
            messageConsumer.close();
            session.commit();
        } finally {
            // Close even when an assertion fails so the broker can shut down cleanly.
            connection.close();
        }
    }

    /**
     * Consumes orders 0-5 and 6-9 with two consecutive consumers in the same
     * transaction, rolls back while the second consumer is still open, then
     * expects a third consumer to see all 10 messages from order 0.
     */
    @Test
    public void tesXConsumerTxRepeat() throws Exception {
        this.publishMessagesWithOrderProperty(10);
        Connection connection = this.createStartedConnection();
        try {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer messageConsumer = this.consumeVerifyOrder(session, 6);
            messageConsumer.close();
            messageConsumer = this.consumeVerifyOrder(session, 4, 6);
            // Rollback happens BEFORE the second consumer is closed in this variant.
            session.rollback();
            messageConsumer.close();
            messageConsumer = this.consumeVerifyOrder(session, 10);
            session.commit();
            messageConsumer.close();
        } finally {
            connection.close();
        }
    }

    /**
     * Same as {@link #tesXConsumerTxRepeat()} except that the second consumer is
     * closed before the rollback rather than after it.
     */
    @Test
    public void testSingleTxXConsumerTxRepeat() throws Exception {
        this.publishMessagesWithOrderProperty(10);
        Connection connection = this.createStartedConnection();
        try {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer messageConsumer = this.consumeVerifyOrder(session, 6);
            messageConsumer.close();
            messageConsumer = this.consumeVerifyOrder(session, 4, 6);
            // Rollback happens AFTER the second consumer is closed in this variant.
            messageConsumer.close();
            session.rollback();
            messageConsumer = this.consumeVerifyOrder(session, 10);
            session.commit();
            messageConsumer.close();
        } finally {
            connection.close();
        }
    }

    /**
     * Opens a fresh connection and transacted session, consumes {@code num}
     * messages verifying they are ordered from 0, then rolls the session back.
     *
     * @param num number of messages to consume before rolling back
     */
    private void consumeVerifyOrderRollback(int num) throws Exception {
        Connection connection = this.createStartedConnection();
        try {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer messageConsumer = this.consumeVerifyOrder(session, num);
            messageConsumer.close();
            session.rollback();
        } finally {
            connection.close();
        }
    }

    /**
     * Consumes {@code num} messages and asserts their order starts at 0.
     *
     * @see #consumeVerifyOrder(Session, int, int)
     */
    private MessageConsumer consumeVerifyOrder(Session session, int num) throws Exception {
        return this.consumeVerifyOrder(session, num, 0);
    }

    /**
     * Creates a consumer on the test queue and receives {@code num} messages,
     * asserting that the i-th message carries order {@code i + base}.
     *
     * <p>An empty receive is retried, but only up to
     * {@link #MAX_CONSECUTIVE_EMPTY_RECEIVES} times in a row; after that the
     * test fails instead of looping forever.
     *
     * @param session session used to create the consumer
     * @param num     number of messages that must be received
     * @param base    expected order value of the first received message
     * @return the still-open consumer; the caller decides when to close it
     */
    private MessageConsumer consumeVerifyOrder(Session session, int num, int base) throws Exception {
        MessageConsumer messageConsumer = session.createConsumer(this.dest);
        int received = 0;
        int emptyReceives = 0;
        while (received < num) {
            Message message = messageConsumer.receive(RECEIVE_TIMEOUT_MS);
            if (message == null) {
                ++emptyReceives;
                if (emptyReceives >= MAX_CONSECUTIVE_EMPTY_RECEIVES) {
                    Assert.fail("No message received after " + emptyReceives + " attempts of "
                            + RECEIVE_TIMEOUT_MS + "ms; got " + received + " of " + num
                            + " expected messages");
                }
                continue;
            }
            emptyReceives = 0;
            int order = message.getIntProperty(ORDER_PROPERTY);
            Assert.assertEquals("Unexpected message order", (long) (received + base), (long) order);
            ++received;
            LOG.debug("Received:{}, Order: {}", message.getJMSMessageID(), order);
        }
        return messageConsumer;
    }

    /**
     * Publishes {@code num} messages to the test queue with order values
     * {@code 0 .. num-1}.
     */
    private void publishMessagesWithOrderProperty(int num) throws Exception {
        this.publishMessagesWithOrderProperty(num, 0, this.dest);
    }

    /**
     * Publishes {@code num} text messages to {@code destination} from a
     * non-transacted, auto-acknowledge session, stamping each with order values
     * {@code seqStart .. seqStart + num - 1}. The connection is closed when done.
     *
     * @param num         number of messages to send
     * @param seqStart    order value of the first message
     * @param destination queue to publish to
     */
    private void publishMessagesWithOrderProperty(int num, int seqStart, ActiveMQQueue destination) throws Exception {
        Connection connection = this.createStartedConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(destination);
            // The same message object is re-sent with an updated property on each iteration.
            TextMessage textMessage = session.createTextMessage("A");
            for (int i = 0; i < num; ++i) {
                textMessage.setIntProperty(ORDER_PROPERTY, i + seqStart);
                messageProducer.send(textMessage);
            }
        } finally {
            // Previously this connection was never closed and leaked on every publish call.
            connection.close();
        }
    }

    /**
     * Starts an embedded broker that deletes all messages on startup and applies
     * one policy entry to the {@code ">"} queue pattern: expire-messages period
     * 0, queue prefetch 0 and strict-order dispatch enabled. The broker listens
     * on {@code tcp://0.0.0.0:0}.
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry pe = new PolicyEntry();
        pe.setExpireMessagesPeriod(0L);
        pe.setQueuePrefetch(0);
        pe.setStrictOrderDispatch(true);
        pe.setQueue(">");
        entries.add(pe);
        policyMap.setPolicyEntries(entries);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
    }

    /** Stops the embedded broker if one was started. */
    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /**
     * Builds a connection factory that targets the broker's TCP transport
     * connector, using the connect string the connector publishes.
     */
    private ActiveMQConnectionFactory getConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
    }

    /** Creates a new connection to the embedded broker and starts it. */
    private Connection createStartedConnection() throws Exception {
        Connection connection = this.getConnectionFactory().createConnection();
        try {
            connection.start();
        } catch (Exception e) {
            // Don't leak the connection when start() itself fails.
            connection.close();
            throw e;
        }
        return connection;
    }
}

