/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.ArrayList;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.DynamicNetworkTestSupport;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NetworkDurableRecreationTest
extends DynamicNetworkTestSupport {
    protected NetworkConnector connector;

    @Test(timeout=30000L)
    public void testDurableConsumer() throws Exception {
        this.testReceive(this.remoteBroker, this.remoteSession, this.localBroker, this.localSession, new DynamicNetworkTestSupport.ConsumerCreator(){

            @Override
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.remoteSession.createDurableSubscriber((Topic)NetworkDurableRecreationTest.this.included, NetworkDurableRecreationTest.this.subName);
            }
        });
    }

    @Test(timeout=30000L)
    public void testDurableConsumerReverse() throws Exception {
        this.testReceive(this.localBroker, this.localSession, this.remoteBroker, this.remoteSession, new DynamicNetworkTestSupport.ConsumerCreator(){

            @Override
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.localSession.createDurableSubscriber((Topic)NetworkDurableRecreationTest.this.included, NetworkDurableRecreationTest.this.subName);
            }
        });
    }

    @Test(timeout=30000L)
    public void testDurableAndTopicConsumer() throws Exception {
        this.testReceive(this.remoteBroker, this.remoteSession, this.localBroker, this.localSession, new DynamicNetworkTestSupport.ConsumerCreator(){

            @Override
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.remoteSession.createConsumer((Destination)NetworkDurableRecreationTest.this.included);
            }
        });
    }

    @Test(timeout=30000L)
    public void testDurableAndTopicConsumerReverse() throws Exception {
        this.testReceive(this.localBroker, this.localSession, this.remoteBroker, this.remoteSession, new DynamicNetworkTestSupport.ConsumerCreator(){

            @Override
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.localSession.createConsumer((Destination)NetworkDurableRecreationTest.this.included);
            }
        });
    }

    protected void testReceive(BrokerService receiveBroker, Session receiveSession, BrokerService publishBroker, Session publishSession, DynamicNetworkTestSupport.ConsumerCreator secondConsumerCreator) throws Exception {
        DemandForwardingBridge bridge;
        DestinationStatistics destinationStatistics = publishBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics();
        MessageProducer includedProducer = publishSession.createProducer((Destination)this.included);
        TopicSubscriber bridgeConsumer = receiveSession.createDurableSubscriber((Topic)this.included, this.subName);
        this.waitForConsumerCount(destinationStatistics, 1);
        if (publishBroker.getNetworkConnectors().size() > 0) {
            Wait.waitFor(() -> ((NetworkConnector)publishBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1, (long)10000L, (long)500L);
            bridge = (NetworkBridge)((NetworkConnector)publishBroker.getNetworkConnectors().get(0)).activeBridges().iterator().next();
        } else {
            bridge = this.findDuplexBridge(publishBroker.getTransportConnectorByScheme("tcp"));
        }
        this.assertSubscriptionMapCounts((NetworkBridge)bridge, 2);
        ConnectionContext context = new ConnectionContext();
        RemoveSubscriptionInfo info = this.getRemoveSubscriptionInfo(context, receiveBroker);
        bridgeConsumer.close();
        Thread.sleep(1000L);
        receiveBroker.getBroker().removeSubscription(context, info);
        this.waitForConsumerCount(destinationStatistics, 0);
        this.assertSubscriptionMapCounts((NetworkBridge)bridge, 1);
        MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer();
        this.waitForConsumerCount(destinationStatistics, 1);
        includedProducer.send((Message)publishSession.createTextMessage("test"));
        Assert.assertNotNull((Object)bridgeConsumer2.receive(5000L));
    }

    @Before
    public void setUp() throws Exception {
        this.doSetUp(true);
    }

    @After
    public void tearDown() throws Exception {
        this.doTearDown();
    }

    @Override
    protected void doTearDown() throws Exception {
        if (this.localConnection != null) {
            this.localConnection.close();
        }
        if (this.remoteConnection != null) {
            this.remoteConnection.close();
        }
        if (this.localBroker != null) {
            this.localBroker.stop();
        }
        if (this.remoteBroker != null) {
            this.remoteBroker.stop();
        }
    }

    protected void doSetUp(boolean deleteAllMessages) throws Exception {
        this.remoteBroker = this.createRemoteBroker();
        this.remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        this.localBroker = this.createLocalBroker();
        this.localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        URI localURI = this.localBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
        fac.setAlwaysSyncSend(true);
        fac.setDispatchAsync(false);
        this.localConnection = fac.createConnection();
        this.localConnection.setClientID(this.clientId);
        this.localConnection.start();
        URI remoteURI = this.remoteBroker.getVmConnectorURI();
        fac = new ActiveMQConnectionFactory(remoteURI);
        this.remoteConnection = fac.createConnection();
        this.remoteConnection.setClientID(this.clientId);
        this.remoteConnection.start();
        this.included = new ActiveMQTopic(this.testTopicName);
        this.localSession = this.localConnection.createSession(false, 1);
        this.remoteSession = this.remoteConnection.createSession(false, 1);
    }

    protected BrokerService createLocalBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setMonitorConnectionSplits(true);
        brokerService.setDataDirectoryFile(this.tempFolder.newFolder());
        brokerService.setBrokerName("localBroker");
        this.connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)"));
        this.connector.setName("networkConnector");
        this.connector.setDecreaseNetworkConsumerPriority(false);
        this.connector.setConduitSubscriptions(true);
        this.connector.setDuplex(true);
        ArrayList<ActiveMQTopic> includedDestinations = new ArrayList<ActiveMQTopic>();
        includedDestinations.add(new ActiveMQTopic(this.testTopicName));
        ArrayList<ActiveMQTopic> excludedDestinations = new ArrayList<ActiveMQTopic>();
        excludedDestinations.add(new ActiveMQTopic(this.excludeTopicName));
        this.connector.setDynamicallyIncludedDestinations(includedDestinations);
        this.connector.setExcludedDestinations(excludedDestinations);
        brokerService.addNetworkConnector(this.connector);
        brokerService.addConnector("tcp://localhost:61616");
        return brokerService;
    }

    protected BrokerService createRemoteBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(this.tempFolder.newFolder());
        brokerService.addConnector("tcp://localhost:61617");
        return brokerService;
    }
}

