/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.divert;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test for a replicated live/backup server pair that has a divert
 * configured from the address {@code "Queue"} to the address {@code "DestQueue"}.
 *
 * <p>The test sends large map messages to the source queue, consumes them from
 * the source queue, fails the live server, waits for the client to report a
 * failover event and then consumes the same number of messages from the
 * divert's target queue.
 */
public class ReplicationWithDivertTest extends ActiveMQTestBase {
    public static final String JMS_SOURCE_QUEUE = "Queue";
    public static final String SOURCE_QUEUE = "Queue";
    public static final String JMS_TARGET_QUEUE = "DestQueue";
    public static final String TARGET_QUEUE = "DestQueue";

    // Reset to 0 in setUp(); not otherwise read or written in this class.
    public static int messageChunkCount = 0;

    /** Number of large messages sent by the test and expected on each queue. */
    private static final int MESSAGE_COUNT = 5;
    /** Number of byte-array entries ("test0", "test1", ...) stored in every map message. */
    private static final int ENTRIES_PER_MESSAGE = 10;
    /** Length in bytes of each byte-array entry. */
    private static final int ENTRY_SIZE_BYTES = 204800;
    /** Maximum time to wait for a single message in {@code receive}. */
    private static final long RECEIVE_TIMEOUT_MS = 5000L;
    /** Maximum time to wait for the sender thread to finish. */
    private static final long SENDER_JOIN_TIMEOUT_MS = 10000L;

    private static ActiveMQServer backupServer;
    private static ActiveMQServer liveServer;

    // These latches are re-armed to a count of 1 in setUp(); nothing else in
    // this class awaits or counts them down.
    static final ReusableLatch flagChunkEntered = new ReusableLatch(1);
    static final ReusableLatch flagChunkWait = new ReusableLatch(1);
    static final ReusableLatch flagSyncEntered = new ReusableLatch(1);
    static final ReusableLatch flagSyncWait = new ReusableLatch(1);

    // The URL parameters set here are verified by the assertions in setUp().
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
    ActiveMQConnection connection;
    Session session;
    Queue queue;
    Queue targetQueue;
    MessageProducer producer;
    Configuration backupConfig;
    Configuration liveConfig;

    /**
     * Builds the live and backup configurations (both with the source queue,
     * the target queue and the divert between them), starts both servers and
     * opens a transacted JMS session with a producer on the source queue.
     *
     * @throws Exception if the servers or the JMS resources cannot be created
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();
        flagChunkEntered.setCount(1);
        flagChunkWait.setCount(1);
        flagSyncEntered.setCount(1);
        flagSyncWait.setCount(1);
        messageChunkCount = 0;

        TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
        TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
        TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
        TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);

        // The backup gets its own bindings/journal/paging/large-message directories.
        this.backupConfig = this.createDefaultInVMConfig()
            .setBindingsDirectory(this.getBindingsDir(0, true))
            .setJournalDirectory(this.getJournalDir(0, true))
            .setPagingDirectory(this.getPageDir(0, true))
            .setLargeMessagesDirectory(this.getLargeMessagesDir(0, true));
        this.backupConfig.addQueueConfiguration(new QueueConfiguration(SOURCE_QUEUE).setRoutingType(RoutingType.ANYCAST));
        this.backupConfig.addQueueConfiguration(new QueueConfiguration(TARGET_QUEUE).setRoutingType(RoutingType.ANYCAST));

        DivertConfiguration divertConfiguration = new DivertConfiguration()
            .setName("Test")
            .setAddress(SOURCE_QUEUE)
            .setForwardingAddress(TARGET_QUEUE)
            .setRoutingName("Test");

        this.liveConfig = this.createDefaultInVMConfig();
        this.liveConfig.addQueueConfiguration(new QueueConfiguration(SOURCE_QUEUE).setRoutingType(RoutingType.ANYCAST));
        this.liveConfig.addQueueConfiguration(new QueueConfiguration(TARGET_QUEUE).setRoutingType(RoutingType.ANYCAST));

        // The same divert instance is registered on both servers.
        this.liveConfig.addDivertConfiguration(divertConfiguration);
        this.backupConfig.addDivertConfiguration(divertConfiguration);

        ReplicatedBackupUtils.configureReplicationPair(this.backupConfig, backupConnector, backupAcceptor, this.liveConfig, liveConnector, liveAcceptor);

        liveServer = this.createServer(this.liveConfig);
        liveServer.start();
        this.startBackup();
        this.waitForServerToStart(liveServer);

        // Sanity-check that the connection URL parameters were applied to the factory.
        Assert.assertEquals(10000L, this.factory.getMinLargeMessageSize());
        Assert.assertEquals(10000L, this.factory.getProducerWindowSize());
        Assert.assertEquals(100L, this.factory.getRetryInterval());
        Assert.assertEquals(-1L, this.factory.getReconnectAttempts());
        Assert.assertTrue(this.factory.isHA());

        this.connection = (ActiveMQConnection)this.factory.createConnection();
        this.session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        this.queue = this.session.createQueue(JMS_SOURCE_QUEUE);
        this.targetQueue = this.session.createQueue(JMS_TARGET_QUEUE);
        this.producer = this.session.createProducer((Destination)this.queue);
    }

    /**
     * Creates and starts the backup server from {@link #backupConfig} and
     * waits for it to start.
     *
     * @throws Exception if the backup server cannot be created or started
     */
    private void startBackup() throws Exception {
        backupServer = this.createServer(this.backupConfig);
        backupServer.start();
        this.waitForServerToStart(backupServer);
    }

    /**
     * Closes the JMS connection and stops both servers.
     *
     * <p>The live server is stopped and both static references are cleared
     * even when stopping the backup throws, so that a failure during teardown
     * does not leave a running server behind for subsequent tests.
     *
     * @throws Exception if stopping either server fails
     */
    @After
    public void stopServers() throws Exception {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (Exception ignored) {
                // Best-effort cleanup: the servers are stopped below regardless.
            }
        }
        try {
            if (backupServer != null) {
                backupServer.stop();
            }
        }
        finally {
            backupServer = null;
            try {
                if (liveServer != null) {
                    liveServer.stop();
                }
            }
            finally {
                liveServer = null;
            }
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} large messages to the source queue from a
     * separate thread, verifies them on the source queue, fails the live
     * server, waits for the failover event and verifies the same number of
     * messages on the divert's target queue.
     *
     * @throws Exception if any JMS or server operation fails
     */
    @Test
    public void testSendLargeMessage() throws Exception {
        final CountDownLatch failedOver = new CountDownLatch(1);
        this.connection.setFailoverListener(new FailoverEventListener(){

            public void failoverEvent(FailoverEventType eventType) {
                failedOver.countDown();
            }
        });

        final MapMessage largeMessage = this.createLargeMessage();
        Thread sender = new Thread(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < MESSAGE_COUNT; ++i) {
                        ReplicationWithDivertTest.this.producer.send((Message)largeMessage);
                        ReplicationWithDivertTest.this.session.commit();
                    }
                }
                catch (JMSException expected) {
                    // NOTE(review): a failed send is tolerated here and only printed;
                    // presumably it would show up as a missing message in the
                    // verification below - confirm this is the intended behaviour.
                    expected.printStackTrace();
                }
            }
        };
        sender.start();
        sender.join(SENDER_JOIN_TIMEOUT_MS);

        this.receiveAndVerify(this.queue);
        Assert.assertFalse(sender.isAlive());

        liveServer.fail(true);
        Assert.assertTrue(failedOver.await(10L, TimeUnit.SECONDS));

        this.receiveAndVerify(this.targetQueue);
    }

    /**
     * Consumes {@link #MESSAGE_COUNT} messages from the given queue, checks
     * that each one carries {@link #ENTRIES_PER_MESSAGE} byte arrays of
     * {@link #ENTRY_SIZE_BYTES} bytes, and commits the session after each
     * message. The consumer is closed once all messages have been verified.
     *
     * @param source the queue to consume from
     * @throws JMSException if receiving, reading or committing fails
     */
    private void receiveAndVerify(Queue source) throws JMSException {
        MessageConsumer consumer = this.session.createConsumer((Destination)source);
        this.connection.start();
        for (int received = 0; received < MESSAGE_COUNT; ++received) {
            MapMessage message = (MapMessage)consumer.receive(RECEIVE_TIMEOUT_MS);
            Assert.assertNotNull("message " + received + " was not received within " + RECEIVE_TIMEOUT_MS + " ms", (Object)message);
            for (int entry = 0; entry < ENTRIES_PER_MESSAGE; ++entry) {
                Assert.assertEquals(ENTRY_SIZE_BYTES, message.getBytes("test" + entry).length);
            }
            this.session.commit();
        }
        consumer.close();
    }

    /**
     * Creates a map message holding {@link #ENTRIES_PER_MESSAGE} zero-filled
     * byte arrays of {@link #ENTRY_SIZE_BYTES} bytes each, keyed
     * {@code "test0"}, {@code "test1"}, and so on.
     *
     * @return the populated map message
     * @throws JMSException if the message cannot be created or populated
     */
    private MapMessage createLargeMessage() throws JMSException {
        MapMessage message = this.session.createMapMessage();
        for (int entry = 0; entry < ENTRIES_PER_MESSAGE; ++entry) {
            message.setBytes("test" + entry, new byte[ENTRY_SIZE_BYTES]);
        }
        return message;
    }
}

