/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPMirrorConnectionTest
extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int BROKER_PORT_NUM = 5673;

    @Override
    protected ActiveMQServer createServer() throws Exception {
        return this.createServer(5673, false);
    }

    @Override
    protected String getConfiguredProtocols() {
        return "AMQP,CORE";
    }

    @Test
    @Timeout(value=20L)
    public void testBrokerMirrorConnectsWithAnonymous() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLAnonymousConnect(new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement());
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testBrokerMirrorConnectsWithPlain() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement());
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testBrokerHandlesSenderLinkOmitsMirrorCapability() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLAnonymousConnect(new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond();
            peer.expectClose().withError(AmqpSupport.CONNECTION_FORCED.toString()).optional();
            peer.expectConnectionToDrop();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement());
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testBrokerAddsAddressAndQueue() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setQueueCreation(true).setAddressFilter("sometest"));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
            this.server.createQueue(QueueConfiguration.of((String)"sometest").setDurable(Boolean.valueOf(true)));
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testCreateDurableConsumerReplicatesAddressAndQueue() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setQueueCreation(true));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");
            try (Connection connection = factory.createConnection();){
                connection.setClientID("test-client-id");
                Session session = connection.createSession(false, 2);
                Topic topic = session.createTopic("test-topic");
                MessageConsumer consumer = session.createDurableConsumer(topic, "subscription");
                consumer.close();
            }
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testBrokerMirrorHonorsCoreTunnelingEnable() throws Exception {
        this.testBrokerMirrorHonorsCoreTunnelingEnableOrDisable(true);
    }

    @Test
    @Timeout(value=20L)
    public void testBrokerMirrorHonorsCoreTunnelingDisable() throws Exception {
        this.testBrokerMirrorHonorsCoreTunnelingEnableOrDisable(false);
    }

    public void testBrokerMirrorHonorsCoreTunnelingEnableOrDisable(boolean tunneling) throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        String[] capabilities = tunneling ? new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()} : new String[]{"amq.mirror"};
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(capabilities).respond().withOfferedCapabilities(capabilities).withPropertiesMap(brokerProperties);
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement();
            mirrorElement.addProperty("tunnel-core-messages", Boolean.toString(tunneling));
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)mirrorElement);
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testProducerMessageIsMirroredWithCoreTunnelingUsesCoreMessageFormat() throws Exception {
        this.doTestProducerMessageIsMirroredWithCorrectMessageFormat(true);
    }

    @Test
    @Timeout(value=20L)
    public void testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageFormat() throws Exception {
        this.doTestProducerMessageIsMirroredWithCorrectMessageFormat(false);
    }

    private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling) throws Exception {
        int messageFormat;
        String[] capabilities;
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        if (tunneling) {
            capabilities = new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};
            messageFormat = 1183580416;
        } else {
            capabilities = new String[]{"amq.mirror"};
            messageFormat = 0;
        }
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(capabilities).respond().withOfferedCapabilities(capabilities).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().withMessageFormat(messageFormat).accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement();
            mirrorElement.addProperty("tunnel-core-messages", Boolean.toString(tunneling));
            mirrorElement.setQueueCreation(true);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)mirrorElement);
            this.server.createQueue(QueueConfiguration.of((String)"myQueue").setDurable(Boolean.valueOf(true)));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673");
            try (Connection connection = factory.createConnection();){
                Session session = connection.createSession(false, 2);
                Queue queue = session.createQueue("myQueue");
                MessageConsumer consumer = session.createConsumer((Destination)queue);
                MessageProducer producer = session.createProducer((Destination)queue);
                TextMessage message = session.createTextMessage("test");
                connection.start();
                producer.setDeliveryMode(2);
                producer.send((Message)message);
                consumer.close();
            }
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testRemoteDoesNotOfferTunnelingResultsInDefaultAMQPFormattedMessages() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror"}).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().withMessageFormat(0).accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement();
            mirrorElement.addProperty("tunnel-core-messages", Boolean.toString(true));
            mirrorElement.setQueueCreation(true);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)mirrorElement);
            this.server.createQueue(QueueConfiguration.of((String)"myQueue").setDurable(Boolean.valueOf(true)));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673");
            try (Connection connection = factory.createConnection();){
                Session session = connection.createSession(false, 2);
                Queue queue = session.createQueue("myQueue");
                MessageConsumer consumer = session.createConsumer((Destination)queue);
                MessageProducer producer = session.createProducer((Destination)queue);
                TextMessage message = session.createTextMessage("test");
                connection.start();
                producer.setDeliveryMode(2);
                producer.send((Message)message);
                consumer.close();
            }
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testTunnelingDisabledButRemoteOffersDoesNotUseTunneling() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror"}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().withMessageFormat(0).accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement();
            mirrorElement.addProperty("tunnel-core-messages", Boolean.toString(false));
            mirrorElement.setQueueCreation(true);
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)mirrorElement);
            this.server.createQueue(QueueConfiguration.of((String)"myQueue").setDurable(Boolean.valueOf(true)));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673");
            try (Connection connection = factory.createConnection();){
                Session session = connection.createSession(false, 2);
                Queue queue = session.createQueue("myQueue");
                MessageConsumer consumer = session.createConsumer((Destination)queue);
                MessageProducer producer = session.createProducer((Destination)queue);
                TextMessage message = session.createTextMessage("test");
                connection.start();
                producer.setDeliveryMode(2);
                producer.send((Message)message);
                consumer.close();
            }
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testMirrorConnectionRemainsUnchangedAfterConfigurationUpdate() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.setQueueCreation(true);
            mirror.setDurable(true);
            mirror.setName("test");
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)mirror);
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");
            try (Connection connection = factory.createConnection();){
                Session session = connection.createSession(false, 2);
                Queue queue = session.createQueue("myQueue");
                MessageConsumer consumer = session.createConsumer((Destination)queue);
                MessageProducer producer = session.createProducer((Destination)queue);
                TextMessage message = session.createTextMessage("test");
                peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                AMQPMirrorBrokerConnectionElement mirrorUpdated = new AMQPMirrorBrokerConnectionElement();
                mirrorUpdated.setQueueCreation(true);
                mirrorUpdated.setDurable(false);
                mirrorUpdated.setName("test");
                AMQPBrokerConnectConfiguration amqpConnectionUpdated = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
                amqpConnectionUpdated.setReconnectAttempts(0);
                amqpConnectionUpdated.setUser("user1");
                amqpConnectionUpdated.setPassword("pass1");
                amqpConnectionUpdated.addElement((AMQPBrokerConnectionElement)mirrorUpdated);
                ProtonProtocolManagerFactory protocolFactory = (ProtonProtocolManagerFactory)this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull((Object)protocolFactory);
                this.server.getConfiguration().clearAMQPConnectionConfigurations();
                this.server.getConfiguration().addAMQPConnection(amqpConnectionUpdated);
                protocolFactory.updateProtocolServices(this.server, Collections.emptyList());
                peer.waitForScriptToComplete();
                peer.expectTransfer().withMessageFormat(0).accept();
                producer.setDeliveryMode(2);
                producer.send((Message)message);
                consumer.close();
                peer.waitForScriptToComplete();
            }
            this.server.stop();
        }
    }

    @Test
    @Timeout(value=20L)
    public void testMirrorConnectionRemainsUnchangedAfterConfigurationRemoved() throws Exception {
        HashMap<String, String> brokerProperties = new HashMap<String, String>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
        try (ProtonTestServer peer = new ProtonTestServer();){
            peer.expectSASLPlainConnect("user", "pass", new String[]{"PLAIN", "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().withName(Matchers.startsWith((String)"$ACTIVEMQ_ARTEMIS_MIRROR")).withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).respond().withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();
            URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", (Object)remoteURI);
            AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.setQueueCreation(true);
            mirror.setDurable(true);
            mirror.setName("test");
            AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(this.getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement((AMQPBrokerConnectionElement)mirror);
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");
            try (Connection connection = factory.createConnection();){
                Session session = connection.createSession(false, 2);
                Queue queue = session.createQueue("myQueue");
                MessageConsumer consumer = session.createConsumer((Destination)queue);
                MessageProducer producer = session.createProducer((Destination)queue);
                TextMessage message = session.createTextMessage("test");
                peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                ProtonProtocolManagerFactory protocolFactory = (ProtonProtocolManagerFactory)this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull((Object)protocolFactory);
                this.server.getConfiguration().clearAMQPConnectionConfigurations();
                protocolFactory.updateProtocolServices(this.server, Collections.emptyList());
                peer.waitForScriptToComplete();
                peer.expectTransfer().withMessageFormat(0).accept();
                producer.setDeliveryMode(2);
                producer.send((Message)message);
                consumer.close();
                peer.waitForScriptToComplete();
            }
            this.server.stop();
        }
    }
}

