/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubscriberNonPersistentMessageTest
extends TestCase {
    private final Logger LOG = LoggerFactory.getLogger(DurableSubscriberNonPersistentMessageTest.class);
    private String brokerURL;
    private String consumerBrokerURL;
    int initialMaxMsgs = 10;
    int cleanupMsgCount = 10;
    int totalMsgCount = this.initialMaxMsgs + this.cleanupMsgCount;
    int totalMsgReceived = 0;
    int sleep = 500;
    int reconnectSleep = 2000;
    int messageTimeout = 1000;
    int messageSize = 1024;
    long ttl = 0L;
    static String clientId = "Jason";
    MBeanServer mbeanServer;
    BrokerService broker;

    protected void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        TransportConnector transportConnector = this.broker.addConnector("tcp://localhost:0");
        KahaDBStore store = new KahaDBStore();
        store.setDirectory(new File("data"));
        this.broker.setPersistenceAdapter((PersistenceAdapter)store);
        this.broker.start();
        this.brokerURL = "failover:(" + transportConnector.getPublishableConnectString() + ")";
        this.consumerBrokerURL = this.brokerURL + "?jms.prefetchPolicy.all=100";
        this.mbeanServer = ManagementFactory.getPlatformMBeanServer();
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
        super.tearDown();
    }

    public DurableSubscriberNonPersistentMessageTest(String testName) {
        super(testName);
    }

    public static Test suite() {
        return new TestSuite(DurableSubscriberNonPersistentMessageTest.class);
    }

    public void testDurableSubscriberNonPersistentMessage() {
        String interest = "TEST";
        this.LOG.info("Starting DurableSubscriberNonPersistentMessageTest");
        try {
            this.createConsumer(interest, 0);
            Thread.sleep(1000L);
            Producer producer = new Producer(this.brokerURL, interest, this.messageSize, this.ttl);
            producer.sendMessages(this.totalMsgCount);
            producer.close();
            this.LOG.info(this.totalMsgCount + " messages sent");
            this.createConsumer(interest, this.initialMaxMsgs);
            Thread.sleep(this.reconnectSleep);
            this.createConsumer(interest, this.cleanupMsgCount);
            String brokerVersion = (String)this.mbeanServer.getAttribute(new ObjectName("org.apache.activemq:brokerName=localhost,type=Broker"), "BrokerVersion");
            this.LOG.info("Test run on: " + brokerVersion);
            String theJmxObject = "org.apache.activemq:type=Broker,brokerName=localhost,endpoint=Consumer,destinationType=Topic,destinationName=TEST,clientId=Jason,consumerId=Durable(Jason_MyDurableTopic)";
            DurableSubscriberNonPersistentMessageTest.assertTrue((String)"pendingQueueSize should be zero", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    Integer pendingQueueSize = (Integer)DurableSubscriberNonPersistentMessageTest.this.mbeanServer.getAttribute(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,endpoint=Consumer,destinationType=Topic,destinationName=TEST,clientId=Jason,consumerId=Durable(Jason_MyDurableTopic)"), "PendingQueueSize");
                    DurableSubscriberNonPersistentMessageTest.this.LOG.info("pendingQueueSize = " + pendingQueueSize);
                    return pendingQueueSize == 0;
                }
            }));
            DurableSubscriberNonPersistentMessageTest.assertTrue((String)"cursorMemoryUsage should be zero", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    Long cursorMemoryUsage = (Long)DurableSubscriberNonPersistentMessageTest.this.mbeanServer.getAttribute(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,endpoint=Consumer,destinationType=Topic,destinationName=TEST,clientId=Jason,consumerId=Durable(Jason_MyDurableTopic)"), "CursorMemoryUsage");
                    DurableSubscriberNonPersistentMessageTest.this.LOG.info("cursorMemoryUsage = " + cursorMemoryUsage);
                    return cursorMemoryUsage == 0L;
                }
            }));
            DurableSubscriberNonPersistentMessageTest.assertTrue((this.totalMsgReceived == this.initialMaxMsgs + this.cleanupMsgCount ? 1 : 0) != 0);
        }
        catch (Exception e) {
            this.LOG.error("Exception Executing DurableSubscriberNonPersistentMessageTest: " + this.getStackTrace(e));
            DurableSubscriberNonPersistentMessageTest.fail((String)"Should not throw any exceptions");
        }
    }

    public void createConsumer(String interest, int maxMsgs) {
        int messageReceived = 0;
        int messagesNotReceived = 0;
        this.LOG.info("Starting DurableSubscriber");
        Consumer consumer = null;
        try {
            consumer = new Consumer(this.consumerBrokerURL, interest, clientId);
            for (int i = 0; i < maxMsgs; ++i) {
                try {
                    Message msg = consumer.getMessage(this.messageTimeout);
                    if (msg != null) {
                        this.LOG.debug("Received Message: " + msg.toString());
                        ++messageReceived;
                        ++this.totalMsgReceived;
                    } else {
                        this.LOG.debug("message " + i + " not received");
                        ++messagesNotReceived;
                    }
                    Thread.sleep(this.sleep);
                    continue;
                }
                catch (InterruptedException ie) {
                    this.LOG.debug("Exception: " + ie);
                }
            }
            consumer.close();
            this.LOG.info("Consumer Finished");
            this.LOG.info("Received " + messageReceived);
            this.LOG.info("Not Received " + messagesNotReceived);
        }
        catch (JMSException e) {
            this.LOG.error("Exception Executing SimpleConsumer: " + this.getStackTrace(e));
        }
    }

    public String getStackTrace(Throwable aThrowable) {
        StringWriter result = new StringWriter();
        PrintWriter printWriter = new PrintWriter(result);
        aThrowable.printStackTrace(printWriter);
        return ((Object)result).toString();
    }

    public class Consumer {
        private final ConnectionFactory factory;
        private final ActiveMQConnection connection;
        private final Session session;
        private final MessageConsumer messageConsumer;

        public Consumer(String brokerURL, String interest, String clientId) throws JMSException {
            this.factory = new ActiveMQConnectionFactory(brokerURL);
            this.connection = (ActiveMQConnection)this.factory.createConnection();
            this.connection.setClientID(clientId);
            this.connection.start();
            this.connection.getPrefetchPolicy().setAll(15);
            this.session = this.connection.createSession(false, 1);
            Topic destination = this.session.createTopic(interest);
            this.messageConsumer = this.session.createDurableSubscriber(destination, "MyDurableTopic");
        }

        public void deleteAllMessages() throws JMSException {
            while (this.getMessage(500) != null) {
            }
        }

        public Message getMessage(int timeout) throws JMSException {
            return this.messageConsumer.receive((long)timeout);
        }

        public void close() throws JMSException {
            if (this.messageConsumer != null) {
                this.messageConsumer.close();
            }
            if (this.session != null) {
                this.session.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        }

        public Session getSession() {
            return this.session;
        }
    }

    public class Producer {
        protected ConnectionFactory factory;
        protected transient Connection connection;
        protected transient Session session;
        protected transient MessageProducer producer;
        protected static final int messageSize = 1024;

        public Producer(String brokerURL, String interest, int messageSize, long ttl) throws JMSException {
            this.factory = new ActiveMQConnectionFactory(brokerURL);
            this.connection = this.factory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, 1);
            this.producer = this.session.createProducer((Destination)this.session.createTopic(interest));
            this.producer.setDeliveryMode(1);
            if (ttl > 0L) {
                this.producer.setTimeToLive(ttl);
            }
        }

        public void close() throws JMSException {
            if (this.connection != null) {
                this.connection.close();
            }
        }

        protected void sendMessage() throws JMSException {
            TextMessage textMessage = this.session.createTextMessage("test message");
            this.producer.send((Message)textMessage);
        }

        protected void sendMessages(int count) throws JMSException {
            for (int i = 0; i < count; ++i) {
                TextMessage textMessage = this.session.createTextMessage(this.createMessageText(i));
                this.producer.send((Message)textMessage);
            }
        }

        private String createMessageText(int index) {
            StringBuffer buffer = new StringBuffer(1024);
            buffer.append("Message: " + index + " sent at: " + new Date());
            if (buffer.length() > 1024) {
                return buffer.substring(0, 1024);
            }
            for (int i = buffer.length(); i < 1024; ++i) {
                buffer.append(' ');
            }
            return buffer.toString();
        }

        protected void commitTransaction() throws JMSException {
            this.session.commit();
        }
    }
}

