/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PriorityRedeliveryOrderTest {
    private static final Logger LOG = LoggerFactory.getLogger(PriorityRedeliveryOrderTest.class);
    private static final String DESTINATION = "testQ1";
    private static final int MESSAGES_TO_SEND = 1000;
    private static final int MESSAGES_PER_CONSUMER = 200;
    private int consumedAppId = -1;
    private int totalConsumed;
    BrokerService broker;

    @Before
    public void createBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry pe = new PolicyEntry();
        pe.setPrioritizedMessages(true);
        pe.setQueue(">");
        entries.add(pe);
        policyMap.setPolicyEntries(entries);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test
    public void testMessageDeliveryOrderAfterPrefetch() throws Exception {
        this.sendMessages(1000);
        for (int i = 0; i < 5; ++i) {
            this.totalConsumed += this.consumeMessages(200);
        }
        Assert.assertEquals((String)"number of messages consumed should be equal to number of messages sent", (long)1000L, (long)this.totalConsumed);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Long sendMessages(int messageCount) throws Exception {
        long numberOfMessageSent = 0L;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        connection.start();
        try (Connection connection = connectionFactory.createConnection();){
            Session producerSession = connection.createSession(true, 0);
            MessageProducer jmsProducer = producerSession.createProducer((Destination)producerSession.createQueue(DESTINATION));
            TextMessage sendMessage = producerSession.createTextMessage("test_message");
            for (int i = 0; i < messageCount; ++i) {
                sendMessage.setIntProperty("appID", i);
                jmsProducer.send((Message)sendMessage);
                producerSession.commit();
                ++numberOfMessageSent;
            }
            LOG.info(" Finished after producing : " + numberOfMessageSent);
            Long l = numberOfMessageSent;
            return l;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int consumeMessages(int numberOfMessage) throws Exception {
        LOG.info("Creating new consumer for:" + numberOfMessage);
        int numberConsumedMessage = 0;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, 0);
            MessageConsumer jmsConsumer = session.createConsumer((Destination)session.createQueue(DESTINATION));
            boolean consume = true;
            while (true) {
                if (!consume) break;
                Message message = jmsConsumer.receive(4000L);
                if (message == null) {
                    LOG.info("Break on:" + numberConsumedMessage);
                    break;
                }
                int newAppId = message.getIntProperty("appID");
                ++numberConsumedMessage;
                LOG.debug("Message newAppID" + newAppId);
                if (newAppId != this.consumedAppId + 1) {
                    Assert.fail((String)(" newAppId is " + newAppId + " expected " + (this.consumedAppId + 1)));
                }
                this.consumedAppId = newAppId;
                session.commit();
                if (numberConsumedMessage != numberOfMessage) continue;
                LOG.info("closing consumer after 200 message, consumedAppID is " + this.consumedAppId);
                int n = numberConsumedMessage;
                return n;
            }
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (Exception exception) {}
            }
        }
        return numberConsumedMessage;
    }
}

