/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.cluster.bridge.SimpleTransformer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={ParameterizedTestExtension.class})
public class BridgeTest
extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private ActiveMQServer server0;
    private ActiveMQServer server1;
    private ServerLocator locator;
    private final boolean netty;

    @Parameters(name="isNetty={0}")
    public static Collection getParameters() {
        return Arrays.asList({true}, {false});
    }

    public BridgeTest(boolean isNetty) {
        this.netty = isNetty;
    }

    protected boolean isNetty() {
        return this.netty;
    }

    @Override
    @BeforeEach
    public void setUp() throws Exception {
        StopInterceptor.reset();
        super.setUp();
    }

    private String getConnector() {
        if (this.isNetty()) {
            return NETTY_CONNECTOR_FACTORY;
        }
        return INVM_CONNECTOR_FACTORY;
    }

    @TestTemplate
    public void testSimpleBridge() throws Exception {
        this.internaltestSimpleBridge(false, false);
    }

    @TestTemplate
    public void testSimpleBridgeFiles() throws Exception {
        this.internaltestSimpleBridge(false, true);
    }

    @TestTemplate
    public void testSimpleBridgeLargeMessageNullPersistence() throws Exception {
        this.internaltestSimpleBridge(true, false);
    }

    @TestTemplate
    public void testSimpleBridgeLargeMessageFiles() throws Exception {
        this.internaltestSimpleBridge(true, true);
    }

    @TestTemplate
    public void testLargeMessageBridge() throws Exception {
        ClientMessage message;
        int i;
        long time = System.currentTimeMillis();
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        int messageSize = 204800;
        int numMessages = 10;
        ArrayList<String> connectorConfig = new ArrayList<String>();
        connectorConfig.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024000).setStaticConnectors(connectorConfig);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.addSessionFactory(this.locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = this.addSessionFactory(this.locator.createSessionFactory(server1tc));
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        byte[] bytes = new byte[204800];
        SimpleString propKey = SimpleString.of((String)"testkey");
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(5000L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
            this.readLargeMessages(message, 10);
            message.acknowledge();
        }
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        this.closeFields();
        if (this.server0.getConfiguration().isPersistenceEnabled()) {
            Assertions.assertEquals((int)0, (int)this.loadQueues(this.server0).size());
        }
        long timeTaken = System.currentTimeMillis() - time;
    }

    @TestTemplate
    public void testBlockedBridgeAndReconnect() throws Exception {
        ClientMessage message;
        int i;
        long time = System.currentTimeMillis();
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        this.server1.getAddressSettingsRepository().clear();
        this.server1.getAddressSettingsRepository().addMatch("#", (Object)new AddressSettings().setMaxSizeBytes(101240L).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
        this.server0.getAddressSettingsRepository().clear();
        this.server0.getAddressSettingsRepository().addMatch("#", (Object)new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        int messageSize = 1024;
        int numMessages = 1000;
        ArrayList<String> connectorConfig = new ArrayList<String>();
        connectorConfig.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(100L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(512000).setStaticConnectors(connectorConfig).setProducerWindowSize(1024);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.addSessionFactory(this.locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = this.addSessionFactory(this.locator.createSessionFactory(server1tc));
        ClientSession session0 = sf0.createSession(false, true, 0);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientSession session1 = sf1.createSession(true, true, 0);
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        byte[] bytes = new byte[1024];
        SimpleString propKey = SimpleString.of((String)"testkey");
        for (i = 0; i < 1000; ++i) {
            message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((Message)message);
            if (i % 100 != 0) continue;
            session0.commit();
        }
        session0.commit();
        for (i = 0; i < 500; ++i) {
            message = consumer1.receive(5000L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
            message.acknowledge();
        }
        session1.commit();
        BridgeImpl bridge = (BridgeImpl)this.server0.getClusterManager().getBridges().get("bridge1");
        Wait.assertTrue((String)"bridge is never blocked", () -> ((BridgeImpl)bridge).isBlockedOnFlowControl());
        session1.close();
        sf1.close();
        this.server1.stop();
        this.server1.start();
        sf1 = this.addSessionFactory(this.locator.createSessionFactory(server1tc));
        session1 = sf1.createSession(true, true, 0);
        consumer1 = session1.createConsumer("queue1");
        session1.start();
        for (int i2 = 500; i2 < 1000; ++i2) {
            ClientMessage message2 = consumer1.receive(5000L);
            Assertions.assertNotNull((Object)message2);
            Assertions.assertEquals((Object)i2, (Object)message2.getObjectProperty(propKey));
            message2.acknowledge();
        }
        Wait.assertEquals((long)0L, () -> ((Queue)this.server0.locateQueue(SimpleString.of((String)"queue0"))).getMessageCount());
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        this.closeFields();
        if (this.server0.getConfiguration().isPersistenceEnabled()) {
            Assertions.assertEquals((int)0, (int)this.loadQueues(this.server0).size());
        }
        long timeTaken = System.currentTimeMillis() - time;
    }

    public void internaltestSimpleBridge(boolean largeMessage, boolean useFiles) throws Exception {
        ClientMessage message;
        int i;
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, useFiles, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, useFiles, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        int messageSize = 1024;
        int numMessages = 10;
        ArrayList<String> connectorConfig = new ArrayList<String>();
        connectorConfig.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(5120).setStaticConnectors(connectorConfig);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.addSessionFactory(this.locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = this.addSessionFactory(this.locator.createSessionFactory(server1tc));
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        byte[] bytes = new byte[1024];
        SimpleString propKey = SimpleString.of((String)"testkey");
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10240L));
            }
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(5000L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
            if (largeMessage) {
                this.readLargeMessages(message, 10);
            }
            message.acknowledge();
        }
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        Assertions.assertEquals((int)1, (int)this.server0.getClusterManager().getBridges().size());
        BridgeMetrics bridgeMetrics = ((Bridge)this.server0.getClusterManager().getBridges().get("bridge1")).getMetrics();
        Assertions.assertEquals((long)10L, (long)bridgeMetrics.getMessagesPendingAcknowledgement());
        Assertions.assertEquals((long)10L, (long)bridgeMetrics.getMessagesAcknowledged());
        this.closeFields();
        if (this.server0.getConfiguration().isPersistenceEnabled()) {
            Assertions.assertEquals((int)0, (int)this.loadQueues(this.server0).size());
        }
    }

    private void addTargetParameters(Map<String, Object> server1Params) {
        if (this.isNetty()) {
            server1Params.put("port", 61617);
        } else {
            server1Params.put("serverId", 1);
        }
    }

    private void readLargeMessages(ClientMessage message, int kiloBlocks) {
        byte[] byteRead = new byte[1024];
        for (int j = 0; j < kiloBlocks; ++j) {
            message.getBodyBuffer().readBytes(byteRead);
        }
    }

    @TestTemplate
    public void testWithFilter() throws Exception {
        this.internalTestWithFilter(false, false);
    }

    @TestTemplate
    public void testWithFilterFiles() throws Exception {
        this.internalTestWithFilter(false, true);
    }

    @TestTemplate
    public void testWithFilterLargeMessages() throws Exception {
        this.internalTestWithFilter(true, false);
    }

    @TestTemplate
    public void testWithFilterLargeMessagesFiles() throws Exception {
        this.internalTestWithFilter(true, true);
    }

    public void internalTestWithFilter(boolean largeMessage, boolean useFiles) throws Exception {
        ClientMessage message;
        int i;
        int numMessages = 10;
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, useFiles, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, useFiles, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        String filterString = "animal='goat'";
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setFilterString("animal='goat'").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
        ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        SimpleString propKey = SimpleString.of((String)"testkey");
        SimpleString selectorKey = SimpleString.of((String)"animal");
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.putStringProperty(selectorKey, SimpleString.of((String)"monkey"));
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10240L));
            }
            producer0.send((Message)message);
        }
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.putStringProperty(selectorKey, SimpleString.of((String)"goat"));
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10240L));
            }
            producer0.send((Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(4000L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((Object)"goat", (Object)message.getStringProperty(selectorKey));
            Assertions.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
            message.acknowledge();
            if (!largeMessage) continue;
            this.readLargeMessages(message, 10);
        }
        session0.commit();
        session1.commit();
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        this.closeFields();
        if (useFiles) {
            Map<Long, AtomicInteger> counters = this.loadQueues(this.server0);
            Assertions.assertEquals((int)1, (int)counters.size());
            Long key = counters.keySet().iterator().next();
            AtomicInteger value = counters.get(key);
            Assertions.assertNotNull((Object)value);
            Assertions.assertEquals((int)10, (int)counters.get(key).intValue());
        }
    }

    @TestTemplate
    public void testStartLater() throws Exception {
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "forwardAddress";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(100L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        int numMessages = 100;
        SimpleString propKey = SimpleString.of((String)"testkey");
        SimpleString selectorKey = SimpleString.of((String)"animal");
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = session0.createMessage(true);
            message.getBodyBuffer().writeBytes(new byte[1024]);
            message.putIntProperty(propKey, i);
            message.putStringProperty(selectorKey, SimpleString.of((String)("monkey" + i)));
            producer0.send((Message)message);
        }
        this.server1.start();
        Wait.assertTrue(() -> ((ActiveMQServer)this.server1).isActive());
        ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
        ClientSession session1 = sf1.createSession(false, true, true);
        try {
            session1.createQueue(QueueConfiguration.of((String)"forwardAddress").setAddress("forwardAddress").setRoutingType(RoutingType.ANYCAST));
        }
        catch (Throwable ignored) {
            ignored.printStackTrace();
        }
        ClientConsumer consumer1 = session1.createConsumer("forwardAddress");
        session1.start();
        for (int i = 0; i < 100; ++i) {
            ClientMessage message = consumer1.receive(5000L);
            Assertions.assertNotNull((Object)message);
            message.acknowledge();
        }
        session1.commit();
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        consumer1.close();
        session1.deleteQueue("forwardAddress");
        session1.close();
        sf1.close();
        this.server1.stop();
        session0.close();
        sf0.close();
        this.closeFields();
        Assertions.assertEquals((int)0, (int)this.loadQueues(this.server0).size());
    }

    @TestTemplate
    public void testWithDuplicates() throws Exception {
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String secondQueue = "queue1";
        String forwardAddress = "forwardAddress";
        String queueName1 = "forwardQueue";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(100L).setReconnectAttemptsOnSameNode(-1).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        queueConfig0 = QueueConfiguration.of((String)"queue1").setAddress("testAddress");
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        int numMessages = 1000;
        SimpleString propKey = SimpleString.of((String)"testkey");
        SimpleString selectorKey = SimpleString.of((String)"animal");
        for (int i = 0; i < 1000; ++i) {
            ClientMessage message = session0.createMessage(true);
            message.getBodyBuffer().writeBytes(new byte[1024]);
            message.putIntProperty(propKey, i);
            message.putStringProperty(selectorKey, SimpleString.of((String)("monkey" + i)));
            producer0.send((Message)message);
        }
        this.server1.start();
        long[] ids = new long[100];
        Queue queue = this.server0.locateQueue(SimpleString.of((String)"queue0"));
        LinkedListIterator iterator = queue.iterator();
        for (int i = 0; i < 100; ++i) {
            Assertions.assertTrue((boolean)iterator.hasNext());
            ids[i] = ((MessageReference)iterator.next()).getMessage().getMessageID();
        }
        iterator.close();
        DuplicateIDCache duplicateTargetCache = this.server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat("forwardAddress"));
        TransactionImpl tx = new TransactionImpl(this.server1.getStorageManager());
        for (long id : ids) {
            byte[] duplicateArray = BridgeImpl.getDuplicateBytes((UUID)this.server0.getNodeManager().getUUID(), (long)id);
            duplicateTargetCache.addToCache(duplicateArray, (Transaction)tx);
        }
        tx.commit();
        ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
        ClientSession session1 = sf1.createSession(false, true, true);
        try {
            session1.createQueue(QueueConfiguration.of((String)"forwardQueue").setAddress("forwardAddress").setRoutingType(RoutingType.ANYCAST));
        }
        catch (Throwable ignored) {
            ignored.printStackTrace();
        }
        ClientConsumer consumer1 = session1.createConsumer("forwardQueue");
        session1.start();
        for (int i = 100; i < 1000; ++i) {
            ClientMessage message = consumer1.receive(5000L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((int)i, (int)message.getIntProperty(propKey));
            message.acknowledge();
        }
        session1.commit();
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        ClientConsumer otherConsumer = session0.createConsumer("queue1");
        session0.start();
        for (int i = 0; i < 1000; ++i) {
            ClientMessage message = otherConsumer.receive(5000L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((int)2, (int)message.getPropertyNames().size());
            Assertions.assertEquals((int)i, (int)message.getIntProperty(propKey));
            Assertions.assertEquals((Object)SimpleString.of((String)("monkey" + i)), (Object)message.getSimpleStringProperty(selectorKey));
            message.acknowledge();
        }
        consumer1.close();
        session1.deleteQueue("forwardQueue");
        session1.close();
        sf1.close();
        SimpleString queueName1Str = SimpleString.of((String)"forwardQueue");
        Wait.assertTrue(() -> this.server1.locateQueue(queueName1Str) == null);
        this.server1.stop();
        session0.close();
        sf0.close();
        this.closeFields();
        Wait.assertEquals((int)0, () -> this.loadQueues(this.server0).size());
    }

    private void closeFields() throws Exception {
        this.locator.close();
        this.server0.stop();
        this.server1.stop();
    }

    @TestTemplate
    public void testWithTransformer() throws Exception {
        this.internaltestWithTransformer(false);
    }

    @TestTemplate
    public void testWithTransformerFiles() throws Exception {
        this.internaltestWithTransformer(true);
    }

    private void internaltestWithTransformer(boolean useFiles) throws Exception {
        ClientMessage message;
        int i;
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, false, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, false, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setTransformerConfiguration(new TransformerConfiguration(SimpleTransformer.class.getName())).setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
        ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        int numMessages = 10;
        SimpleString propKey = SimpleString.of((String)"wibble");
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.putStringProperty(propKey, SimpleString.of((String)"bing"));
            message.getBodyBuffer().writeString("doo be doo be doo be doo");
            producer0.send((Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(200L);
            Assertions.assertNotNull((Object)message);
            SimpleString val = (SimpleString)message.getObjectProperty(propKey);
            Assertions.assertEquals((Object)SimpleString.of((String)"bong"), (Object)val);
            String sval = message.getBodyBuffer().readString();
            Assertions.assertEquals((Object)"dee be dee be dee be dee", (Object)sval);
            message.acknowledge();
        }
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        if (this.server0.getConfiguration().isPersistenceEnabled()) {
            Assertions.assertEquals((int)0, (int)this.loadQueues(this.server0).size());
        }
    }

    @TestTemplate
    public void testWithTransformerProperties() throws Exception {
        ClientMessage message;
        int i;
        String propKey = "bridged";
        String propValue = "true";
        TransformerConfiguration transformerConfiguration = new TransformerConfiguration(AddHeadersTransformer.class.getName());
        transformerConfiguration.getProperties().put("bridged", "true");
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, false, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, false, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setTransformerConfiguration(transformerConfiguration).setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
        ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        int numMessages = 10;
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.getBodyBuffer().writeString("doo be doo be doo be doo");
            producer0.send((Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(200L);
            Assertions.assertNotNull((Object)message);
            String messagePropVal = message.getStringProperty("bridged");
            Assertions.assertEquals((Object)"true", (Object)messagePropVal);
            String sval = message.getBodyBuffer().readString();
            Assertions.assertEquals((Object)"doo be doo be doo be doo", (Object)sval);
            message.acknowledge();
        }
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        if (this.server0.getConfiguration().isPersistenceEnabled()) {
            Assertions.assertEquals((int)0, (int)this.loadQueues(this.server0).size());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testSawtoothLoad() throws Exception {
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        ActiveMQServer server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, server0Params);
        server0.getConfiguration().setThreadPoolMaxSize(10);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        ActiveMQServer server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        server1.getConfiguration().setThreadPoolMaxSize(10);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        final TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        final TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        server1.getConfiguration().setQueueConfigs(queueConfigs1);
        try {
            server1.start();
            server0.start();
            int numMessages = 300;
            int totalrepeats = 3;
            final AtomicInteger errors = new AtomicInteger(0);
            final Semaphore semop = new Semaphore(10000);
            for (int repeat = 0; repeat < 3; ++repeat) {
                ArrayList<Thread> threads = new ArrayList<Thread>();
                class ConsumerThread
                extends Thread {
                    ConsumerThread() {
                    }

                    @Override
                    public void run() {
                        try {
                            ServerLocator locator = BridgeTest.this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server1tc}));
                            ClientSessionFactory sf = BridgeTest.this.createSessionFactory(locator);
                            ClientSession session = sf.createSession(false, false);
                            session.start();
                            ClientConsumer consumer = session.createConsumer("queue1");
                            for (int i = 0; i < 300; ++i) {
                                ClientMessage message = consumer.receive(5000L);
                                Assertions.assertNotNull((Object)message);
                                message.acknowledge();
                                semop.release();
                                if (i % 1000 != 0) continue;
                                session.commit();
                            }
                            session.commit();
                            session.close();
                            sf.close();
                            locator.close();
                        }
                        catch (Throwable e) {
                            e.printStackTrace();
                            errors.incrementAndGet();
                        }
                    }
                }
                threads.add(new ConsumerThread());
                class ProducerThread
                extends Thread {
                    final int nmsg;

                    ProducerThread(int nmsg) {
                        this.nmsg = nmsg;
                    }

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        ServerLocator locator = BridgeTest.this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc}));
                        locator.setBlockOnDurableSend(false).setBlockOnNonDurableSend(false);
                        ClientSessionFactory sf = null;
                        ClientSession session = null;
                        ClientProducer producer = null;
                        try {
                            sf = BridgeTest.this.createSessionFactory(locator);
                            session = sf.createSession(false, true, true);
                            producer = session.createProducer(SimpleString.of((String)"testAddress"));
                            for (int i = 0; i < this.nmsg; ++i) {
                                Assertions.assertEquals((int)0, (int)errors.get());
                                ClientMessage message = session.createMessage(true);
                                message.putIntProperty("seq", i);
                                if (i % 100 == 0) {
                                    message.setPriority((byte)(RandomUtil.randomPositiveInt() % 9));
                                } else {
                                    message.setPriority((byte)5);
                                }
                                message.getBodyBuffer().writeBytes(new byte[50]);
                                producer.send((Message)message);
                                Assertions.assertTrue((boolean)semop.tryAcquire(1, 10L, TimeUnit.SECONDS));
                            }
                        }
                        catch (Throwable e) {
                            e.printStackTrace(System.out);
                            errors.incrementAndGet();
                        }
                        finally {
                            try {
                                session.close();
                                sf.close();
                                locator.close();
                            }
                            catch (Exception ignored) {
                                errors.incrementAndGet();
                            }
                        }
                    }
                }
                threads.add(new ProducerThread(150));
                threads.add(new ProducerThread(150));
                for (Thread t : threads) {
                    t.start();
                }
                for (Thread t : threads) {
                    t.join();
                }
                Assertions.assertEquals((int)0, (int)errors.get());
            }
        }
        finally {
            try {
                server0.stop();
            }
            catch (Exception exception) {}
            try {
                server1.stop();
            }
            catch (Exception exception) {}
        }
        Assertions.assertEquals((int)0, (int)this.loadQueues(server0).size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testBridgeWithPaging() throws Exception {
        ActiveMQServer server0 = null;
        ActiveMQServer server1 = null;
        int PAGE_MAX = 10240;
        int PAGE_SIZE = 1024;
        try {
            ClientMessage message;
            HashMap<String, Object> server0Params = new HashMap<String, Object>();
            server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, 1024, 10240, server0Params);
            HashMap<String, Object> server1Params = new HashMap<String, Object>();
            this.addTargetParameters(server1Params);
            server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, 1024, -1, server1Params);
            server1.getConfiguration().setJournalBufferTimeout_AIO(10).setJournalBufferTimeout_NIO(10);
            String testAddress = "testAddress";
            String queueName0 = "queue0";
            String forwardAddress = "forwardAddress";
            String queueName1 = "queue1";
            HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
            TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
            TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
            connectors.put(server1tc.getName(), server1tc);
            server0.getConfiguration().setConnectorConfigurations(connectors);
            server0.getConfiguration().setIDCacheSize(20000).setJournalBufferTimeout_NIO(10).setJournalBufferTimeout_AIO(10);
            ArrayList<String> staticConnectors = new ArrayList<String>();
            staticConnectors.add(server1tc.getName());
            BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(1).setStaticConnectors(staticConnectors);
            bridgeConfiguration.setCallTimeout(1000L);
            ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
            bridgeConfigs.add(bridgeConfiguration);
            server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
            QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
            ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
            queueConfigs0.add(queueConfig0);
            server0.getConfiguration().setQueueConfigs(queueConfigs0);
            QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
            ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
            queueConfigs1.add(queueConfig1);
            server1.getConfiguration().setQueueConfigs(queueConfigs1);
            ArrayList<String> interceptorToStop = new ArrayList<String>();
            interceptorToStop.add(StopInterceptor.class.getName());
            server1.getConfiguration().setIncomingInterceptorClassNames(interceptorToStop);
            StopInterceptor.serverToStop = server0;
            server1.start();
            server0.start();
            this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
            ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
            ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
            ClientSession session0 = sf0.createSession(false, false, true);
            ClientSession session1 = sf1.createSession(false, true, true);
            ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
            ClientConsumer consumer1 = session1.createConsumer("queue1");
            session1.start();
            int numMessages = 200;
            SimpleString propKey = SimpleString.of((String)"testkey");
            for (int i = 0; i < 200; ++i) {
                ClientMessage message2 = session0.createMessage(true);
                message2.getBodyBuffer().writeBytes(new byte[512]);
                message2.putIntProperty(propKey, i);
                producer0.send((Message)message2);
            }
            session0.commit();
            Assertions.assertTrue((boolean)StopInterceptor.latch.await(1L, TimeUnit.HOURS));
            StopInterceptor.thread.join(15000L);
            if (StopInterceptor.thread.isAlive()) {
                System.out.println(BridgeTest.threadDump("Still alive, stop didn't work!!!"));
                Assertions.fail((String)"Thread that should restart the server still alive");
            }
            server0.start();
            HashMap<Integer, AtomicInteger> receivedMsg = new HashMap<Integer, AtomicInteger>();
            for (int i = 0; i < 200 && (message = consumer1.receive(5000L)) != null; ++i) {
                Integer msgKey = message.getIntProperty(propKey);
                AtomicInteger msgCount = (AtomicInteger)receivedMsg.get(msgKey);
                if (msgKey != i) {
                    System.err.println("Message " + msgCount + " received out of order, expected to be " + i + " it's acceptable but not the ideal!");
                }
                if (msgCount == null) {
                    msgCount = new AtomicInteger();
                    receivedMsg.put(msgKey, msgCount);
                }
                msgCount.incrementAndGet();
                if (i % 500 != 0) continue;
                logger.debug("received {}", (Object)i);
            }
            boolean failed = false;
            if (consumer1.receiveImmediate() != null) {
                System.err.println("Unexpected message received");
                failed = true;
            }
            for (int i = 0; i < 200; ++i) {
                AtomicInteger msgCount = (AtomicInteger)receivedMsg.get(i);
                if (msgCount == null) {
                    System.err.println("Msg " + i + " wasn't received");
                    failed = true;
                    continue;
                }
                if (msgCount.get() <= 1) continue;
                System.err.println("msg " + i + " was received " + msgCount.get() + " times");
                failed = true;
            }
            Assertions.assertFalse((boolean)failed, (String)"Test failed");
            session0.close();
            session1.close();
            sf0.close();
            sf1.close();
        }
        finally {
            if (this.locator != null) {
                this.locator.close();
            }
            try {
                server0.stop();
            }
            catch (Throwable throwable) {}
            try {
                server1.stop();
            }
            catch (Throwable throwable) {}
        }
        Assertions.assertEquals((int)0, (int)this.loadQueues(server0).size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testBridgeWithLargeMessage() throws Exception {
        ActiveMQServer server0 = null;
        ActiveMQServer server1 = null;
        int PAGE_MAX = 0x100000;
        int PAGE_SIZE = 10240;
        ServerLocator locator = null;
        try {
            ClientMessage message;
            int i;
            HashMap<String, Object> server0Params = new HashMap<String, Object>();
            server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, 10240, 0x100000, server0Params);
            HashMap<String, Object> server1Params = new HashMap<String, Object>();
            this.addTargetParameters(server1Params);
            server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
            String testAddress = "testAddress";
            String queueName0 = "queue0";
            String forwardAddress = "forwardAddress";
            String queueName1 = "queue1";
            HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
            TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
            TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
            connectors.put(server1tc.getName(), server1tc);
            server0.getConfiguration().setConnectorConfigurations(connectors);
            ArrayList<String> staticConnectors = new ArrayList<String>();
            staticConnectors.add(server1tc.getName());
            BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
            ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
            bridgeConfigs.add(bridgeConfiguration);
            server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
            QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
            ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
            queueConfigs0.add(queueConfig0);
            server0.getConfiguration().setQueueConfigs(queueConfigs0);
            QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
            ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
            queueConfigs1.add(queueConfig1);
            server1.getConfiguration().setQueueConfigs(queueConfigs1);
            server1.start();
            server0.start();
            locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
            ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
            ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
            ClientSession session0 = sf0.createSession(false, true, true);
            ClientSession session1 = sf1.createSession(false, true, true);
            ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
            ClientConsumer consumer1 = session1.createConsumer("queue1");
            session1.start();
            int numMessages = 50;
            SimpleString propKey = SimpleString.of((String)"testkey");
            int LARGE_MESSAGE_SIZE = 1024;
            for (i = 0; i < 50; ++i) {
                message = session0.createMessage(true);
                message.setBodyInputStream(BridgeTest.createFakeLargeStream(1024L));
                message.putIntProperty(propKey, i);
                producer0.send((Message)message);
            }
            session0.commit();
            for (i = 0; i < 50; ++i) {
                message = consumer1.receive(5000L);
                Assertions.assertNotNull((Object)message);
                Assertions.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
                ActiveMQBuffer buff = message.getBodyBuffer();
                for (int posMsg = 0; posMsg < 1024; ++posMsg) {
                    Assertions.assertEquals((byte)BridgeTest.getSamplebyte(posMsg), (byte)buff.readByte());
                }
                message.acknowledge();
            }
            session1.commit();
            Assertions.assertNull((Object)consumer1.receiveImmediate());
            session0.close();
            session1.close();
            sf0.close();
            sf1.close();
        }
        finally {
            if (locator != null) {
                locator.close();
            }
            try {
                server0.stop();
            }
            catch (Throwable throwable) {}
            try {
                server1.stop();
            }
            catch (Throwable throwable) {}
        }
        Assertions.assertEquals((int)0, (int)this.loadQueues(server0).size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testBridgeWithVeryLargeMessage() throws Exception {
        ActiveMQServer server0 = null;
        ActiveMQServer server1 = null;
        int PAGE_MAX = 0x100000;
        int PAGE_SIZE = 10240;
        ServerLocator locator = null;
        try {
            HashMap<String, Object> server0Params = new HashMap<String, Object>();
            server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, 10240, 0x100000, server0Params);
            HashMap<String, Object> server1Params = new HashMap<String, Object>();
            this.addTargetParameters(server1Params);
            server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
            String testAddress = "testAddress";
            String queueName0 = "queue0";
            String forwardAddress = "forwardAddress";
            String queueName1 = "queue1";
            HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
            TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
            TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
            connectors.put(server1tc.getName(), server1tc);
            server0.getConfiguration().setConnectorConfigurations(connectors);
            ArrayList<String> staticConnectors = new ArrayList<String>();
            staticConnectors.add(server1tc.getName());
            int minLargeMessageSize = 204800;
            BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors).setMinLargeMessageSize(minLargeMessageSize).setProducerWindowSize(minLargeMessageSize / 2);
            ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
            bridgeConfigs.add(bridgeConfiguration);
            server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
            QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
            ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
            queueConfigs0.add(queueConfig0);
            server0.getConfiguration().setQueueConfigs(queueConfigs0);
            QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("forwardAddress");
            ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
            queueConfigs1.add(queueConfig1);
            server1.getConfiguration().setQueueConfigs(queueConfigs1);
            server1.start();
            server0.start();
            locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
            ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
            ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
            ClientSession session0 = sf0.createSession(false, true, true);
            ClientSession session1 = sf1.createSession(false, true, true);
            ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
            ClientConsumer consumer1 = session1.createConsumer("queue1");
            session1.start();
            long largeMessageSize = 204800L;
            ClientMessage largeMessage = this.createLargeMessage(session0, 204800L);
            producer0.send((Message)largeMessage);
            session0.commit();
            Wait.waitFor(() -> session1.queueQuery(SimpleString.of((String)"queue1")).getMessageCount() > 0L);
            ClientMessage message = consumer1.receive(5000L);
            message.acknowledge();
            File outputFile = new File(this.getTemporaryDir(), "huge_message_received.dat");
            logger.debug("-----message save to: {}", (Object)outputFile.getAbsolutePath());
            FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
            BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
            message.setOutputStream((OutputStream)bufferedOutput);
            if (!message.waitOutputStreamCompletion(300000L)) {
                Assertions.fail((String)"message didn't get received to disk in 5 min. Is the machine slow?");
            }
            session1.commit();
            Assertions.assertNull((Object)consumer1.receiveImmediate());
            session0.close();
            session1.close();
            sf0.close();
            sf1.close();
        }
        finally {
            if (locator != null) {
                locator.close();
            }
            try {
                server0.stop();
            }
            catch (Throwable throwable) {}
            try {
                server1.stop();
            }
            catch (Throwable throwable) {}
        }
        Assertions.assertEquals((int)0, (int)this.loadQueues(server0).size());
    }

    private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize) throws Exception {
        File fileInput = new File(this.getTemporaryDir(), "huge_message_to_send.dat");
        BridgeTest.createFile(fileInput, largeMessageSize);
        logger.debug("File created at: {}", (Object)fileInput.getAbsolutePath());
        ClientMessage message = session.createMessage((byte)4, true);
        FileInputStream fileInputStream = new FileInputStream(fileInput);
        BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
        message.setBodyInputStream((InputStream)bufferedInput);
        return message;
    }

    private static void createFile(File file, long fileSize) throws IOException {
        if (file.exists()) {
            logger.warn("---file already there {}", (Object)file.length());
            return;
        }
        FileOutputStream fileOut = new FileOutputStream(file);
        BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
        byte[] outBuffer = new byte[0x100000];
        logger.debug(" --- creating file, size: {}", (Object)fileSize);
        for (long i = 0L; i < fileSize; i += (long)outBuffer.length) {
            buffOut.write(outBuffer);
        }
        buffOut.close();
    }

    @TestTemplate
    public void testNullForwardingAddress() throws Exception {
        ClientMessage message;
        int i;
        HashMap<String, Object> server0Params = new HashMap<String, Object>();
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, false, server0Params);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, false, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String queueName1 = "queue1";
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);
        int messageSize = 1024;
        int numMessages = 10;
        ArrayList<String> staticConnectors = new ArrayList<String>();
        staticConnectors.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(5120).setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = QueueConfiguration.of((String)"queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        this.server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = QueueConfiguration.of((String)"queue1").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        this.server1.getConfiguration().setQueueConfigs(queueConfigs1);
        this.server1.start();
        this.server0.start();
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.locator.createSessionFactory(server0tc);
        ClientSessionFactory sf1 = this.locator.createSessionFactory(server1tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        byte[] bytes = new byte[1024];
        SimpleString propKey = SimpleString.of((String)"testkey");
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(200L);
            Assertions.assertNotNull((Object)message);
            Assertions.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
            message.acknowledge();
        }
        Assertions.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        this.closeFields();
    }

    @TestTemplate
    public void testInjectedTransformer() throws Exception {
        SimpleString ADDRESS = SimpleString.of((String)"myAddress");
        SimpleString QUEUE = SimpleString.of((String)"myQueue");
        String BRIDGE = "myBridge";
        ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
        Transformer transformer = message -> null;
        serviceRegistry.addBridgeTransformer("myBridge", transformer);
        Configuration config = this.createDefaultInVMConfig().addConnectorConfiguration("in-vm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
        ActiveMQServer server = this.addServer((ActiveMQServer)new ActiveMQServerImpl(config, null, null, null, (ServiceRegistry)serviceRegistry));
        server.start();
        server.waitForActivation(100L, TimeUnit.MILLISECONDS);
        server.createQueue(QueueConfiguration.of((SimpleString)QUEUE).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(false)));
        ArrayList<String> connectors = new ArrayList<String>();
        connectors.add("in-vm");
        server.deployBridge(new BridgeConfiguration().setName("myBridge").setQueueName(QUEUE.toString()).setForwardingAddress(ADDRESS.toString()).setStaticConnectors(connectors));
        Bridge bridge = (Bridge)server.getClusterManager().getBridges().get("myBridge");
        Assertions.assertNotNull((Object)bridge);
        Assertions.assertEquals((Object)transformer, (Object)((BridgeImpl)bridge).getTransformer());
    }

    @TestTemplate
    public void testDefaultConfirmationWindowSize() throws Exception {
        SimpleString ADDRESS = SimpleString.of((String)"myAddress");
        SimpleString QUEUE = SimpleString.of((String)"myQueue");
        SimpleString FORWARDING_ADDRESS = SimpleString.of((String)"myForwardingAddress");
        SimpleString FORWARDING_QUEUE = SimpleString.of((String)"myForwardingQueue");
        String BRIDGE = "myBridge";
        Configuration config = this.createDefaultConfig(0, this.isNetty()).addConnectorConfiguration("myConnector", new TransportConfiguration(this.getConnector()));
        ActiveMQServer server = this.addServer((ActiveMQServer)new ActiveMQServerImpl(config));
        server.start();
        server.waitForActivation(100L, TimeUnit.MILLISECONDS);
        server.createQueue(QueueConfiguration.of((SimpleString)QUEUE).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(false)));
        server.createQueue(QueueConfiguration.of((SimpleString)FORWARDING_QUEUE).setAddress(FORWARDING_ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(false)));
        ArrayList<String> connectors = new ArrayList<String>();
        connectors.add("myConnector");
        server.deployBridge(new BridgeConfiguration().setName("myBridge").setQueueName(QUEUE.toString()).setForwardingAddress(FORWARDING_ADDRESS.toString()).setStaticConnectors(connectors));
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{new TransportConfiguration(this.getConnector())}));
        ClientSessionFactory sf = this.addSessionFactory(this.locator.createSessionFactory());
        ClientSession session = this.addClientSession(sf.createSession(false, true, true));
        ClientProducer producer = this.addClientProducer(session.createProducer(ADDRESS));
        ClientConsumer consumer = this.addClientConsumer(session.createConsumer(FORWARDING_QUEUE));
        session.start();
        producer.send((Message)session.createMessage(true));
        Assertions.assertNotNull((Object)consumer.receive(200L));
    }

    @TestTemplate
    public void testManagementLeak() throws Exception {
        SimpleString ADDRESS = SimpleString.of((String)"myAddress");
        SimpleString QUEUE = SimpleString.of((String)"myQueue");
        SimpleString FORWARDING_ADDRESS = SimpleString.of((String)"myForwardingAddress");
        SimpleString FORWARDING_QUEUE = SimpleString.of((String)"myForwardingQueue");
        String BRIDGE = "myBridge";
        ActiveMQServer server = this.addServer((ActiveMQServer)new ActiveMQServerImpl(this.createDefaultConfig(0, this.isNetty()).addConnectorConfiguration("myConnector", new TransportConfiguration(this.getConnector()))));
        server.start();
        server.waitForActivation(100L, TimeUnit.MILLISECONDS);
        server.createQueue(QueueConfiguration.of((SimpleString)QUEUE).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(false)));
        server.createQueue(QueueConfiguration.of((SimpleString)FORWARDING_QUEUE).setAddress(FORWARDING_ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(false)));
        ArrayList<String> connectors = new ArrayList<String>();
        connectors.add("myConnector");
        int concurrency = 20;
        BridgeConfiguration config = new BridgeConfiguration().setName("myBridge").setQueueName(QUEUE.toString()).setForwardingAddress(FORWARDING_ADDRESS.toString()).setStaticConnectors(connectors).setConcurrency(20);
        server.deployBridge(config);
        Assertions.assertEquals((int)20, (int)server.getManagementService().getResources(BridgeControl.class).length);
        server.destroyBridge(config.getName());
        Assertions.assertEquals((int)0, (int)server.getManagementService().getResources(BridgeControl.class).length);
    }

    @TestTemplate
    public void testPendingAcksNeverArriveOnStop() throws Exception {
        this.testPendingAcksNeverArrive(true, false);
    }

    @TestTemplate
    public void testPendingAcksNeverArriveOnPause() throws Exception {
        this.testPendingAcksNeverArrive(false, false);
    }

    @TestTemplate
    public void testPendingAcksNeverArriveOnStopWithLargeMessages() throws Exception {
        this.testPendingAcksNeverArrive(true, true);
    }

    @TestTemplate
    public void testPendingAcksNeverArriveOnPauseWithLargeMessages() throws Exception {
        this.testPendingAcksNeverArrive(false, true);
    }

    private void testPendingAcksNeverArrive(boolean stop, boolean large) throws Exception {
        BridgeImpl.State desiredState;
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, null);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        long pendingAckTimeout = 2000L;
        int messageSize = 1024;
        int numMessages = 10;
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), null);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        this.server0.getConfiguration().setConnectorConfigurations(Map.of(server1tc.getName(), server1tc)).setBridgeConfigurations(Arrays.asList(new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(5120).setMinLargeMessageSize(large ? 512 : 2048).setPendingAckTimeout(2000L).setStaticConnectors(Arrays.asList(server1tc.getName()))));
        this.server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of((String)"queue0").setAddress("testAddress")));
        this.server0.start();
        Interceptor sendBlockingInterceptor = (packet, connection) -> packet.getType() != 71 && packet.getType() != 72;
        this.server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of((String)"queue1").setAddress("forwardAddress")));
        this.server1.start();
        this.server1.getRemotingService().addIncomingInterceptor((BaseInterceptor)sendBlockingInterceptor);
        Bridge bridge = (Bridge)this.server0.getClusterManager().getBridges().get("bridge1");
        Wait.assertTrue(() -> bridge.isConnected(), (long)2000L, (long)100L);
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.addSessionFactory(this.locator.createSessionFactory(server0tc));
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        byte[] bytes = new byte[1024];
        SimpleString propKey = SimpleString.of((String)"testkey");
        for (int i = 0; i < 10; ++i) {
            ClientMessage message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((Message)message);
        }
        session0.close();
        sf0.close();
        Wait.assertEquals((Long)10L, () -> bridge.getMetrics().getMessagesPendingAcknowledgement(), (long)2000L, (long)100L);
        long start = System.currentTimeMillis();
        if (stop) {
            bridge.stop();
            desiredState = BridgeImpl.State.STOPPED;
        } else {
            bridge.pause();
            desiredState = BridgeImpl.State.PAUSED;
        }
        Wait.assertEquals((Object)desiredState, () -> ((BridgeImpl)bridge).getState(), (long)2000L, (long)25L);
        Assertions.assertTrue((System.currentTimeMillis() - start >= 2000L ? 1 : 0) != 0);
        Wait.assertEquals((Long)10L, () -> this.server0.locateQueue("queue0").getMessageCount(), (long)2000L, (long)100L);
        Wait.assertEquals((Long)0L, () -> this.server0.locateQueue("queue0").getDeliveringCount(), (long)2000L, (long)100L);
    }

    @TestTemplate
    public void testPendingAcksEventuallyArriveOnStop() throws Exception {
        this.testPendingAcksEventuallyArrive(true, false);
    }

    @TestTemplate
    public void testPendingAcksEventuallyArriveOnPause() throws Exception {
        this.testPendingAcksEventuallyArrive(false, false);
    }

    @TestTemplate
    public void testPendingAcksEventuallyArriveOnStopWithLargeMessages() throws Exception {
        this.testPendingAcksEventuallyArrive(true, true);
    }

    @TestTemplate
    public void testPendingAcksEventuallyArriveOnPauseWithLargeMessages() throws Exception {
        this.testPendingAcksEventuallyArrive(false, true);
    }

    private void testPendingAcksEventuallyArrive(boolean stop, boolean large) throws Exception {
        BridgeImpl.State desiredState;
        this.server0 = this.createClusteredServerWithParams(this.isNetty(), 0, true, null);
        HashMap<String, Object> server1Params = new HashMap<String, Object>();
        this.addTargetParameters(server1Params);
        this.server1 = this.createClusteredServerWithParams(this.isNetty(), 1, true, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        long pendingAckTimeout = 2000L;
        int messageSize = 1024;
        int numMessages = 10;
        TransportConfiguration server0tc = new TransportConfiguration(this.getConnector(), null);
        TransportConfiguration server1tc = new TransportConfiguration(this.getConnector(), server1Params);
        this.server0.getConfiguration().setConnectorConfigurations(Map.of(server1tc.getName(), server1tc)).setBridgeConfigurations(Arrays.asList(new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(5120).setMinLargeMessageSize(large ? 512 : 2048).setPendingAckTimeout(2000L).setStaticConnectors(Arrays.asList(server1tc.getName()))));
        this.server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of((String)"queue0").setAddress("testAddress")));
        this.server0.start();
        CountDownLatch opLatch = new CountDownLatch(1);
        Interceptor sendBlockingInterceptor = (packet, connection) -> {
            if (packet.getType() == 71 || packet.getType() == 72) {
                try {
                    opLatch.await(2000L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            return true;
        };
        this.server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of((String)"queue1").setAddress("forwardAddress")));
        this.server1.start();
        this.server1.getRemotingService().addIncomingInterceptor((BaseInterceptor)sendBlockingInterceptor);
        Bridge bridge = (Bridge)this.server0.getClusterManager().getBridges().get("bridge1");
        Wait.assertTrue(() -> bridge.isConnected(), (long)2000L, (long)100L);
        this.locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.addSessionFactory(this.locator.createSessionFactory(server0tc));
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(SimpleString.of((String)"testAddress"));
        byte[] bytes = new byte[1024];
        SimpleString propKey = SimpleString.of((String)"testkey");
        for (int i = 0; i < 10; ++i) {
            ClientMessage message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((Message)message);
        }
        session0.close();
        sf0.close();
        Wait.assertEquals((Long)10L, () -> bridge.getMetrics().getMessagesPendingAcknowledgement(), (long)2000L, (long)100L);
        Assertions.assertEquals((long)10L, (long)this.server0.locateQueue("queue0").getDeliveringCount());
        if (stop) {
            bridge.stop();
            desiredState = BridgeImpl.State.STOPPED;
        } else {
            bridge.pause();
            desiredState = BridgeImpl.State.PAUSED;
        }
        Thread.sleep(1000L);
        opLatch.countDown();
        Wait.assertEquals((Object)desiredState, () -> ((BridgeImpl)bridge).getState(), (long)2000L, (long)25L);
        Wait.assertEquals((Long)0L, () -> this.server0.locateQueue("queue0").getMessageCount(), (long)2000L, (long)100L);
        Wait.assertEquals((Long)0L, () -> this.server0.locateQueue("queue0").getDeliveringCount(), (long)2000L, (long)100L);
        Wait.assertEquals((Long)10L, () -> this.server1.locateQueue("queue1").getMessageCount(), (long)2000L, (long)100L);
    }

    protected Map<Long, AtomicInteger> loadQueues(ActiveMQServer serverToInvestigate) throws Exception {
        NIOSequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalLocation(), 1);
        JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), serverToInvestigate.getConfiguration().getJournalPoolFiles(), 0, 0, (SequentialFileFactory)messagesFF, "activemq-data", "amq", 1);
        LinkedList records = new LinkedList();
        LinkedList preparedTransactions = new LinkedList();
        messagesJournal.start();
        messagesJournal.load(records, preparedTransactions, null);
        HashMap<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
        for (RecordInfo info : records) {
            Object o = DescribeJournal.newObjectEncoding((RecordInfo)info);
            if (info.getUserRecordType() != 32) continue;
            DescribeJournal.ReferenceDescribe ref = (DescribeJournal.ReferenceDescribe)o;
            AtomicInteger count = (AtomicInteger)messageRefCounts.get(ref.refEncoding.queueID);
            if (count == null) {
                count = new AtomicInteger(1);
                messageRefCounts.put(ref.refEncoding.queueID, count);
                continue;
            }
            count.incrementAndGet();
        }
        messagesJournal.stop();
        return messageRefCounts;
    }

    public static class StopInterceptor
    implements Interceptor {
        static ActiveMQServer serverToStop;
        static Thread thread;
        static final ReusableLatch latch;
        static int count;

        public static void reset() {
            latch.setCount(1);
            serverToStop = null;
            count = 0;
            thread = null;
        }

        public synchronized boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
            if (packet instanceof SessionSendMessage && ++count == 100) {
                try {
                    thread = new Thread("***Server Restarter***"){

                        @Override
                        public void run() {
                            try {
                                serverToStop.fail(false);
                                latch.countDown();
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    thread.start();
                    latch.await();
                    return true;
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return true;
        }

        static {
            latch = new ReusableLatch(0);
            count = 0;
        }
    }
}

