/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.vm;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DurableConduitBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VmTransportNetworkBrokerTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class);
    private static final String VM_BROKER_URI = "vm://localhost?create=false";

    public void testNoThreadLeak() throws Exception {
        Thread[] threads = this.filterDaemonThreads(ThreadExplorer.listThreads());
        final int originalThreadCount = threads.length;
        LOG.debug(ThreadExplorer.show("threads at beginning"));
        BrokerService broker = new BrokerService();
        broker.setDedicatedTaskRunner(true);
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61616");
        NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false");
        networkConnector.setDuplex(true);
        broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI));
        Connection connection = cf.createConnection("system", "manager");
        connection.start();
        TimeUnit.SECONDS.sleep(5L);
        int threadCountAfterStart = Thread.activeCount();
        TimeUnit.SECONDS.sleep(20L);
        int threadCountAfterSleep = Thread.activeCount();
        VmTransportNetworkBrokerTest.assertTrue((String)("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep), (threadCountAfterSleep < 2 * threadCountAfterStart ? 1 : 0) != 0);
        connection.close();
        broker.stop();
        broker.waitUntilStopped();
        broker = new BrokerService();
        broker.setSchedulerSupport(true);
        broker.setDedicatedTaskRunner(true);
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
        broker.start();
        cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
        connection = cf.createConnection("system", "manager");
        connection.start();
        connection.close();
        broker.stop();
        broker.waitUntilStopped();
        final AtomicInteger threadCountAfterStop = new AtomicInteger();
        boolean ok = Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info(ThreadExplorer.show("active after stop"));
                Thread[] threads = VmTransportNetworkBrokerTest.this.filterDaemonThreads(ThreadExplorer.listThreads());
                threadCountAfterStop.set(threads.length);
                return threadCountAfterStop.get() <= originalThreadCount;
            }
        });
        LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop);
        VmTransportNetworkBrokerTest.assertTrue((String)("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop.get()), (boolean)ok);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testInvalidClientIdAndDurableSubs() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setDedicatedTaskRunner(true);
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:0");
        broker.start();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getPublishableConnectString());
        Connection connection = connectionFactory.createConnection("system", "manager");
        connection.setClientID("F1_forwarder_outbound");
        connection.start();
        BrokerService forwarder = new BrokerService();
        forwarder.setBrokerName("forwarder");
        forwarder.setPersistent(false);
        forwarder.setUseJmx(false);
        forwarder.start();
        ActiveMQConnectionFactory vmFactory = new ActiveMQConnectionFactory("vm://forwarder");
        Connection vmConnection = vmFactory.createConnection("system", "manager");
        vmConnection.setClientID("vm_local");
        vmConnection.start();
        Session session = vmConnection.createSession(false, 1);
        for (int i = 0; i < 5000; ++i) {
            session.createDurableSubscriber((Topic)new ActiveMQTopic("T" + i), "" + i);
        }
        vmConnection.close();
        final AtomicInteger logCounts = new AtomicInteger(0);
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getLevel() == Level.ERROR) {
                    logCounts.incrementAndGet();
                }
            }
        };
        org.apache.log4j.Logger.getLogger(DurableConduitBridge.class).addAppender((Appender)appender);
        try {
            NetworkConnector networkConnector = forwarder.addNetworkConnector("static:(" + ((TransportConnector)broker.getTransportConnectors().get(0)).getPublishableConnectString() + ")");
            networkConnector.setName("F1");
            forwarder.addNetworkConnector(networkConnector);
            forwarder.startAllConnectors();
            TimeUnit.SECONDS.sleep(1L);
            connection.close();
            forwarder.stop();
            broker.stop();
            VmTransportNetworkBrokerTest.assertEquals((String)"no errors", (int)0, (int)logCounts.get());
        }
        finally {
            org.apache.log4j.Logger.getLogger(DurableConduitBridge.class).removeAppender((Appender)appender);
        }
    }

    public Thread[] filterDaemonThreads(Thread[] threads) throws Exception {
        ArrayList<Thread> threadList = new ArrayList<Thread>(Arrays.asList(threads));
        for (int i = 0; i < threadList.size(); ++i) {
            Thread thread = (Thread)threadList.get(i);
            LOG.debug("Inspecting thread " + thread.getName());
            if (!thread.isDaemon() || thread.getName().contains("ActiveMQ")) continue;
            LOG.debug("Removing deamon thread.");
            threadList.remove(thread);
            Thread.sleep(100L);
        }
        LOG.debug("Converting list back to Array");
        return threadList.toArray(new Thread[0]);
    }
}

