/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class ClientRebalanceTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ClientRebalanceTest.class);
    private static final String QUEUE_NAME = "Test.ClientRebalanceTest";

    @Override
    protected void setUp() throws Exception {
        this.setAutoFail(true);
        super.setUp();
    }

    public void testRebalance() throws Exception {
        this.createBroker((Resource)new ClassPathResource("org/apache/activemq/usecases/rebalance-broker1.xml"));
        this.createBroker((Resource)new ClassPathResource("org/apache/activemq/usecases/rebalance-broker2.xml"));
        this.startAllBrokers();
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b1")).broker.waitUntilStarted();
        LOG.info("Starting connection");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, 1);
        Queue theQueue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer((Destination)theQueue);
        MessageConsumer consumer = session.createConsumer((Destination)theQueue);
        TextMessage message = session.createTextMessage("Test message");
        producer.send((Message)message);
        Message msg = consumer.receive(2000L);
        ClientRebalanceTest.assertNotNull((Object)msg);
        this.createBroker((Resource)new ClassPathResource("org/apache/activemq/usecases/rebalance-broker3.xml"));
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b3")).broker.waitUntilStarted();
        Thread.sleep(3000L);
        LOG.info("Stopping broker 1");
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b1")).broker.stop();
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b1")).broker.waitUntilStopped();
        Thread.sleep(3000L);
        producer.send((Message)message);
        msg = consumer.receive(2000L);
        ClientRebalanceTest.assertNotNull((Object)msg);
        LOG.info("Stopping broker 2");
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b2")).broker.stop();
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b2")).broker.waitUntilStopped();
        producer.send((Message)message);
        msg = consumer.receive(2000L);
        ClientRebalanceTest.assertNotNull((Object)msg);
    }

    public void testReconnect() throws Exception {
        this.createBroker((Resource)new ClassPathResource("org/apache/activemq/usecases/rebalance-broker1.xml"));
        this.createBroker((Resource)new ClassPathResource("org/apache/activemq/usecases/rebalance-broker2.xml"));
        this.startAllBrokers();
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b1")).broker.waitUntilStarted();
        LOG.info("Starting connection");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?randomize=false");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, 1);
        Queue theQueue = session.createQueue("Test.ClientReconnectTest");
        MessageProducer producer = session.createProducer((Destination)theQueue);
        MessageConsumer consumer = session.createConsumer((Destination)theQueue);
        TextMessage message = session.createTextMessage("Test message");
        producer.send((Message)message);
        Message msg = consumer.receive(2000L);
        ClientRebalanceTest.assertNotNull((Object)msg);
        TransportConnector transportConnector = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"b1")).broker.getTransportConnectorByName("openwire");
        ClientRebalanceTest.assertNotNull((Object)transportConnector);
        TransportConnection startFailoverConnection = this.findClientFailoverTransportConnection(transportConnector);
        ClientRebalanceTest.assertNotNull((Object)startFailoverConnection);
        String startConnectionId = startFailoverConnection.getConnectionId();
        String startRemoteAddress = startFailoverConnection.getRemoteAddress();
        ConnectionControl simulateRebalance = new ConnectionControl();
        simulateRebalance.setReconnectTo("tcp://localhost:61616");
        startFailoverConnection.dispatchSync((Command)simulateRebalance);
        Thread.sleep(2000L);
        TransportConnection afterFailoverConnection = this.findClientFailoverTransportConnection(transportConnector);
        ClientRebalanceTest.assertNotNull((Object)afterFailoverConnection);
        ClientRebalanceTest.assertEquals((String)startConnectionId, (String)afterFailoverConnection.getConnectionId());
        Assert.assertNotEquals((Object)startRemoteAddress, (Object)afterFailoverConnection.getRemoteAddress());
        producer.send((Message)message);
        msg = consumer.receive(2000L);
        ClientRebalanceTest.assertNotNull((Object)msg);
        conn.close();
    }

    protected TransportConnection findClientFailoverTransportConnection(TransportConnector transportConnector) {
        TransportConnection failoverConnection = null;
        for (TransportConnection tmpConnection : transportConnector.getConnections()) {
            if (tmpConnection.isNetworkConnection() || !tmpConnection.isFaultTolerantConnection()) continue;
            failoverConnection = tmpConnection;
        }
        return failoverConnection;
    }
}

