/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.PrintStream;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.StringPrintStream;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BrokerInSyncTest
extends AmqpClientTestSupport {
    public static final int TIME_BEFORE_RESTART = 1000;
    protected static final int AMQP_PORT_2 = 5673;
    protected static final int AMQP_PORT_3 = 5674;
    private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);
    ActiveMQServer server_2;

    @Before
    public void startLogging() {
        AssertionLoggerHandler.startCapture();
    }

    @After
    public void stopLogging() {
        try {
            Assert.assertFalse((boolean)AssertionLoggerHandler.findText((String[])new String[]{"AMQ222214"}));
        }
        finally {
            AssertionLoggerHandler.stopCapture();
        }
    }

    @Override
    protected ActiveMQServer createServer() throws Exception {
        return this.createServer(5672, false);
    }

    @Test
    public void testSyncOnCreateQueues() throws Exception {
        this.server.setIdentity("Server1");
        AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:5673").setReconnectAttempts(3).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.server.start();
        this.server_2 = this.createServer(5673, false);
        this.server_2.setIdentity("Server2");
        amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server_2.getConfiguration().addAMQPConnection(amqpConnection);
        this.server_2.start();
        this.server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
        this.server_2.createQueue(new QueueConfiguration("sometest").setDurable(Boolean.valueOf(true)));
        Wait.assertTrue(() -> this.server_2.locateQueue("sometest") != null);
        Wait.assertTrue(() -> this.server.locateQueue("sometest") != null);
        this.server.addAddressInfo(new AddressInfo("OnServer1").setAutoCreated(false));
        this.server.createQueue(new QueueConfiguration("OnServer1").setDurable(Boolean.valueOf(true)));
        Wait.assertTrue(() -> this.server.locateQueue("OnServer1") != null);
        Wait.assertTrue((String)"Sync is not working on the way back", () -> this.server_2.locateQueue("OnServer1") != null, (long)2000L);
        Wait.assertTrue(() -> this.server_2.locateQueue("sometest") != null);
        Wait.assertTrue(() -> this.server.locateQueue("sometest") != null);
        for (int i = 0; i < 10; ++i) {
            int queueID = i;
            this.server_2.createQueue(new QueueConfiguration("test2_" + i).setDurable(Boolean.valueOf(true)));
            this.server.createQueue(new QueueConfiguration("test1_" + i).setDurable(Boolean.valueOf(true)));
            Wait.assertTrue(() -> this.server.locateQueue("test2_" + queueID) != null);
            Wait.assertTrue(() -> this.server.locateQueue("test1_" + queueID) != null);
            Wait.assertTrue(() -> this.server_2.locateQueue("test2_" + queueID) != null);
            Wait.assertTrue(() -> this.server_2.locateQueue("test1_" + queueID) != null);
        }
        this.server_2.stop();
        this.server.stop();
    }

    @Test
    public void testSingleMessage() throws Exception {
        this.server.setIdentity("Server1");
        AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:5673").setReconnectAttempts(3).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.server.start();
        this.server_2 = this.createServer(5673, false);
        this.server_2.setIdentity("Server2");
        amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server_2.getConfiguration().addAMQPConnection(amqpConnection);
        this.server_2.start();
        this.server_2.addAddressInfo(new AddressInfo(this.getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
        this.server_2.createQueue(new QueueConfiguration(this.getQueueName()).setDurable(Boolean.valueOf(true)).setRoutingType(RoutingType.ANYCAST));
        Wait.assertTrue(() -> this.server_2.locateQueue(this.getQueueName()) != null);
        Wait.assertTrue(() -> this.server.locateQueue(this.getQueueName()) != null);
        ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        Connection connection1 = cf1.createConnection();
        Session session1 = connection1.createSession(true, 0);
        connection1.start();
        ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");
        Connection connection2 = cf2.createConnection();
        Session session2 = connection2.createSession(true, 0);
        connection2.start();
        jakarta.jms.Queue queue = session1.createQueue(this.getQueueName());
        MessageProducer producerServer1 = session1.createProducer((Destination)queue);
        MessageProducer producerServer2 = session2.createProducer((Destination)queue);
        TextMessage message = session1.createTextMessage("test");
        message.setIntProperty("i", 0);
        message.setStringProperty("server", this.server.getIdentity());
        producerServer1.send((Message)message);
        session1.commit();
        Queue queueOnServer1 = this.server.locateQueue(this.getQueueName());
        Queue queueOnServer2 = this.server_2.locateQueue(this.getQueueName());
        Assert.assertNotNull((Object)queueOnServer1);
        Assert.assertNotNull((Object)queueOnServer2);
        Wait.assertEquals((long)1L, () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)1L, () -> ((Queue)queueOnServer2).getMessageCount());
        message = session1.createTextMessage("test");
        message.setIntProperty("i", 1);
        message.setStringProperty("server", this.server_2.getIdentity());
        producerServer2.send((Message)message);
        session2.commit();
        if (logger.isDebugEnabled() && !Wait.waitFor(() -> queueOnServer1.getMessageCount() == 2L)) {
            this.debugData();
        }
        Wait.assertEquals((long)2L, () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)2L, () -> ((Queue)queueOnServer2).getMessageCount());
        connection1.close();
        connection2.close();
        this.server_2.stop();
        this.server.stop();
    }

    @Test
    public void testSyncData() throws Exception {
        int NUMBER_OF_MESSAGES = 100;
        this.server.setIdentity("Server1");
        AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:5673").setReconnectAttempts(3).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.server.start();
        this.server_2 = this.createServer(5673, false);
        this.server_2.setIdentity("Server2");
        amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server_2.getConfiguration().addAMQPConnection(amqpConnection);
        this.server_2.start();
        this.server_2.addAddressInfo(new AddressInfo(this.getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
        this.server_2.createQueue(new QueueConfiguration(this.getQueueName()).setDurable(Boolean.valueOf(true)).setRoutingType(RoutingType.ANYCAST));
        Wait.assertTrue(() -> this.server_2.locateQueue(this.getQueueName()) != null);
        Wait.assertTrue(() -> this.server.locateQueue(this.getQueueName()) != null);
        ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        Connection connection1 = cf1.createConnection();
        Session session1 = connection1.createSession(true, 0);
        connection1.start();
        ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");
        Connection connection2 = cf2.createConnection();
        Session session2 = connection2.createSession(true, 0);
        connection2.start();
        jakarta.jms.Queue queue = session1.createQueue(this.getQueueName());
        MessageProducer producerServer1 = session1.createProducer((Destination)queue);
        MessageProducer producerServer2 = session2.createProducer((Destination)queue);
        for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
            TextMessage message = session1.createTextMessage("test " + i);
            message.setIntProperty("i", i);
            message.setStringProperty("server", this.server.getIdentity());
            producerServer1.send((Message)message);
        }
        session1.commit();
        Queue queueOnServer1 = this.server.locateQueue(this.getQueueName());
        Queue queueOnServer2 = this.server_2.locateQueue(this.getQueueName());
        Assert.assertNotNull((Object)queueOnServer1);
        Assert.assertNotNull((Object)queueOnServer2);
        Wait.assertEquals((long)NUMBER_OF_MESSAGES, () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)NUMBER_OF_MESSAGES, () -> ((Queue)queueOnServer2).getMessageCount());
        for (int i = NUMBER_OF_MESSAGES; i < NUMBER_OF_MESSAGES * 2; ++i) {
            TextMessage message = session1.createTextMessage("test " + i);
            message.setIntProperty("i", i);
            message.setStringProperty("server", this.server_2.getIdentity());
            producerServer2.send((Message)message);
        }
        session2.commit();
        if (logger.isDebugEnabled() && !Wait.waitFor(() -> queueOnServer1.getMessageCount() == (long)(NUMBER_OF_MESSAGES * 2))) {
            this.debugData();
        }
        Wait.assertEquals((long)(NUMBER_OF_MESSAGES * 2), () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)(NUMBER_OF_MESSAGES * 2), () -> ((Queue)queueOnServer2).getMessageCount());
        MessageConsumer consumerOn1 = session1.createConsumer((Destination)queue);
        for (int i = 0; i < NUMBER_OF_MESSAGES * 2; ++i) {
            TextMessage message = (TextMessage)consumerOn1.receive(5000L);
            logger.debug((Object)("### Client acking message(" + i + ") on server 1, a message that was original sent on " + message.getStringProperty("server") + " text = " + message.getText()));
            Assert.assertNotNull((Object)message);
            Assert.assertEquals((long)i, (long)message.getIntProperty("i"));
            Assert.assertEquals((Object)("test " + i), (Object)message.getText());
            session1.commit();
        }
        boolean bothConsumed = Wait.waitFor(() -> {
            long q1 = queueOnServer1.getMessageCount();
            long q2 = queueOnServer2.getMessageCount();
            logger.debug((Object)("Queue on Server 1 = " + q1));
            logger.debug((Object)("Queue on Server 2 = " + q2));
            return q1 == 0L && q2 == 0L;
        }, (long)5000L, (long)1000L);
        if (logger.isDebugEnabled() && !bothConsumed) {
            this.debugData();
            Assert.fail((String)("q1 = " + queueOnServer1.getMessageCount() + ", q2 = " + queueOnServer2.getMessageCount()));
        }
        Assert.assertEquals((long)0L, (long)queueOnServer1.getMessageCount());
        Assert.assertEquals((long)0L, (long)queueOnServer2.getConsumerCount());
        System.out.println("Queue on Server 1 = " + queueOnServer1.getMessageCount());
        System.out.println("Queue on Server 2 = " + queueOnServer2.getMessageCount());
        this.server_2.stop();
        this.server.stop();
    }

    private void debugData() throws Exception {
        StringPrintStream stringPrintStream = new StringPrintStream();
        PrintStream out = stringPrintStream.newStream();
        Queue queueToDebugOn1 = this.server.locateQueue(this.getQueueName());
        Queue queueToDebugOn2 = this.server_2.locateQueue(this.getQueueName());
        out.println("*******************************************************************************************************************************");
        out.println("Queue on Server 1 with count = " + queueToDebugOn1.getMessageCount());
        queueToDebugOn1.forEach(r -> out.println("Server1 has reference " + r.getMessage()));
        out.println("*******************************************************************************************************************************");
        out.println("Queue on Server 2 with count = " + queueToDebugOn2.getMessageCount());
        queueToDebugOn2.forEach(r -> out.println("Server2 has reference " + r.getMessage()));
        out.println("*******************************************************************************************************************************");
        out.println("PrintData Server 1");
        PrintData.printMessages((File)this.server.getConfiguration().getJournalLocation(), (PrintStream)out, (boolean)false, (boolean)false, (boolean)true, (boolean)false);
        out.println("*******************************************************************************************************************************");
        out.println("PrintData Server 2");
        PrintData.printMessages((File)this.server_2.getConfiguration().getJournalLocation(), (PrintStream)out, (boolean)false, (boolean)false, (boolean)true, (boolean)false);
        logger.debug((Object)("Data Available on Servers:\n" + stringPrintStream.toString()));
    }

    @Test
    public void testSyncDataNoSuppliedID() throws Exception {
        int NUMBER_OF_MESSAGES = 100;
        this.server.setIdentity("Server1");
        AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:5673").setReconnectAttempts(3).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.server.start();
        this.server_2 = this.createServer(5673, false);
        this.server_2.setIdentity("Server2");
        amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        amqpConnection.addElement((AMQPBrokerConnectionElement)new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server_2.getConfiguration().addAMQPConnection(amqpConnection);
        this.server_2.start();
        this.server_2.addAddressInfo(new AddressInfo(this.getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
        this.server_2.createQueue(new QueueConfiguration(this.getQueueName()).setDurable(Boolean.valueOf(true)).setRoutingType(RoutingType.ANYCAST));
        Wait.assertTrue(() -> this.server_2.locateQueue(this.getQueueName()) != null);
        Wait.assertTrue(() -> this.server.locateQueue(this.getQueueName()) != null);
        AmqpClient cf1 = new AmqpClient(new URI("tcp://localhost:5672"), null, null);
        AmqpConnection connection1 = cf1.createConnection();
        connection1.connect();
        AmqpSession session1 = connection1.createSession();
        AmqpClient cf2 = new AmqpClient(new URI("tcp://localhost:5673"), null, null);
        AmqpConnection connection2 = cf2.createConnection();
        connection2.connect();
        AmqpSession session2 = connection2.createSession();
        AmqpSender producerServer1 = session1.createSender(this.getQueueName());
        AmqpSender producerServer2 = session2.createSender(this.getQueueName());
        for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
            AmqpMessage message = new AmqpMessage();
            message.setDurable(true);
            message.setApplicationProperty("i", (Object)i);
            producerServer1.send(message);
        }
        Queue queueOnServer1 = this.server.locateQueue(this.getQueueName());
        Queue queueOnServer2 = this.server_2.locateQueue(this.getQueueName());
        Assert.assertNotNull((Object)queueOnServer1);
        Assert.assertNotNull((Object)queueOnServer2);
        Wait.assertEquals((long)NUMBER_OF_MESSAGES, () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)NUMBER_OF_MESSAGES, () -> ((Queue)queueOnServer2).getMessageCount());
        for (int i = NUMBER_OF_MESSAGES; i < NUMBER_OF_MESSAGES * 2; ++i) {
            AmqpMessage message = new AmqpMessage();
            message.setDurable(true);
            message.setApplicationProperty("i", (Object)i);
            producerServer2.send(message);
        }
        Wait.assertEquals((long)(NUMBER_OF_MESSAGES * 2), () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)(NUMBER_OF_MESSAGES * 2), () -> ((Queue)queueOnServer2).getMessageCount());
        AmqpReceiver consumerOn1 = session1.createReceiver(this.getQueueName());
        consumerOn1.flow(NUMBER_OF_MESSAGES * 2 + 1);
        for (int i = 0; i < NUMBER_OF_MESSAGES * 2; ++i) {
            AmqpMessage message = consumerOn1.receive(5L, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)message);
            message.accept();
            Assert.assertEquals((long)i, (long)((Integer)message.getApplicationProperty("i")).intValue());
        }
        Wait.assertEquals((long)0L, () -> ((Queue)queueOnServer1).getMessageCount());
        Wait.assertEquals((long)0L, () -> {
            System.out.println(queueOnServer2.getMessageCount());
            return queueOnServer2.getMessageCount();
        });
        connection1.close();
        connection2.close();
        this.server_2.stop();
        this.server.stop();
    }
}

