/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4853Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4853Test.class);
    private static BrokerService brokerService;
    private static final String BROKER_ADDRESS = "tcp://localhost:0";
    private static final ActiveMQQueue DESTINATION;
    private CountDownLatch cycleDoneLatch;
    private String connectionUri;

    @Before
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        this.connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    @Ignore
    @Test
    public void test() throws Exception {
        CountDownLatch fixedDelayCycleLatch;
        ArrayList<Consumer> fixedConsumers = new ArrayList<Consumer>(100);
        for (int i = 0; i < 200; ++i) {
            fixedConsumers.add(new Consumer());
        }
        int fixedDelayConsumers = 300;
        int fixedDelayCycles = 25;
        this.cycleDoneLatch = fixedDelayCycleLatch = new CountDownLatch(25);
        CyclicBarrier barrier = new CyclicBarrier(300, new Runnable(){

            @Override
            public void run() {
                LOG.info("Fixed delay consumers cycle {} completed.", (Object)fixedDelayCycleLatch.getCount());
                fixedDelayCycleLatch.countDown();
            }
        });
        for (int i = 0; i < 300; ++i) {
            new Thread(new FixedDelyConsumer(barrier)).start();
        }
        fixedDelayCycleLatch.await(10L, TimeUnit.MINUTES);
        for (Consumer consumer : fixedConsumers) {
            consumer.close();
        }
        fixedConsumers.clear();
    }

    private ConnectionInfo createConnectionInfo() {
        ConnectionId id = new ConnectionId();
        id.setValue("ID:123456789:0:1");
        ConnectionInfo info = new ConnectionInfo();
        info.setConnectionId(id);
        return info;
    }

    private SessionInfo createSessionInfo(ConnectionInfo connection) {
        SessionId id = new SessionId(connection.getConnectionId(), 1L);
        SessionInfo info = new SessionInfo();
        info.setSessionId(id);
        return info;
    }

    public ConsumerInfo createConsumerInfo(SessionInfo session, int value, ActiveMQDestination destination) {
        ConsumerId id = new ConsumerId();
        id.setConnectionId(session.getSessionId().getConnectionId());
        id.setSessionId(1L);
        id.setValue((long)value);
        ConsumerInfo info = new ConsumerInfo();
        info.setConsumerId(id);
        info.setDestination(destination);
        return info;
    }

    @Ignore
    @Test
    public void testPerformanceOfRemovals() throws Exception {
        AdvisoryBroker testObj = (AdvisoryBroker)brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        ActiveMQQueue destination = new ActiveMQQueue("foo");
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
        connectionContext.setBroker(brokerService.getBroker());
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 200; ++i) {
            ConsumerInfo consumerInfo;
            int j;
            for (j = 1; j <= 500; ++j) {
                consumerInfo = this.createConsumerInfo(sessionInfo, j, (ActiveMQDestination)destination);
                testObj.addConsumer(connectionContext, consumerInfo);
            }
            for (j = 500; j > 0; --j) {
                consumerInfo = this.createConsumerInfo(sessionInfo, j, (ActiveMQDestination)destination);
                testObj.removeConsumer(connectionContext, consumerInfo);
            }
            for (j = 1; j <= 500; ++j) {
                consumerInfo = this.createConsumerInfo(sessionInfo, j, (ActiveMQDestination)destination);
                testObj.addConsumer(connectionContext, consumerInfo);
            }
            for (j = 1; j <= 500; ++j) {
                consumerInfo = this.createConsumerInfo(sessionInfo, j, (ActiveMQDestination)destination);
                testObj.removeConsumer(connectionContext, consumerInfo);
            }
        }
        long finish = System.currentTimeMillis();
        long totalTime = finish - start;
        LOG.info("Total test time: {} seconds", (Object)TimeUnit.MILLISECONDS.toSeconds(totalTime));
        Assert.assertEquals((long)0L, (long)testObj.getAdvisoryConsumers().size());
    }

    @Test
    public void testEqualsNeeded() throws Exception {
        ConsumerInfo consumerInfo;
        int j;
        AdvisoryBroker testObj = (AdvisoryBroker)brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        ActiveMQQueue destination = new ActiveMQQueue("foo");
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
        connectionContext.setBroker(brokerService.getBroker());
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        for (j = 1; j <= 5; ++j) {
            consumerInfo = this.createConsumerInfo(sessionInfo, j, (ActiveMQDestination)destination);
            testObj.addConsumer(connectionContext, consumerInfo);
        }
        for (j = 1; j <= 5; ++j) {
            consumerInfo = this.createConsumerInfo(sessionInfo, j, (ActiveMQDestination)destination);
            testObj.removeConsumer(connectionContext, consumerInfo);
        }
        Assert.assertEquals((long)0L, (long)testObj.getAdvisoryConsumers().size());
    }

    private boolean done() {
        if (this.cycleDoneLatch == null) {
            return true;
        }
        return this.cycleDoneLatch.getCount() == 0L;
    }

    static {
        DESTINATION = new ActiveMQQueue("TEST.QUEUE");
    }

    class FixedDelyConsumer
    implements Runnable {
        private final CyclicBarrier barrier;
        private final int sleepInterval;

        public FixedDelyConsumer(CyclicBarrier barrier) {
            this.barrier = barrier;
            this.sleepInterval = 1000;
        }

        public FixedDelyConsumer(CyclicBarrier barrier, int sleepInterval) {
            this.barrier = barrier;
            this.sleepInterval = sleepInterval;
        }

        @Override
        public void run() {
            while (!AMQ4853Test.this.done()) {
                try {
                    Consumer consumer = new Consumer();
                    TimeUnit.MILLISECONDS.sleep(this.sleepInterval);
                    consumer.close();
                    this.barrier.await();
                }
                catch (Exception ex) {
                    return;
                }
            }
        }
    }

    class Consumer
    implements MessageListener {
        Connection connection;
        Session session;
        Destination destination;
        MessageConsumer consumer;

        Consumer() throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(AMQ4853Test.this.connectionUri);
            this.connection = factory.createConnection();
            this.session = this.connection.createSession(false, 1);
            this.consumer = this.session.createConsumer((Destination)DESTINATION);
            this.consumer.setMessageListener((MessageListener)this);
            this.connection.start();
        }

        public void onMessage(Message message) {
        }

        public void close() {
            try {
                this.connection.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.connection = null;
            this.session = null;
            this.consumer = null;
        }
    }
}

