/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OfflineDurableSubscriberTimeoutTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(OfflineDurableSubscriberTimeoutTest.class);
    private BrokerService broker;

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + this.getName(true));
        connectionFactory.setWatchTopicAdvisories(false);
        return connectionFactory;
    }

    @Override
    protected Connection createConnection() throws Exception {
        return this.createConnection("id");
    }

    protected Connection createConnection(String name) throws Exception {
        Connection con = this.getConnectionFactory().createConnection();
        con.setClientID(name);
        con.start();
        return con;
    }

    public static Test suite() {
        return OfflineDurableSubscriberTimeoutTest.suite(OfflineDurableSubscriberTimeoutTest.class);
    }

    protected void setUp() throws Exception {
        this.createBroker();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        this.destroyBroker();
    }

    private void createBroker() throws Exception {
        this.createBroker(true);
    }

    private void createBroker(boolean deleteAllMessages) throws Exception {
        this.broker = BrokerFactory.createBroker((String)("broker:(vm://" + this.getName(true) + ")"));
        this.broker.setBrokerName(this.getName(true));
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.broker.getManagementContext().setCreateConnector(false);
        this.broker.setAdvisorySupport(false);
        this.setDefaultPersistenceAdapter(this.broker);
        ((KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).getStore().getPageFile().setPageSize(1024);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(1000L);
        policyEntry.setProducerFlowControl(true);
        policyMap.put((ActiveMQDestination)new ActiveMQTopic(">"), (Object)policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setOfflineDurableSubscriberTaskSchedule(1000L);
        this.broker.setOfflineDurableSubscriberTimeout(2004L);
        this.broker.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic("topic1")});
        this.broker.start();
    }

    private void destroyBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testOfflineDurableSubscriberTimeout() throws Exception {
        final AtomicBoolean foundLogMessage = new AtomicBoolean(false);
        org.apache.logging.log4j.core.Logger loggerMRB = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(ManagedRegionBroker.class));
        org.apache.logging.log4j.core.Logger loggerTopic = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(Topic.class));
        AbstractAppender appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                if (event.getLevel().isMoreSpecificThan(Level.WARN) && !event.getMessage().getFormattedMessage().contains("Store limit") && !event.getMessage().getFormattedMessage().contains("resetting to 70% of maximum available")) {
                    LOG.info("** received unexpected log message: " + event.getMessage().getFormattedMessage() + " [" + event.getLoggerName() + "] (" + event.getLevel().toString() + ")");
                    foundLogMessage.set(true);
                }
            }
        };
        appender.start();
        loggerMRB.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        loggerMRB.addAppender((Appender)appender);
        loggerTopic.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        loggerTopic.addAppender((Appender)appender);
        try {
            this.createOfflineDurableSubscribers("topic_new");
            OfflineDurableSubscriberTimeoutTest.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    LOG.info("broker.getAdminView().getInactiveDurableTopicSubscribers():" + OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
                    return OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
                }
            }));
            OfflineDurableSubscriberTimeoutTest.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
                }
            }));
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.createBroker(false);
            this.broker.waitUntilStarted();
            this.createOfflineDurableSubscribers("topic_new");
            OfflineDurableSubscriberTimeoutTest.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
                }
            }));
            OfflineDurableSubscriberTimeoutTest.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
                }
            }));
            LOG.info("Create Consumer for topic1");
            this.createOfflineDurableSubscribers("topic1");
            OfflineDurableSubscriberTimeoutTest.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
                }
            }));
            OfflineDurableSubscriberTimeoutTest.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return OfflineDurableSubscriberTimeoutTest.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
                }
            }));
            OfflineDurableSubscriberTimeoutTest.assertFalse((String)"have not found any log warn/error", (boolean)foundLogMessage.get());
        }
        finally {
            loggerMRB.removeAppender((Appender)appender);
            loggerTopic.removeAppender((Appender)appender);
        }
    }

    private void createOfflineDurableSubscribers(String topic) throws Exception {
        Connection con = this.createConnection();
        Session session = con.createSession(false, 1);
        session.createDurableSubscriber((javax.jms.Topic)this.createDestination(topic), "sub1", null, true);
        session.close();
        con.close();
    }
}

