/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.File;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubSelectorDelayWithRestartTest {
    /** Logger shared by the test and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayWithRestartTest.class);
    /** Run time in milliseconds (one minute); the producer loop in {@code MsgProducer.run()} runs for this long. */
    public static final long RUNTIME = 60000L;
    /** When true, {@code MsgProducer.run()} stops and restarts the broker after every send. */
    private boolean RESTART = true;
    /** Size of the subscriber array created in {@code testProcess()}. */
    private int NUMBER_SUBSCRIBERS = 3;
    /** Broker under test; assigned by {@code startBroker(boolean)} and reset to null by {@code destroyBroker()}. */
    private BrokerService broker;
    /** Topic used by the producer and by every durable subscriber; created in {@code setUp()}. */
    private ActiveMQTopic topic;

    /**
     * Runs the producer (which also restarts the broker when {@code RESTART} is set) while all but
     * the last subscriber consume, then brings the last subscriber online, unsubscribes the others
     * and waits for KahaDB to reduce its journal to at most four data files.
     *
     * @throws Exception on any JMS or broker failure in the test thread
     */
    @Test
    public void testProcess() throws Exception {
        MsgProducer msgProducer = new MsgProducer();
        msgProducer.start();

        DurableSubscriber[] subscribers = new DurableSubscriber[NUMBER_SUBSCRIBERS];
        final int last = subscribers.length - 1;

        // Every subscriber except the last one consumes, one after another, while the producer runs.
        for (int i = 0; i < last; i++) {
            subscribers[i] = new DurableSubscriber(i);
            subscribers[i].process();
        }

        // Wait for the producer thread (and its broker restarts) to finish.
        msgProducer.join();

        // Register the last subscription, publish one more batch for it and let it consume.
        subscribers[last] = new DurableSubscriber(last);
        subscribers[last].subscribe();
        MsgProducer msgProducer2 = new MsgProducer();
        msgProducer2.send();
        subscribers[last].process();

        // Unsubscribe all but the last subscriber.
        for (int j = 0; j < last; j++) {
            LOG.info("Unsubscribing subscriber {}", subscribers[j]);
            subscribers[j].unsubscribe();
        }

        final int maxJournalFiles = 4;
        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
        boolean journalShrunk = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Read the count once so the logged value is the one actually compared.
                int actual = pa.getStore().getJournal().getFileMap().size();
                LOG.info("journal data file count - expected {} actual {}", maxJournalFiles, actual);
                return actual <= maxJournalFiles;
            }
        }, TimeUnit.MINUTES.toMillis(3L));
        Assert.assertTrue("small number of journal files should be left ", journalShrunk);
        LOG.info("DONE.");
    }

    /**
     * Creates the test topic and starts the broker before each test.
     *
     * @throws Exception if the broker fails to start
     */
    @Before
    public void setUp() throws Exception {
        topic = new ActiveMQTopic("TopicT");
        startBroker();
    }

    /**
     * Stops the broker, if one is still held, after each test.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        destroyBroker();
    }

    /**
     * Starts the broker with all previously stored messages deleted.
     *
     * @throws Exception if the broker fails to start
     */
    private void startBroker() throws Exception {
        final boolean deleteAllMessages = true;
        startBroker(deleteAllMessages);
    }

    /**
     * Creates, configures and starts the KahaDB-backed broker, unless one is already held.
     *
     * @param deleteAllMessages when true, the KahaDB directory is removed first and the broker is
     *                          told to delete all messages on startup; when false, stored data is kept
     * @throws Exception if the broker cannot be created or started
     */
    private void startBroker(boolean deleteAllMessages) throws Exception {
        if (broker != null) {
            // A broker is already held; nothing to do.
            return;
        }

        final String name = getName();
        broker = BrokerFactory.createBroker("broker:(vm://" + name + ")");
        broker.setBrokerName(name);
        broker.setAdvisorySupport(false);
        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);

        File kahadbData = new File("activemq-data/" + name + "-kahadb");
        if (deleteAllMessages) {
            delete(kahadbData);
        }

        // Small journal files and a short cleanup interval.
        broker.setPersistent(true);
        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
        kahadb.setDirectory(kahadbData);
        kahadb.setJournalMaxFileLength(10 * 1024);
        kahadb.setCleanupInterval(5000L);
        broker.setPersistenceAdapter(kahadb);

        broker.addConnector("tcp://localhost:61656");

        // 256 MiB for each of the memory, temp and store limits.
        final long usageLimit = 256L * 1024L * 1024L;
        broker.getSystemUsage().getMemoryUsage().setLimit(usageLimit);
        broker.getSystemUsage().getTempUsage().setLimit(usageLimit);
        broker.getSystemUsage().getStoreUsage().setLimit(usageLimit);

        LOG.info(this + "Starting Broker...");
        broker.start();
        broker.waitUntilStarted();
        LOG.info(this + " Broker started!!");
    }

    /**
     * Returns the fixed name used for the broker and as the prefix of its KahaDB data directory.
     *
     * @return the constant broker name
     */
    protected static String getName() {
        final String brokerName = "DurableSubSelectorDelayTest";
        return brokerName;
    }

    /**
     * Recursively deletes a file or a directory tree. Failures to delete individual children are
     * not reported separately; they show up as a failed deletion of the parent.
     *
     * @param path file or directory to remove; {@code null} is treated as already deleted
     * @return {@code true} if {@code path} is {@code null} or was itself deleted, {@code false} otherwise
     */
    private static boolean delete(File path) {
        if (path == null) {
            return true;
        }
        if (path.isDirectory()) {
            // listFiles() returns null on an I/O error or when the directory disappears between
            // the isDirectory() check and the listing; iterating over it directly would throw NPE.
            File[] children = path.listFiles();
            if (children != null) {
                for (File child : children) {
                    delete(child);
                }
            }
        }
        return path.delete();
    }

    /**
     * Stops the held broker and clears the field so a later {@code startBroker} call creates a new
     * one. Does nothing when no broker is held.
     *
     * @throws Exception if stopping the broker fails; the field is then left unchanged
     */
    private void destroyBroker() throws Exception {
        if (broker != null) {
            broker.stop();
            broker = null;
        }
    }

    /**
     * Durable topic subscriber with a fixed client id, subscription name and the selector
     * {@code RELEVANT = true}. Every operation opens its own connection and closes it again.
     */
    private final class DurableSubscriber {
        final String url = "failover:(tcp://localhost:61656)";
        final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
        private final String subName;
        private final int id;
        private final String conClientId;
        private final String selector;

        /**
         * @param id number used to derive the client id ({@code cli<id>}) and the subscription
         *           name ({@code subscription<id>})
         */
        public DurableSubscriber(int id) throws JMSException {
            this.id = id;
            this.conClientId = "cli" + id;
            this.subName = "subscription" + id;
            this.selector = "RELEVANT = true";
        }

        /**
         * Goes online for 20 seconds, logging every message received on the durable subscription,
         * then closes the connection.
         *
         * @throws JMSException if connecting, subscribing or receiving fails
         */
        private void process() throws JMSException {
            long end = System.currentTimeMillis() + 20000L;
            // NOTE(review): never incremented, so the log below always reports count=0.
            int transCount = 0;
            LOG.info(this + " ONLINE.");
            Connection con = openConnection();
            // try/finally rather than try-with-resources: javax.jms.Connection is AutoCloseable
            // only from JMS 2.0 on.
            try {
                Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                TopicSubscriber consumer = sess.createDurableSubscriber(topic, subName, selector, false);
                long max;
                while ((max = end - System.currentTimeMillis()) > 0L) {
                    Message message = consumer.receive(max);
                    if (message == null) {
                        continue;
                    }
                    LOG.info("Received Trans[id={}, count={}] in {}.",
                            message.getIntProperty("TRANS"), transCount, this);
                }
            } finally {
                // Closing the connection also closes its session and consumer.
                closeQuietly(con);
                LOG.info(this + " OFFLINE.");
            }
        }

        /**
         * Opens and starts a connection carrying this subscriber's client id. The connection is
         * closed again if setting the client id or starting it fails.
         */
        private Connection openConnection() throws JMSException {
            Connection con = cf.createConnection();
            boolean started = false;
            try {
                con.setClientID(conClientId);
                con.start();
                started = true;
                return con;
            } finally {
                if (!started) {
                    closeQuietly(con);
                }
            }
        }

        /**
         * Registers the durable subscription on the broker and disconnects without consuming.
         *
         * @throws JMSException if connecting, subscribing or closing fails
         */
        public void subscribe() throws JMSException {
            LOG.info(this + "SUBSCRIBING");
            Connection con = openConnection();
            boolean done = false;
            try {
                Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                sess.createDurableSubscriber(topic, subName, selector, false);
                sess.close();
                done = true;
            } finally {
                release(con, done);
            }
        }

        /**
         * Removes the durable subscription from the broker.
         *
         * @throws JMSException if connecting, unsubscribing or closing fails
         */
        private void unsubscribe() throws JMSException {
            Connection con = openConnection();
            boolean done = false;
            try {
                Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                session.unsubscribe(subName);
                session.close();
                done = true;
            } finally {
                release(con, done);
            }
        }

        /**
         * Closes the connection. After a successful operation a close failure is propagated;
         * after a failed one it is only logged so it cannot mask the original exception.
         */
        private void release(Connection con, boolean succeeded) throws JMSException {
            if (succeeded) {
                con.close();
            } else {
                closeQuietly(con);
            }
        }

        /** Best-effort close: failures are logged instead of thrown. */
        private void closeQuietly(Connection con) {
            if (con == null) {
                return;
            }
            try {
                con.close();
            } catch (Exception e) {
                LOG.warn(this + " failed to close connection", e);
            }
        }

        @Override
        public String toString() {
            return "DurableSubscriber[id=" + id + "]";
        }
    }

    final class MsgProducer
    extends Thread {
        final String url = "failover:(tcp://localhost:61656)";
        final ConnectionFactory cf;
        int transRover;
        int messageRover;

        public MsgProducer() {
            super("MsgProducer");
            this.url = "failover:(tcp://localhost:61656)";
            this.cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61656)");
            this.transRover = 0;
            this.messageRover = 0;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            long endTime = 60000L + System.currentTimeMillis();
            try {
                while (endTime > System.currentTimeMillis()) {
                    Thread.sleep(400L);
                    this.send();
                    if (!DurableSubSelectorDelayWithRestartTest.this.RESTART) continue;
                    DurableSubSelectorDelayWithRestartTest.this.destroyBroker();
                    DurableSubSelectorDelayWithRestartTest.this.startBroker(false);
                }
            }
            catch (Throwable e) {
                e.printStackTrace(System.out);
                throw new RuntimeException(e);
            }
        }

        public void send() throws JMSException {
            int trans = ++this.transRover;
            boolean relevantTrans = true;
            int count = 40;
            LOG.info("Sending Trans[id=" + trans + ", count=" + count + "]");
            Connection con = this.cf.createConnection();
            Session sess = con.createSession(false, 1);
            MessageProducer prod = sess.createProducer(null);
            for (int i = 0; i < count; ++i) {
                Message message = sess.createMessage();
                message.setIntProperty("ID", ++this.messageRover);
                message.setIntProperty("TRANS", trans);
                message.setBooleanProperty("RELEVANT", false);
                prod.send((Destination)DurableSubSelectorDelayWithRestartTest.this.topic, message);
            }
            Message message = sess.createMessage();
            message.setIntProperty("ID", ++this.messageRover);
            message.setIntProperty("TRANS", trans);
            message.setBooleanProperty("COMMIT", true);
            message.setBooleanProperty("RELEVANT", relevantTrans);
            prod.send((Destination)DurableSubSelectorDelayWithRestartTest.this.topic, message);
            LOG.info("Committed Trans[id=" + trans + ", count=" + count + "], ID=" + this.messageRover);
            sess.close();
            con.close();
        }
    }
}

