/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.usecases.DurableSubscriptionOfflineTestBase;
import org.apache.activemq.usecases.DurableSubscriptionOfflineTestListener;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubscriptionOfflineTest
extends DurableSubscriptionOfflineTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);

    /**
     * Builds the connection factory used by the tests in this class.
     *
     * <p>The factory targets a {@code vm://} URI whose suffix comes from
     * {@code getName(true)}, and has topic-advisory watching switched off.
     *
     * @return the configured factory
     * @throws Exception declared to match the overridden method
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        final String brokerUri = "vm://" + this.getName(true);
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Checks that a durable subscriber with a selector receives every matching message
     * that was published while it was offline.
     *
     * <p>Ten messages carrying {@code filter=true} are sent while the subscription is
     * inactive; after re-subscribing, the listener must have counted all of them.
     */
    @Test(timeout=60000L)
    public void testConsumeAllMatchedMessages() throws Exception {
        final String selector = "filter = 'true'";
        final int messageCount = 10;

        // Phase 1: register the durable subscription, then go offline.
        Connection con = this.createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // Phase 2: publish matching messages while the subscription is inactive.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(null);
        int published = 0;
        while (published < messageCount) {
            Message msg = session.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send((Destination) this.topic, msg);
            published++;
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Phase 3: reactivate the subscription and give the listener time to drain it.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber =
                session.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        subscriber.setMessageListener((MessageListener) listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        final long received = listener.count;
        Assert.assertEquals((long) published, received);
    }

    /**
     * Checks that the backlog of an inactive durable subscription can be browsed through
     * its management bean, both as an array and as a table.
     *
     * <p>After ten messages are published to the offline subscription, exactly one
     * inactive durable subscriber must be reported and both browse calls must return
     * ten entries.
     */
    @Test(timeout=60000L)
    public void testBrowseOfflineSub() throws Exception {
        final int messageCount = 10;

        // Register the durable subscription (no selector) and disconnect.
        Connection con = this.createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic) this.topic, "SubsId");
        session.close();
        con.close();

        // Publish while the subscription is offline.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(null);
        for (int remaining = messageCount; remaining > 0; remaining--) {
            Message msg = session.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send((Destination) this.topic, msg);
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Locate the single inactive subscription through the broker admin view.
        ObjectName[] inactiveSubs = this.broker.getAdminView().getInactiveDurableTopicSubscribers();
        Assert.assertEquals(1L, (long) inactiveSubs.length);

        ObjectName inactiveSubName = inactiveSubs[0];
        DurableSubscriptionViewMBean subView = (DurableSubscriptionViewMBean) this.broker
                .getManagementContext()
                .newProxyInstance(inactiveSubName, DurableSubscriptionViewMBean.class, true);

        CompositeData[] browsed = subView.browse();
        Assert.assertNotNull((Object) browsed);
        Assert.assertEquals((long) messageCount, (long) browsed.length);

        TabularData browsedTable = subView.browseAsTable();
        Assert.assertNotNull((Object) browsedTable);
        Assert.assertEquals((long) messageCount, (long) browsedTable.size());
    }

    /**
     * Checks that two durable subscriptions sharing a subscription name but using
     * different client ids each receive the full set of matching messages.
     *
     * <p>{@code cliId1} is offline while ten messages are published and {@code cliId2}
     * is online with a listener. Both must end up with all ten.
     */
    @Test(timeout=60000L)
    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
        final String selector = "filter = 'true'";
        final int messageCount = 10;

        // First client: create the subscription, then go offline.
        Connection offlineCon = this.createConnection("cliId1");
        Session offlineSession = offlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        offlineSession.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        offlineSession.close();
        offlineCon.close();

        // Second client: stays connected with a listener while messages are published.
        Connection onlineCon = this.createConnection("cliId2");
        Session onlineSession = onlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber onlineSubscriber =
                onlineSession.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        DurableSubscriptionOfflineTestListener onlineListener = new DurableSubscriptionOfflineTestListener();
        onlineSubscriber.setMessageListener((MessageListener) onlineListener);

        // Publish the matching messages.
        Connection sendCon = this.createConnection();
        Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = sendSession.createProducer(null);
        int published = 0;
        while (published < messageCount) {
            Message msg = sendSession.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send((Destination) this.topic, msg);
            published++;
        }
        Thread.sleep(1000L);
        sendSession.close();
        sendCon.close();

        // Allow the online listener to finish, then verify it.
        Thread.sleep(3000L);
        onlineSession.close();
        onlineCon.close();
        final long onlineReceived = onlineListener.count;
        Assert.assertEquals((long) published, onlineReceived);

        // Bring the first client back and verify it receives the backlog.
        offlineCon = this.createConnection("cliId1");
        offlineSession = offlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber offlineSubscriber =
                offlineSession.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        DurableSubscriptionOfflineTestListener offlineListener = new DurableSubscriptionOfflineTestListener();
        offlineSubscriber.setMessageListener((MessageListener) offlineListener);
        Thread.sleep(3000L);
        offlineSession.close();
        offlineCon.close();

        final long offlineReceived = offlineListener.count;
        Assert.assertEquals("offline consumer got all", (long) published, offlineReceived);
    }

    /**
     * Checks that messages queued for a durable subscription are not delivered to a
     * later subscription once the original one has been unsubscribed.
     *
     * <p>Steps: {@code cliId1} creates a filtered subscription and goes offline; ten
     * matching messages are published; {@code cliId1} unsubscribes; a different client
     * ({@code offCli2}) then subscribes to the same topic name with
     * {@code consumer.retroactive=true} and an unrelated selector. Its listener must
     * count zero messages.
     */
    @Test(timeout=60000L)
    public void testRemovedDurableSubDeletes() throws Exception {
        String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";

        // Create the offline subscription that will later be removed.
        Connection con = this.createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic)this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish messages that match the offline subscription's selector.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        for (int i = 0; i < 10; ++i) {
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send((Destination)this.topic, message);
        }
        Thread.sleep(1000L);
        // Release the publisher's session and connection; previously this connection was
        // overwritten below without ever being closed.
        session.close();
        con.close();

        // Remove the durable subscription.
        Connection con2 = this.createConnection("cliId1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session2.unsubscribe("SubsId");
        session2.close();
        con2.close();

        // Subscribe as a different client on the retroactive variant of the topic.
        this.topic = new ActiveMQTopic(this.topic.getPhysicalName() + "?consumer.retroactive=true");
        con = this.createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)this.topic, "SubsId", filter, true);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        consumer.setMessageListener((MessageListener)listener);
        session.close();
        con.close();

        Assert.assertEquals((long)0L, (long)listener.count);
    }

    /**
     * Checks that unsubscribing a durable subscription with a backlog does not grow the
     * number of in-use KahaDB pages from one iteration to the next.
     *
     * <p>The test returns immediately when the broker's persistence adapter is not a
     * {@link KahaDBPersistenceAdapter}. Otherwise it runs two iterations of
     * subscribe / publish 2750 messages / unsubscribe and asserts that
     * {@code pageCount - freePageCount} after the second iteration equals the value
     * recorded after the first.
     */
    @Test(timeout=60000L)
    public void testRemovedDurableSubDeletesFromIndex() throws Exception {
        if (!(this.broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
            return;
        }
        final int numMessages = 2750;
        final int iterations = 2;

        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        PageFile pageFile = kahaDb.getStore().getPageFile();
        LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + ", fileSize:" + pageFile.getFile().length());

        // In-use page count after the previous iteration; 0 means "nothing recorded yet".
        long previousInUse = 0L;
        int repeats = 0;
        while (repeats < iterations) {
            LOG.info("Iteration: " + repeats + " Count:" + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount());
            final String clientId = "cliId1-" + repeats;

            // Create the offline subscription for this iteration.
            Connection subCon = this.createConnection(clientId);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber((Topic) this.topic, "SubsId", "filter = 'true'", true);
            subSession.close();
            subCon.close();

            // Build up a backlog for it.
            Connection sendCon = this.createConnection();
            Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer publisher = sendSession.createProducer(null);
            for (int n = 0; n < numMessages; n++) {
                Message msg = sendSession.createMessage();
                msg.setStringProperty("filter", "true");
                publisher.send((Destination) this.topic, msg);
            }
            sendCon.close();

            // Remove the subscription together with its backlog.
            Connection unsubCon = this.createConnection(clientId);
            Session unsubSession = unsubCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            unsubSession.unsubscribe("SubsId");
            unsubSession.close();
            unsubCon.close();

            LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + " diff: " + (pageFile.getPageCount() - pageFile.getFreePageCount()) + " fileSize:" + pageFile.getFile().length());
            if (previousInUse != 0L) {
                Assert.assertEquals("Only use X pages per iteration: " + repeats, previousInUse, (long) (pageFile.getPageCount() - pageFile.getFreePageCount()));
            }
            previousInUse = pageFile.getPageCount() - pageFile.getFreePageCount();
            repeats++;
        }
    }

    /**
     * Checks that removing one offline durable subscription does not affect the backlog
     * of another offline subscription on the same topic.
     *
     * <p>{@code offCli1} subscribes with a selector and {@code offCli2} without one; both
     * go offline. Ten messages with a randomly chosen {@code filter} value are published,
     * {@code offCli1} unsubscribes, and {@code offCli2} must then receive all ten.
     */
    @Test(timeout=60000L)
    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
        // Offline subscription with a selector.
        Connection con = this.createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic)this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Offline subscription without a selector.
        con = this.createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
        session.close();
        con.close();

        // Publish messages with a random mix of filter values.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;
        for (int i = 0; i < 10; ++i) {
            boolean filter = (int)(Math.random() * 2.0) >= 1;
            ++sent;
            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send((Destination)this.topic, message);
        }
        Thread.sleep(1000L);
        // Release the publisher's session and connection; previously this connection was
        // overwritten below without ever being closed.
        session.close();
        con.close();

        // Remove the selector-based subscription.
        Connection con2 = this.createConnection("offCli1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session2.unsubscribe("SubsId");
        session2.close();
        con2.close();

        // The remaining subscription must still see everything that was sent.
        con = this.createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("SubsId");
        consumer.setMessageListener((MessageListener)listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals((String)"offline consumer got all", (long)sent, (long)listener.count);
    }

    /**
     * Checks that durable subscribers activating concurrently with a transactional
     * commit receive each message exactly once.
     *
     * <p>Subscriptions are created for {@code cli0}..{@code cli10}. 1000 text messages are
     * sent in a transacted session, and the commit is handed to an executor together with
     * ten consumer tasks ({@code cli0}..{@code cli9}). Each task reads the messages in two
     * halves, re-creating the subscriber between halves, and fails on a repeated producer
     * sequence id or on an extra message after the expected 1000.
     */
    @Test(timeout=60000L)
    public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
        final int messageCount = 1000;
        final int numConsumers = 10;

        /** Consumer task that records producer sequence ids and rejects repeats. */
        class CheckForDupsClient implements Runnable {
            private final HashSet<Long> seenSequenceIds = new HashSet<Long>();
            private final int id;

            CheckForDupsClient(int id) {
                this.id = id;
            }

            @Override
            public void run() {
                try {
                    Connection clientCon = DurableSubscriptionOfflineTest.this.createConnection("cli" + this.id);
                    Session clientSession = clientCon.createSession(false, Session.AUTO_ACKNOWLEDGE);

                    // Consume in two halves, deactivating the subscriber in between.
                    for (int pass = 0; pass < 2; pass++) {
                        TopicSubscriber subscriber = clientSession.createDurableSubscriber(
                                (Topic) DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                        for (int n = 0; n < messageCount / 2; n++) {
                            Message received = subscriber.receive(4000L);
                            Assert.assertNotNull((Object) received);
                            this.assertNotSeenBefore(received);
                        }
                        subscriber.close();
                    }

                    // Nothing further may arrive; if something does, report a duplicate first.
                    TopicSubscriber subscriber = clientSession.createDurableSubscriber(
                            (Topic) DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                    Message extra = subscriber.receive(4000L);
                    if (extra != null) {
                        this.assertNotSeenBefore(extra);
                    }
                    Assert.assertNull((Object) extra);
                    clientSession.close();
                    clientCon.close();
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }

            /** Records the message's producer sequence id, failing if it was already recorded. */
            private void assertNotSeenBefore(Message received) throws JMSException {
                long producerSequenceId = new MessageId(received.getJMSMessageID()).getProducerSequenceId();
                Assert.assertTrue("ID=" + this.id + " not a duplicate: " + producerSequenceId, this.seenSequenceIds.add(producerSequenceId));
            }
        }

        // Register the durable subscriptions; the bound is inclusive, so one more
        // subscription is created than there are consumer tasks.
        for (int clientIndex = 0; clientIndex <= numConsumers; clientIndex++) {
            Connection subCon = this.createConnection("cli" + clientIndex);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber((Topic) this.topic, "SubsId", null, true);
            subSession.close();
            subCon.close();
        }

        // Stage all messages inside a transaction; nothing is committed yet.
        final String payLoad = new String(new byte[1000]);
        final Connection sendCon = this.createConnection();
        final Session sendSession = sendCon.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer publisher = sendSession.createProducer((Destination) this.topic);
        for (int n = 0; n < messageCount; n++) {
            publisher.send((Message) sendSession.createTextMessage(payLoad));
        }

        // Submit the commit first, then the consumer tasks, to the same executor.
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    sendSession.commit();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        });
        for (int clientIndex = 0; clientIndex < numConsumers; clientIndex++) {
            pool.execute(new CheckForDupsClient(clientIndex));
        }

        pool.shutdown();
        pool.awaitTermination(5L, TimeUnit.MINUTES);
        sendCon.close();
        Assert.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Runs {@link #doTestOrderOnActivateDeactivate()} ten times, stopping the broker and
     * calling {@code createBroker(true)} after every run.
     */
    @Test(timeout=120000L)
    public void testOrderOnActivateDeactivate() throws Exception {
        final int iterations = 10;
        int iteration = 0;
        while (iteration < iterations) {
            LOG.info("Iteration: " + iteration);
            this.doTestOrderOnActivateDeactivate();

            // Replace the broker before the next round.
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.createBroker(true);
            iteration++;
        }
    }

    /**
     * Single round of the activate/deactivate ordering check.
     *
     * <p>Creates durable subscriptions for client ids {@code cli0}..{@code cli4}, then
     * submits to one executor: 100 runs for each of four {@code CheckOrderClient}
     * instances ({@code cli0}..{@code cli3}) and, last, a producer task that sends 1000
     * text messages in a transacted session and commits. Every client run connects,
     * activates its subscription, reads up to 500 messages with {@code receiveNoWait()}
     * and asserts that producer sequence ids arrive as 1, 2, 3, ... Any throwable raised
     * in a task is added to {@code exceptions}, which must be empty at the end.
     *
     * @throws Exception if setup, executor shutdown or the final assertion fails
     */
    public void doTestOrderOnActivateDeactivate() throws Exception {
        // NOTE(review): messageCount and numConsumers are never read below; the literals
        // 1000 and 4 appear to be their inlined values (decompiler output).
        int messageCount = 1000;
        Connection con = null;
        Session session = null;
        int numConsumers = 4;
        // Inclusive bound: five subscriptions (cli0..cli4) are registered, while only
        // four client tasks (cli0..cli3) are started further down.
        for (int i = 0; i <= 4; ++i) {
            con = this.createConnection("cli" + i);
            session = con.createSession(false, 1);
            session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
            session.close();
            con.close();
        }
        // Client tasks connect through a failover URL that points at the port of the
        // broker's transport connector at index 1.
        String url = "failover:(tcp://localhost:" + ((TransportConnector)this.broker.getTransportConnectors().get(1)).getConnectUri().getPort() + "?wireFormat.maxInactivityDuration=0)?jms.watchTopicAdvisories=false&jms.alwaysSyncSend=true&jms.dispatchAsync=true&jms.sendAcksAsync=true&initialReconnectDelay=100&maxReconnectDelay=30000&useExponentialBackOff=true";
        final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url);
        // Producer task: stages 1000 messages in a transacted session (mode 0) and commits
        // them in one go. Its connection is closed only on the success path.
        Runnable producer = new Runnable(){
            final String payLoad = new String(new byte[600]);

            @Override
            public void run() {
                try {
                    Connection con = DurableSubscriptionOfflineTest.this.createConnection();
                    Session sendSession = con.createSession(true, 0);
                    MessageProducer producer = sendSession.createProducer((Destination)DurableSubscriptionOfflineTest.this.topic);
                    for (int i = 0; i < 1000; ++i) {
                        producer.send((Message)sendSession.createTextMessage(this.payLoad));
                    }
                    LOG.info("About to commit: 1000");
                    sendSession.commit();
                    LOG.info("committed: 1000");
                    con.close();
                }
                catch (Exception e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        };
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; ++i) {
            // One instance per client id; the same instance is submitted many times.
            class CheckOrderClient
            implements Runnable {
                final int id;
                // Incremented once per run; only used in the assertion message.
                int runCount = 0;

                public CheckOrderClient(int id) {
                    this.id = id;
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 *
                 * (The warning above was emitted by the decompiler; the exact original
                 * try/synchronized nesting cannot be confirmed from this file.)
                 */
                @Override
                public void run() {
                    try {
                        // Locking on the client instance keeps the repeated submissions of
                        // this same Runnable from executing their bodies concurrently.
                        CheckOrderClient checkOrderClient = this;
                        synchronized (checkOrderClient) {
                            Message message;
                            int i;
                            Connection con = clientFactory.createConnection();
                            con.setClientID("cli" + this.id);
                            con.start();
                            // Mode 2 is javax.jms.Session.CLIENT_ACKNOWLEDGE; no acknowledge()
                            // call appears in this block. NOTE(review): presumably the
                            // messages therefore stay pending for the next run - confirm.
                            Session session = con.createSession(false, 2);
                            TopicSubscriber consumer = session.createDurableSubscriber((Topic)DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                            int nextId = 0;
                            ++this.runCount;
                            // Read whatever is immediately available (at most 500) and
                            // require sequence ids to start at 1 and increase by one.
                            for (i = 0; i < 500 && (message = consumer.receiveNoWait()) != null; ++i) {
                                long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
                                Assert.assertEquals((String)(this.id + " expected order: runCount: " + this.runCount + " id: " + message.getJMSMessageID()), (long)(++nextId), (long)producerSequenceId);
                            }
                            LOG.info(con.getClientID() + " peeked " + i);
                            session.close();
                            con.close();
                        }
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                        DurableSubscriptionOfflineTest.this.exceptions.add(e);
                    }
                }
            }
            CheckOrderClient client = new CheckOrderClient(i);
            // Queue 100 activate/peek/deactivate cycles for this client.
            for (int j = 0; j < 100; ++j) {
                executorService.execute(client);
            }
        }
        // The producer is submitted after all client runs have been queued.
        executorService.execute(producer);
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.MINUTES);
        // NOTE(review): 'con' still refers to the last setup connection (cli4), which was
        // already closed inside the setup loop above.
        con.close();
        Assert.assertTrue((String)("no exceptions: " + this.exceptions), (boolean)this.exceptions.isEmpty());
    }

    /**
     * Checks that unsubscribing a filtered durable subscription discards its whole
     * backlog, so a fresh subscription with the same client id and name receives nothing.
     *
     * <p>Ten messages are published while {@code offCli1} is offline, alternating between
     * {@code filter=true} and {@code filter=false}. The client then unsubscribes,
     * subscribes again with the same selector, and its listener must count zero.
     */
    @Test(timeout=60000L)
    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
        final String selector = "filter = 'true'";

        // Create the filtered subscription and go offline.
        Connection con = this.createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // Publish a mix of matching (even index) and non-matching (odd index) messages.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(null);
        int matching = 0;
        for (int index = 0; index < 10; index++) {
            final boolean matches = (index % 2 == 0);
            if (matches) {
                matching++;
            }
            Message msg = session.createMessage();
            if (matches) {
                msg.setStringProperty("filter", "true");
            } else {
                msg.setStringProperty("filter", "false");
            }
            publisher.send((Destination) this.topic, msg);
        }
        LOG.info("sent: " + matching);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Drop the subscription.
        con = this.createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // Subscribe again under the same identity; no old message may show up.
        con = this.createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber =
                session.createDurableSubscriber((Topic) this.topic, "SubsId", selector, true);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        subscriber.setMessageListener((MessageListener) listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        final long received = listener.count;
        Assert.assertEquals(0L, received);
    }

    /**
     * Checks that one subscriber fully consuming its backlog does not disturb delivery
     * of later messages to it, while a second subscriber has consumed only part of its
     * own backlog.
     *
     * <p>{@code cli1} and {@code cli2} both subscribe with the same selector and go
     * offline. Ten matching messages are published; {@code cli1} drains all of them,
     * {@code cli2} receives two. Two further messages are published, only the second of
     * which matches, and {@code cli1} must then receive exactly one.
     */
    @Test(timeout=60000L)
    public void testAllConsumed() throws Exception {
        final String filter = "filter = 'true'";

        // Both clients register the same filtered subscription and disconnect.
        Connection con = this.createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic) this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic) this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        // First batch: ten matching messages.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(null);
        int sent = 0;
        while (sent < 10) {
            Message msg = session.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send((Destination) this.topic, msg);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 drains its whole backlog through a listener.
        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber =
                session.createDurableSubscriber((Topic) this.topic, "SubsId", filter, true);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        subscriber.setMessageListener((MessageListener) listener);
        Thread.sleep(3000L);
        session.close();
        con.close();
        final long firstRoundReceived = listener.count;
        Assert.assertEquals((long) sent, firstRoundReceived);

        // cli2 pulls just two messages synchronously.
        LOG.info("cli2 pull 2");
        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber((Topic) this.topic, "SubsId", filter, true);
        Assert.assertNotNull("got message", (Object) subscriber.receive(2000L));
        Assert.assertNotNull("got message", (Object) subscriber.receive(2000L));
        session.close();
        con.close();

        // Second batch: two messages, only the second one matches the selector.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        publisher = session.createProducer(null);
        sent = 0;
        for (int index = 0; index < 2; index++) {
            Message msg = session.createMessage();
            final String filterValue = (index == 1) ? "true" : "false";
            msg.setStringProperty("filter", filterValue);
            publisher.send((Destination) this.topic, msg);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 returns and must see only the single new matching message.
        LOG.info("cli1 again, should get 1 new ones");
        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber((Topic) this.topic, "SubsId", filter, true);
        listener = new DurableSubscriptionOfflineTestListener();
        subscriber.setMessageListener((MessageListener) listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        final long secondRoundReceived = listener.count;
        Assert.assertEquals(1L, secondRoundReceived);
    }

    /**
     * Checks that filtered durable subscriptions miss no matching message across a
     * broker restart.
     *
     * <p>{@code cli1} subscribes before any message is sent. Message 0 matches the
     * selector and messages 1..9 do not. {@code cli2} then subscribes, the broker is
     * destroyed and re-created with {@code createBroker(false)}, and messages 10..29
     * (all matching) are sent. {@code cli2} must receive message 10 first; {@code cli1}
     * must receive message 0 and then message 10.
     *
     * <p>Every received message is checked for {@code null} before its {@code ID}
     * property is read, so a missed delivery is reported as an assertion failure with a
     * message rather than as a {@link NullPointerException}.
     */
    @Test(timeout=60000L)
    public void testNoMissOnMatchingSubAfterRestart() throws Exception {
        final String filter = "filter = 'true'";

        // cli1 subscribes first and goes offline.
        Connection con = this.createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic)this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;

        // Message 0 matches the selector.
        Message message = session.createMessage();
        message.setStringProperty("filter", "true");
        message.setIntProperty("ID", 0);
        producer.send((Destination)this.topic, message);
        ++sent;

        // Messages 1..9 do not match.
        for (int i = sent; i < 10; ++i) {
            message = session.createMessage();
            message.setStringProperty("filter", "false");
            message.setIntProperty("ID", i);
            producer.send((Destination)this.topic, message);
            ++sent;
        }
        con.close();
        LOG.info("sent: " + sent);

        // cli2 subscribes only after the first ten messages, then goes offline.
        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber((Topic)this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        // Replace the broker.
        this.destroyBroker();
        this.createBroker(false);

        // Messages 10..29 all match.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
        for (int i = sent; i < 30; ++i) {
            message = session.createMessage();
            message.setStringProperty("filter", "true");
            message.setIntProperty("ID", i);
            producer.send((Destination)this.topic, message);
            ++sent;
        }
        con.close();
        LOG.info("sent: " + sent);

        // cli2: first delivery must be message 10.
        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)this.topic, "SubsId", filter, true);
        Message m = consumer.receive(3000L);
        Assert.assertNotNull("cli2 received a message after restart", (Object)m);
        Assert.assertEquals((String)"is message 10", (long)10L, (long)m.getIntProperty("ID"));
        session.close();
        con.close();

        // cli1: message 0 first, then message 10.
        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber((Topic)this.topic, "SubsId", filter, true);
        m = consumer.receive(3000L);
        Assert.assertNotNull("cli1 received its first message after restart", (Object)m);
        Assert.assertEquals((String)"is message 0", (long)0L, (long)m.getIntProperty("ID"));
        m = consumer.receive(3000L);
        Assert.assertNotNull("cli1 received its second message after restart", (Object)m);
        Assert.assertEquals((String)"is message 10", (long)10L, (long)m.getIntProperty("ID"));
        session.close();
        con.close();
    }

    /**
     * Checks that a durable subscription which went inactive before a broker restart
     * still receives its backlog when it reconnects afterwards.
     *
     * <p>Two subscribers on {@code TEST.FOO} share one listener. Twenty messages are
     * sent with delivery mode 2; the second subscriber and its connection are closed
     * just before the eleventh message is sent. After the broker is destroyed and
     * re-created with {@code createBroker(false)}, the second subscriber reconnects and
     * the shared listener must reach a count of at least 40.
     */
    @Test(timeout=640000L)
    public void testInactiveSubscribeAfterBrokerRestart() throws Exception {
        final int messageCount = 20;
        final int closeSecondSubscriberAt = 10;
        final int expectedDeliveries = 40;
        final String alwaysOnSubName = "consumerName";
        final String tearDownSubName = "tearDownconsumerName";

        Connection alwaysOnCon = this.createConnection("subs1");
        Connection tearDownCon = this.createConnection("subs2");
        Session alwaysOnSession = alwaysOnCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session tearDownSession = tearDownCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQTopic fooTopic = new ActiveMQTopic("TEST.FOO");

        // Both subscribers feed the same counting listener.
        TopicSubscriber alwaysOnSubscriber =
                alwaysOnSession.createDurableSubscriber((Topic) fooTopic, alwaysOnSubName);
        TopicSubscriber tearDownSubscriber =
                tearDownSession.createDurableSubscriber((Topic) fooTopic, tearDownSubName);
        final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("listener");
        alwaysOnSubscriber.setMessageListener((MessageListener) listener);
        tearDownSubscriber.setMessageListener((MessageListener) listener);

        MessageProducer publisher = alwaysOnSession.createProducer((Destination) fooTopic);
        publisher.setDeliveryMode(2); // javax.jms.DeliveryMode.PERSISTENT
        for (int index = 0; index < messageCount; index++) {
            // Take the second subscriber offline partway through the run.
            if (index == closeSecondSubscriberAt) {
                tearDownSubscriber.close();
                tearDownCon.close();
            }
            TextMessage text = alwaysOnSession.createTextMessage("test-" + index);
            publisher.send((Message) text);
        }

        // Replace the broker.
        this.destroyBroker();
        this.createBroker(false);

        // Reactivate the second subscription on a new connection.
        Connection reconnectCon = this.createConnection("subs2");
        Session reconnectSession = reconnectCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        tearDownSubscriber = reconnectSession.createDurableSubscriber((Topic) fooTopic, tearDownSubName);
        tearDownSubscriber.setMessageListener((MessageListener) listener);

        LOG.info("waiting for messages to flow");
        Wait.waitFor((Wait.Condition) new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return listener.count >= expectedDeliveries;
            }
        });
        Assert.assertTrue("At least message 40 must be received, count=" + listener.count, listener.count >= expectedDeliveries);

        // Only the always-on session is closed here; alwaysOnCon itself is not closed
        // in this method.
        alwaysOnSession.close();
        reconnectCon.close();
    }
}

