/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.network.DemandSubscription;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

/**
 * Verifies that JMS messages published on the local broker and consumed from
 * the remote broker arrive flagged as compressed, for each JMS message type.
 *
 * <p>Both brokers are built from Spring XML definitions on the classpath, and
 * every network connector they declare has compression switched on in
 * {@link #createBroker(String)}.
 */
public class CompressionOverNetworkTest {
    protected static final int RECEIVE_TIMEOUT_MILLS = 10000;
    protected static final int MESSAGE_COUNT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);

    /** Number of UUIDs / strings / map entries written into each test payload. */
    private static final int PAYLOAD_ITEM_COUNT = 100;

    protected AbstractApplicationContext context;
    protected Connection localConnection;
    protected Connection remoteConnection;
    protected BrokerService localBroker;
    protected BrokerService remoteBroker;
    protected Session localSession;
    protected Session remoteSession;
    protected ActiveMQDestination included;

    /**
     * Sends a text message from a connection that itself has compression
     * enabled and checks it is still compressed and intact on the remote side.
     */
    @Test
    public void testCompressedOverCompressedNetwork() throws Exception {
        // Enable client-side compression before any message is created or sent.
        ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection;
        localAmqConnection.setUseCompression(true);

        String payload = createRandomPayload();
        Message received = sendAndReceiveOverBridge(localSession.createTextMessage(payload));

        ActiveMQTextMessage message = (ActiveMQTextMessage) received;
        Assert.assertTrue(message.isCompressed());
        Assert.assertEquals(payload, message.getText());
    }

    /** Checks that a {@link TextMessage} arrives compressed with its text unchanged. */
    @Test
    public void testTextMessageCompression() throws Exception {
        String payload = createRandomPayload();
        Message received = sendAndReceiveOverBridge(localSession.createTextMessage(payload));

        ActiveMQTextMessage message = (ActiveMQTextMessage) received;
        Assert.assertTrue(message.isCompressed());
        Assert.assertEquals(payload, message.getText());
    }

    /**
     * Checks that a {@link BytesMessage} arrives compressed, that its stored
     * content is smaller than the raw payload, and that it reads back unchanged.
     */
    @Test
    public void testBytesMessageCompression() throws Exception {
        byte[] bytes = createRandomPayload().getBytes("UTF-8");
        BytesMessage outbound = localSession.createBytesMessage();
        outbound.writeBytes(bytes);

        Message received = sendAndReceiveOverBridge(outbound);

        ActiveMQBytesMessage message = (ActiveMQBytesMessage) received;
        Assert.assertTrue(message.isCompressed());
        Assert.assertTrue(message.getContent().getLength() < bytes.length);

        byte[] result = new byte[bytes.length];
        Assert.assertEquals(bytes.length, message.readBytes(result));
        // A second read must report end-of-stream: nothing beyond the payload.
        Assert.assertEquals(-1, message.readBytes(result));
        Assert.assertArrayEquals(bytes, result);
    }

    /** Checks that a {@link StreamMessage} arrives compressed with all entries in order. */
    @Test
    public void testStreamMessageCompression() throws Exception {
        StreamMessage outbound = localSession.createStreamMessage();
        for (int i = 0; i < PAYLOAD_ITEM_COUNT; i++) {
            outbound.writeString("test string: " + i);
        }

        Message received = sendAndReceiveOverBridge(outbound);

        ActiveMQStreamMessage message = (ActiveMQStreamMessage) received;
        Assert.assertTrue(message.isCompressed());
        for (int i = 0; i < PAYLOAD_ITEM_COUNT; i++) {
            Assert.assertEquals("test string: " + i, message.readString());
        }
    }

    /** Checks that a {@link MapMessage} arrives compressed with every entry intact. */
    @Test
    public void testMapMessageCompression() throws Exception {
        MapMessage outbound = localSession.createMapMessage();
        for (int i = 0; i < PAYLOAD_ITEM_COUNT; i++) {
            outbound.setString(Integer.toString(i), "test string: " + i);
        }

        Message received = sendAndReceiveOverBridge(outbound);

        ActiveMQMapMessage message = (ActiveMQMapMessage) received;
        Assert.assertTrue(message.isCompressed());
        for (int i = 0; i < PAYLOAD_ITEM_COUNT; i++) {
            Assert.assertEquals("test string: " + i, message.getString(Integer.toString(i)));
        }
    }

    /** Checks that an {@link ObjectMessage} arrives compressed with an equal payload object. */
    @Test
    public void testObjectMessageCompression() throws Exception {
        String payload = createRandomPayload();
        ObjectMessage outbound = localSession.createObjectMessage((Serializable) payload);

        Message received = sendAndReceiveOverBridge(outbound);

        ActiveMQObjectMessage message = (ActiveMQObjectMessage) received;
        Assert.assertTrue(message.isCompressed());
        Assert.assertEquals(payload, message.getObject());
    }

    /**
     * Builds the string payload shared by the text, bytes and object tests:
     * the prefix {@code "test-"} followed by {@value #PAYLOAD_ITEM_COUNT}
     * random UUIDs.
     *
     * @return a freshly generated payload string
     */
    private static String createRandomPayload() {
        StringBuilder payload = new StringBuilder("test-");
        for (int i = 0; i < PAYLOAD_ITEM_COUNT; i++) {
            payload.append(UUID.randomUUID().toString());
        }
        return payload.toString();
    }

    /**
     * Publishes {@code outbound} on the local broker and returns the copy that
     * a consumer on the remote broker receives.
     *
     * <p>The remote consumer is created first, and the send is deferred until
     * the local broker's bridge reports a matching subscription, so the
     * non-persistent message is not published before demand exists.
     *
     * @param outbound message created on {@link #localSession}
     * @return the message received on the remote side, never {@code null}
     * @throws Exception if a JMS call fails or the bridge subscription does not appear
     */
    private Message sendAndReceiveOverBridge(Message outbound) throws Exception {
        MessageConsumer consumer = remoteSession.createConsumer(included);
        MessageProducer producer = localSession.createProducer(included);
        producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
        waitForConsumerRegistration(localBroker, 1, included);

        producer.send(outbound);
        Message received = consumer.receive(RECEIVE_TIMEOUT_MILLS);
        Assert.assertNotNull(received);
        return received;
    }

    /**
     * Waits until the first bridge of the broker's first network connector
     * holds a demand subscription for {@code destination} with at least
     * {@code min} entries, failing the test if {@link Wait#waitFor} gives up.
     *
     * @param brokerService broker whose first network connector is inspected
     * @param min minimum {@link DemandSubscription#size()} required
     * @param destination destination the demand subscription must target
     * @throws Exception if evaluating the condition fails
     */
    private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
        Assert.assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisified() throws Exception {
                Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
                if (bridges.length == 0) {
                    return false;
                }
                LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
                DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) bridges[0];
                // Wildcard key type keeps the values typed without needing the key class in scope.
                ConcurrentMap<?, DemandSubscription> localSubscriptions = bridge.getLocalSubscriptionMap();
                LOG.info(brokerService + " bridge " + bridge + ", localSubs: " + localSubscriptions);
                for (DemandSubscription subscription : localSubscriptions.values()) {
                    if (subscription.getLocalInfo().getDestination().equals(destination)) {
                        LOG.info(brokerService + " DemandSubscription " + subscription + ", size: " + subscription.size());
                        // Only the first subscription matching the destination is considered.
                        return subscription.size() >= min;
                    }
                }
                return false;
            }
        }));
    }

    /** Starts both brokers and opens the connections, deleting any stored messages. */
    @Before
    public void setUp() throws Exception {
        doSetUp(true);
    }

    /** Releases the connections and brokers created by {@link #setUp()}. */
    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    /**
     * Closes both connections and stops both brokers.
     *
     * <p>Each connection is closed independently, so a failure on one does not
     * leave the other open. Resources that were never created (for example
     * because set-up failed part-way) are skipped. The remote broker is stopped
     * even if stopping the local broker throws.
     *
     * @throws Exception if stopping a broker fails
     */
    protected void doTearDown() throws Exception {
        closeQuietly(localConnection);
        closeQuietly(remoteConnection);
        try {
            if (localBroker != null) {
                localBroker.stop();
            }
        } finally {
            if (remoteBroker != null) {
                remoteBroker.stop();
            }
        }
    }

    /**
     * Closes {@code connection} if it is non-null. A failure is logged rather
     * than rethrown so that the rest of the teardown still runs.
     *
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.warn("Failed to close connection during teardown", e);
        }
    }

    /**
     * Creates and starts the local and remote brokers, then opens one started
     * connection and one auto-acknowledge session against each broker's VM
     * connector. Also initialises {@link #included} to the topic
     * {@code include.test.bar}.
     *
     * @param deleteAllMessages passed to
     *        {@link BrokerService#setDeleteAllMessagesOnStartup(boolean)} on both brokers
     * @throws Exception if a broker fails to start or a connection cannot be created
     */
    protected void doSetUp(boolean deleteAllMessages) throws Exception {
        localBroker = createLocalBroker();
        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        localBroker.start();
        localBroker.waitUntilStarted();

        remoteBroker = createRemoteBroker();
        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        remoteBroker.start();
        remoteBroker.waitUntilStarted();

        URI localURI = localBroker.getVmConnectorURI();
        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(localURI);
        localFactory.setAlwaysSyncSend(true);
        localFactory.setDispatchAsync(false);
        localConnection = localFactory.createConnection();
        localConnection.setClientID("clientId");
        localConnection.start();

        URI remoteURI = remoteBroker.getVmConnectorURI();
        ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(remoteURI);
        remoteConnection = remoteFactory.createConnection();
        remoteConnection.setClientID("clientId");
        remoteConnection.start();

        included = new ActiveMQTopic("include.test.bar");
        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** @return classpath location of the remote broker's XML definition */
    protected String getRemoteBrokerURI() {
        return "org/apache/activemq/network/remoteBroker.xml";
    }

    /** @return classpath location of the local broker's XML definition */
    protected String getLocalBrokerURI() {
        return "org/apache/activemq/network/localBroker.xml";
    }

    /**
     * Builds a broker from the XML definition at the given classpath location
     * and enables compression on each of its network connectors. The broker is
     * returned without being started here.
     *
     * @param uri classpath location of the broker XML definition
     * @return the configured broker
     * @throws Exception if the definition cannot be loaded
     */
    protected BrokerService createBroker(String uri) throws Exception {
        Resource resource = new ClassPathResource(uri);
        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
        factory.afterPropertiesSet();

        BrokerService result = factory.getBroker();
        for (NetworkConnector connector : result.getNetworkConnectors()) {
            connector.setUseCompression(true);
        }
        return result;
    }

    /** Creates the local broker from {@link #getLocalBrokerURI()}. */
    protected BrokerService createLocalBroker() throws Exception {
        return createBroker(getLocalBrokerURI());
    }

    /** Creates the remote broker from {@link #getRemoteBrokerURI()}. */
    protected BrokerService createRemoteBroker() throws Exception {
        return createBroker(getRemoteBrokerURI());
    }
}

