/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import com.google.common.collect.Lists;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.DynamicNetworkTestSupport;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class DurableSyncNetworkBridgeTest
extends DynamicNetworkTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
    protected JavaRuntimeConfigurationBroker remoteRuntimeBroker;
    protected String staticIncludeTopics = "include.static.test";
    protected String includedTopics = "include.test.>";
    protected String testTopicName2 = "include.test.bar2";
    private boolean dynamicOnly = false;
    private boolean forceDurable = false;
    private boolean useVirtualDestSubs = false;
    private byte remoteBrokerWireFormatVersion = (byte)12;
    private BrokerService broker1;
    private BrokerService broker2;
    private Session session1;
    private Session session2;
    private final FLOW flow;
    @Rule
    public Timeout globalTimeout = new Timeout(30L, TimeUnit.SECONDS);
    public static final String KEYSTORE_TYPE = "jks";
    public static final String PASSWORD = "password";
    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
    protected AdvisoryBroker remoteAdvisoryBroker;

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({FLOW.FORWARD}, {FLOW.REVERSE});
    }

    public DurableSyncNetworkBridgeTest(FLOW flow) {
        this.flow = flow;
    }

    @Before
    public void setUp() throws Exception {
        this.includedTopics = "include.test.>";
        this.staticIncludeTopics = "include.static.test";
        this.dynamicOnly = false;
        this.forceDurable = false;
        this.useVirtualDestSubs = false;
        this.remoteBrokerWireFormatVersion = (byte)12;
        this.doSetUp(true, true, this.tempFolder.newFolder(), this.tempFolder.newFolder());
    }

    @After
    public void tearDown() throws Exception {
        this.doTearDown();
    }

    @Test
    public void testRemoveSubscriptionPropagate() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.removeSubscription(this.broker1, topic, this.subName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testRemoveSubscriptionPropegateAfterRestart() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.restartBrokers(true);
        this.assertBridgeStarted();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.removeSubscription(this.broker1, topic, this.subName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testRemoveSubscriptionWithBridgeOffline() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.restartBroker(this.broker2, false);
        MessageProducer prod = this.session2.createProducer((Destination)topic);
        for (int i = 0; i < 10; ++i) {
            prod.send((Message)this.session2.createTextMessage("test"));
        }
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.removeSubscription(this.broker1, topic, this.subName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.doTearDown();
        this.restartBroker(this.broker2, true);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.restartBroker(this.broker1, true);
        this.assertBridgeStarted();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.doTearDown();
        this.includedTopics = "different.topic";
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.removeSubscription(this.broker1, topic, this.subName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.restartBroker(this.broker2, true);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.restartBroker(this.broker1, true);
        this.assertBridgeStarted();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testSubscriptionRemovedAfterIncludedChanged() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.doTearDown();
        this.includedTopics = "different.topic";
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.restartBroker(this.broker2, true);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.restartBroker(this.broker1, true);
        this.assertBridgeStarted();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
    }

    @Test
    public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
        this.forceDurable = true;
        this.restartBrokers(true);
        ActiveMQTopic topic = new ActiveMQTopic(this.staticIncludeTopics);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.doTearDown();
        this.staticIncludeTopics = "different.topic";
        this.restartBrokers(false);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        MessageProducer prod = this.session2.createProducer((Destination)topic);
        for (int i = 0; i < 10; ++i) {
            prod.send((Message)this.session2.createTextMessage("test"));
        }
        this.restartBroker(this.broker2, true);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.restartBroker(this.broker1, true);
        this.assertBridgeStarted();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
    }

    @Test
    public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        ActiveMQTopic topic2 = new ActiveMQTopic(this.testTopicName2);
        TopicSubscriber sub1 = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        sub1.close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.session1.createDurableSubscriber((Topic)topic2, "sub2");
        this.removeSubscription(this.broker1, topic, this.subName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertSubscriptionsCount(this.broker1, topic2, 1);
        this.restartBroker(this.broker2, true);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic2, 0);
        this.restartBroker(this.broker1, true);
        this.assertBridgeStarted();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic2, 1);
    }

    @Test
    public void testAddSubscriptionsWithBridgeOffline() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        ActiveMQTopic topic2 = new ActiveMQTopic(this.testTopicName2);
        ActiveMQTopic excludeTopic = new ActiveMQTopic(this.excludeTopicName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.session1.createDurableSubscriber((Topic)topic, this.subName).close();
        this.session1.createDurableSubscriber((Topic)topic, "sub2").close();
        this.session1.createDurableSubscriber((Topic)topic2, "sub3").close();
        this.assertSubscriptionsCount(this.broker1, topic, 2);
        this.assertSubscriptionsCount(this.broker1, topic2, 1);
        this.restartBrokers(true);
        this.assertBridgeStarted();
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic2, 1);
        this.assertNCDurableSubsCount(this.broker2, excludeTopic, 0);
    }

    @Test
    public void testSyncLoadTest() throws Exception {
        int j;
        int i;
        String subName = this.subName;
        for (i = 0; i < 100; ++i) {
            for (j = 0; j < 10; ++j) {
                this.session1.createDurableSubscriber((Topic)new ActiveMQTopic("include.test." + i), subName + i + j).close();
            }
        }
        for (i = 0; i < 100; ++i) {
            this.assertNCDurableSubsCount(this.broker2, new ActiveMQTopic("include.test." + i), 1);
        }
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        for (i = 0; i < 10; ++i) {
            for (j = 0; j < 10; ++j) {
                this.removeSubscription(this.broker1, new ActiveMQTopic("include.test." + i), subName + i + j);
            }
        }
        this.restartBrokers(true);
        for (i = 0; i < 10; ++i) {
            this.assertNCDurableSubsCount(this.broker2, new ActiveMQTopic("include.test." + i), 0);
        }
        for (i = 10; i < 100; ++i) {
            this.assertNCDurableSubsCount(this.broker2, new ActiveMQTopic("include.test." + i), 1);
        }
        this.assertBridgeStarted();
    }

    @Test
    public void testAddSubscriptionsWithBridgeOfflineOpenWire11() throws Exception {
        this.remoteBrokerWireFormatVersion = (byte)11;
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.session1.createDurableSubscriber((Topic)topic, this.subName).close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.restartBrokers(true);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertBridgeStarted();
    }

    @Test
    public void testAddOfflineSubscriptionWithBridgeOfflineDynamicTrue() throws Exception {
        this.dynamicOnly = true;
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.session1.createDurableSubscriber((Topic)topic, this.subName).close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.restartBrokers(true);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertBridgeStarted();
    }

    @Test
    public void testAddOnlineSubscriptionWithBridgeOfflineDynamicTrue() throws Exception {
        this.dynamicOnly = true;
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.session1.createDurableSubscriber((Topic)topic, this.subName).close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.restartBrokers(true);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.session1.createDurableSubscriber((Topic)topic, this.subName);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertBridgeStarted();
    }

    @Test
    public void testAddAndRemoveSubscriptionsWithBridgeOffline() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        ActiveMQTopic excludeTopic = new ActiveMQTopic(this.excludeTopicName);
        this.session1.createDurableSubscriber((Topic)topic, this.subName).close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.doTearDown();
        this.restartBroker(this.broker1, false);
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.removeSubscription(this.broker1, topic, this.subName);
        this.session1.createDurableSubscriber((Topic)topic, "sub2").close();
        this.assertSubscriptionsCount(this.broker1, topic, 1);
        this.restartBrokers(true);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, excludeTopic, 0);
        this.assertBridgeStarted();
    }

    @Test
    public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception {
        Assume.assumeTrue((this.flow == FLOW.FORWARD ? 1 : 0) != 0);
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        ActiveMQTopic excludeTopic = new ActiveMQTopic(this.excludeTopicName);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.doTearDown();
        this.restartBrokers(false);
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.session1.createDurableSubscriber((Topic)excludeTopic, "sub-exclude");
        this.session1.createDurableSubscriber((Topic)topic, this.subName);
        this.session1.createDurableSubscriber((Topic)topic, "sub2");
        this.session1.createDurableSubscriber((Topic)topic, "sub3");
        this.assertSubscriptionsCount(this.broker1, topic, 3);
        this.restartBrokers(true);
        this.assertBridgeStarted();
        this.session1.createDurableSubscriber((Topic)topic, this.subName);
        this.session1.createDurableSubscriber((Topic)topic, "sub2");
        this.session1.createDurableSubscriber((Topic)topic, "sub3");
        this.session1.createDurableSubscriber((Topic)excludeTopic, "sub-exclude");
        Thread.sleep(1000L);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, excludeTopic, 0);
    }

    @Test
    public void testAddOnlineSubscriptionsTwoBridges() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        ActiveMQTopic excludeTopic = new ActiveMQTopic(this.excludeTopicName);
        ActiveMQTopic topic2 = new ActiveMQTopic("include.new.topic");
        this.assertSubscriptionsCount(this.broker1, topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.session1.createDurableSubscriber((Topic)excludeTopic, "sub-exclude");
        this.session1.createDurableSubscriber((Topic)topic, this.subName);
        this.session1.createDurableSubscriber((Topic)topic, "sub2");
        this.session1.createDurableSubscriber((Topic)topic, "sub3");
        this.session1.createDurableSubscriber((Topic)topic2, "secondTopicSubName");
        this.assertSubscriptionsCount(this.broker1, topic, 3);
        this.assertSubscriptionsCount(this.broker1, topic2, 1);
        NetworkConnector secondConnector = this.configureLocalNetworkConnector();
        secondConnector.setName("networkConnector2");
        secondConnector.setDynamicallyIncludedDestinations((List)Lists.newArrayList((Object[])new ActiveMQDestination[]{new ActiveMQTopic("include.new.topic?forceDurable=" + this.forceDurable)}));
        this.localBroker.addNetworkConnector(secondConnector);
        secondConnector.start();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return ((NetworkConnector)DurableSyncNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1 && ((NetworkConnector)DurableSyncNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(1)).activeBridges().size() == 1;
            }
        }, (long)10000L, (long)500L));
        this.assertNCDurableSubsCount(this.broker2, topic2, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertNCDurableSubsCount(this.broker2, excludeTopic, 0);
        MessageProducer producer = this.session2.createProducer((Destination)topic2);
        producer.send((Message)this.session2.createTextMessage("test"));
        this.waitForDispatchFromLocalBroker(this.broker2.getDestination((ActiveMQDestination)topic2).getDestinationStatistics(), 1);
        this.assertLocalBrokerStatistics(this.broker2.getDestination((ActiveMQDestination)topic2).getDestinationStatistics(), 1);
    }

    @Test(timeout=60000L)
    public void testVirtualDestSubForceDurableSync() throws Exception {
        Assume.assumeTrue((this.flow == FLOW.FORWARD ? 1 : 0) != 0);
        this.forceDurable = true;
        this.useVirtualDestSubs = true;
        this.restartBrokers(true);
        CompositeTopic compositeTopic = this.createCompositeTopic(this.testTopicName, new ActiveMQDestination[]{new ActiveMQQueue("include.test.bar.bridge")});
        this.remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);
        MessageProducer includedProducer = this.localSession.createProducer((Destination)this.included);
        TextMessage test = this.localSession.createTextMessage("test");
        DestinationStatistics destinationStatistics = this.localBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics();
        DestinationStatistics remoteDestStatistics = this.remoteBroker.getDestination((ActiveMQDestination)new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        this.waitForConsumerCount(destinationStatistics, 1);
        this.assertNCDurableSubsCount(this.localBroker, this.included, 1);
        includedProducer.send((Message)test);
        this.waitForDispatchFromLocalBroker(destinationStatistics, 1);
        this.assertLocalBrokerStatistics(destinationStatistics, 1);
        Assert.assertEquals((String)"remote dest messages", (long)1L, (long)remoteDestStatistics.getMessages().getCount());
        this.stopRemoteBroker();
        for (int i = 0; i < 500; ++i) {
            includedProducer.send((Message)test);
        }
        this.stopLocalBroker();
        this.restartRemoteBroker();
        this.remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);
        this.restartLocalBroker(true);
        final DestinationStatistics remoteDestStatistics2 = this.remoteBroker.getDestination((ActiveMQDestination)new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return remoteDestStatistics2.getMessages().getCount() == 501L;
            }
        }));
    }

    @Test(timeout=60000L)
    public void testForceDurableTopicSubSync() throws Exception {
        Assume.assumeTrue((this.flow == FLOW.FORWARD ? 1 : 0) != 0);
        this.forceDurable = true;
        this.restartBrokers(true);
        this.remoteSession.createConsumer((Destination)this.included);
        MessageProducer includedProducer = this.localSession.createProducer((Destination)this.included);
        TextMessage test = this.localSession.createTextMessage("test");
        DestinationStatistics destinationStatistics = this.localBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics();
        this.waitForConsumerCount(destinationStatistics, 1);
        this.assertNCDurableSubsCount(this.localBroker, this.included, 1);
        includedProducer.send((Message)test);
        this.waitForDispatchFromLocalBroker(destinationStatistics, 1);
        this.assertLocalBrokerStatistics(destinationStatistics, 1);
        this.localBroker.getNetworkConnectorByName("networkConnector").stop();
        for (int i = 0; i < 500; ++i) {
            includedProducer.send((Message)test);
        }
        this.stopLocalBroker();
        this.restartLocalBroker(true);
        DestinationStatistics destinationStatistics2 = this.localBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics();
        this.waitForDispatchFromLocalBroker(destinationStatistics2, 500);
        this.assertLocalBrokerStatistics(destinationStatistics2, 500);
    }

    protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination ... forwardTo) {
        CompositeTopic compositeTopic = new CompositeTopic();
        compositeTopic.setName(name);
        compositeTopic.setForwardOnly(true);
        compositeTopic.setForwardTo((Collection)Lists.newArrayList((Object[])forwardTo));
        return compositeTopic;
    }

    protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
        if (broker.getBrokerName().equals("localBroker")) {
            this.restartLocalBroker(startNetworkConnector);
        } else {
            this.restartRemoteBroker();
        }
    }

    protected void restartBrokers(boolean startNetworkConnector) throws Exception {
        this.doTearDown();
        this.doSetUp(false, startNetworkConnector, this.localBroker.getDataDirectoryFile(), this.remoteBroker.getDataDirectoryFile());
    }

    protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, File remoteDataDir) throws Exception {
        this.included = new ActiveMQTopic(this.testTopicName);
        this.doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
        this.doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir);
        Thread.sleep(1000L);
    }

    protected void restartLocalBroker(boolean startNetworkConnector) throws Exception {
        this.stopLocalBroker();
        this.doSetUpLocalBroker(false, startNetworkConnector, this.localBroker.getDataDirectoryFile());
    }

    protected void restartRemoteBroker() throws Exception {
        int port = 0;
        if (this.remoteBroker != null) {
            List transportConnectors = this.remoteBroker.getTransportConnectors();
            port = ((TransportConnector)transportConnectors.get(0)).getConnectUri().getPort();
        }
        this.stopRemoteBroker();
        this.doSetUpRemoteBroker(false, this.remoteBroker.getDataDirectoryFile(), port);
    }

    protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception {
        this.localBroker = this.createLocalBroker(dataDir, startNetworkConnector);
        this.localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        URI localURI = this.localBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
        fac.setAlwaysSyncSend(true);
        fac.setDispatchAsync(false);
        this.localConnection = fac.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();
        if (startNetworkConnector) {
            Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return ((NetworkConnector)DurableSyncNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1;
                }
            }, (long)5000L, (long)500L);
        }
        this.localSession = this.localConnection.createSession(false, 1);
        if (this.flow.equals((Object)FLOW.FORWARD)) {
            this.broker1 = this.localBroker;
            this.session1 = this.localSession;
        } else {
            this.broker2 = this.localBroker;
            this.session2 = this.localSession;
        }
    }

    protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception {
        this.remoteBroker = this.createRemoteBroker(dataDir, port);
        this.remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        URI remoteURI = this.remoteBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
        this.remoteConnection = fac.createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();
        this.remoteSession = this.remoteConnection.createSession(false, 1);
        if (this.flow.equals((Object)FLOW.FORWARD)) {
            this.broker2 = this.remoteBroker;
            this.session2 = this.remoteSession;
            this.remoteRuntimeBroker = (JavaRuntimeConfigurationBroker)this.remoteBroker.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
        } else {
            this.broker1 = this.remoteBroker;
            this.session1 = this.remoteSession;
        }
    }

    protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setMonitorConnectionSplits(true);
        brokerService.setBrokerName("localBroker");
        brokerService.setDataDirectoryFile(dataDir);
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectory(dataDir);
        adapter.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.PERIODIC.name());
        brokerService.setPersistenceAdapter((PersistenceAdapter)adapter);
        brokerService.setUseVirtualDestSubs(this.useVirtualDestSubs);
        brokerService.setUseVirtualDestSubsOnCreation(this.useVirtualDestSubs);
        if (startNetworkConnector) {
            brokerService.addNetworkConnector(this.configureLocalNetworkConnector());
        }
        brokerService.addConnector("auto+nio+ssl://localhost:0");
        return brokerService;
    }

    protected NetworkConnector configureLocalNetworkConnector() throws Exception {
        List transportConnectors = this.remoteBroker.getTransportConnectors();
        URI remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
        String uri = "static:(" + remoteURI + ")";
        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
        connector.setName("networkConnector");
        connector.setDynamicOnly(this.dynamicOnly);
        connector.setDecreaseNetworkConsumerPriority(false);
        connector.setConduitSubscriptions(true);
        connector.setDuplex(true);
        connector.setStaticBridge(false);
        connector.setSyncDurableSubs(true);
        connector.setUseVirtualDestSubs(this.useVirtualDestSubs);
        connector.setStaticallyIncludedDestinations((List)Lists.newArrayList((Object[])new ActiveMQDestination[]{new ActiveMQTopic(this.staticIncludeTopics + "?forceDurable=" + this.forceDurable)}));
        connector.setDynamicallyIncludedDestinations((List)Lists.newArrayList((Object[])new ActiveMQDestination[]{new ActiveMQTopic(this.includedTopics + "?forceDurable=" + this.forceDurable)}));
        connector.setExcludedDestinations((List)Lists.newArrayList((Object[])new ActiveMQDestination[]{new ActiveMQTopic(this.excludeTopicName)}));
        return connector;
    }

    protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(dataDir);
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectory(dataDir);
        adapter.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.PERIODIC.name());
        brokerService.setPersistenceAdapter((PersistenceAdapter)adapter);
        brokerService.setUseVirtualDestSubs(this.useVirtualDestSubs);
        brokerService.setUseVirtualDestSubsOnCreation(this.useVirtualDestSubs);
        if (this.useVirtualDestSubs) {
            brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
        }
        this.remoteAdvisoryBroker = (AdvisoryBroker)brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        brokerService.addConnector("auto+nio+ssl://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + this.remoteBrokerWireFormatVersion);
        return brokerService;
    }

    static {
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
    }

    public static enum FLOW {
        FORWARD,
        REVERSE;

    }
}

