/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.jms.cluster;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.naming.Context;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStorePrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JMSFailoverTest
extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Naming context handed to the primary JMS server's binding registry. */
    protected InVMNamingContext ctx1 = new InVMNamingContext();

    /** Naming context handed to the backup JMS server's binding registry. */
    protected InVMNamingContext ctx2 = new InVMNamingContext();

    /** Configuration of the backup server, built in {@code startServers()}. */
    protected Configuration backupConf;

    /** Configuration of the primary server, built in {@code startServers()}. */
    protected Configuration primaryConf;

    /** JMS manager wrapping {@link #primaryServer}. */
    protected JMSServerManager primaryJMSServer;

    /** Core server using the shared-store primary HA policy. */
    protected ActiveMQServer primaryServer;

    /** JMS manager wrapping {@link #backupServer}. */
    protected JMSServerManager backupJMSServer;

    /** Core server using the shared-store backup HA policy. */
    protected ActiveMQServer backupServer;

    /** Transport parameters used for the backup; {@code startServers()} stores {@code "serverId" -> 1} here. */
    protected Map<String, Object> backupParams = new HashMap<>();

    /** In-VM connector configuration pointing at the backup. */
    private TransportConfiguration backuptc;

    /** In-VM connector configuration pointing at the primary. */
    private TransportConfiguration primarytc;

    /** In-VM acceptor configuration of the primary. */
    private TransportConfiguration primaryAcceptortc;

    /** In-VM acceptor configuration created for the backup. */
    private TransportConfiguration backupAcceptortc;

    /**
     * Creates a queue on the primary JMS server, crashes the primary and checks that the
     * queue binding can then be looked up in the backup's naming context.
     *
     * @throws Exception if the servers, the connection or the JNDI lookups fail
     */
    @Test
    public void testCreateQueue() throws Exception {
        this.primaryJMSServer.createQueue(true, "queue1", null, true, new String[]{"/queue/queue1"});
        Assertions.assertNotNull(this.ctx1.lookup("/queue/queue1"));

        ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration[]{this.primarytc});
        jbcf.setReconnectAttempts(-1);

        // The resource has to be initialised in the try header: resource variables are
        // implicitly final and cannot be assigned inside the body.
        try (ActiveMQConnection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5)) {
            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ClientSession coreSession = ((ActiveMQSession) sess).getCoreSession();

            JMSUtil.crash(this.primaryServer, coreSession);

            // ctx2 is the registry of the backup JMS server.
            Assertions.assertNotNull(this.ctx2.lookup("/queue/queue1"));
        }
    }

    /**
     * Creates a topic on the primary JMS server, crashes the primary and checks that the
     * topic binding can then be looked up in the backup's naming context.
     *
     * @throws Exception if the servers, the connection or the JNDI lookups fail
     */
    @Test
    public void testCreateTopic() throws Exception {
        this.primaryJMSServer.createTopic(true, "topic", new String[]{"/topic/t1"});
        // NOTE(review): the double leading slash is kept exactly as in the original lookup;
        // presumably InVMNamingContext normalises it - confirm.
        Assertions.assertNotNull(this.ctx1.lookup("//topic/t1"));

        ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration[]{this.primarytc});
        jbcf.setReconnectAttempts(-1);

        // The resource has to be initialised in the try header: resource variables are
        // implicitly final and cannot be assigned inside the body.
        try (ActiveMQConnection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5)) {
            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ClientSession coreSession = ((ActiveMQSession) sess).getCoreSession();

            JMSUtil.crash(this.primaryServer, coreSession);

            // ctx2 is the registry of the backup JMS server.
            Assertions.assertNotNull(this.ctx2.lookup("/topic/t1"));
        }
    }

    /**
     * Sends a batch of bytes messages through an HA connection, crashes the primary server
     * and verifies that the consumer created before the crash still receives every message,
     * and nothing more, afterwards.
     *
     * @throws Exception if messaging or the server crash fails
     */
    @Test
    public void testAutomaticFailover() throws Exception {
        final int numMessages = 10;
        final int bodySize = 1000;

        ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration[]{this.primarytc});
        jbcf.setReconnectAttempts(-1);
        jbcf.setBlockOnDurableSend(true);
        jbcf.setBlockOnNonDurableSend(true);
        jbcf.setConsumerWindowSize(1000);

        // try-with-resources so the connection is released even when an assertion fails.
        try (ActiveMQConnection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5)) {
            conn.setExceptionListener(new MyExceptionListener());

            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ClientSession coreSession = ((ActiveMQSession) sess).getCoreSession();

            SimpleString jmsQueueName = SimpleString.of("myqueue");
            coreSession.createQueue(QueueConfiguration.of(jmsQueueName).setRoutingType(RoutingType.ANYCAST));
            Queue queue = sess.createQueue("myqueue");

            MessageProducer producer = sess.createProducer(queue);
            producer.setDeliveryMode(2); // 2 == jakarta.jms.DeliveryMode.PERSISTENT
            MessageConsumer consumer = sess.createConsumer(queue);

            byte[] body = RandomUtil.randomBytes(bodySize);
            for (int i = 0; i < numMessages; i++) {
                BytesMessage bm = sess.createBytesMessage();
                bm.writeBytes(body);
                producer.send(bm);
            }

            conn.start();
            logger.debug("sent messages and started connection");
            Thread.sleep(2000L);

            JMSUtil.crash(this.primaryServer, coreSession);

            for (int i = 0; i < numMessages; i++) {
                logger.debug("got message {}", i);
                BytesMessage bm = (BytesMessage) consumer.receive(1000L);
                Assertions.assertNotNull(bm);
                Assertions.assertEquals(body.length, bm.getBodyLength());
            }

            // No cast here: an unexpected extra message must surface as an assertion
            // failure rather than as a ClassCastException.
            Assertions.assertNull(consumer.receiveNoWait());
        }
    }

    /**
     * Sends text messages over a non-HA connection to the primary, crashes the primary,
     * then opens a separate non-HA connection to the backup and verifies that the messages
     * are received there in the order they were sent.
     *
     * @throws Exception if messaging or the server crash fails
     */
    @Test
    public void testManualFailover() throws Exception {
        final int numMessages = 1000;

        ActiveMQConnectionFactory jbcfPrimary = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)});
        jbcfPrimary.setBlockOnNonDurableSend(true);
        jbcfPrimary.setBlockOnDurableSend(true);

        ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY, this.backupParams)});
        jbcfBackup.setBlockOnNonDurableSend(true);
        jbcfBackup.setBlockOnDurableSend(true);
        jbcfBackup.setInitialConnectAttempts(-1);
        jbcfBackup.setReconnectAttempts(-1);

        // The queue handle obtained from the primary session is reused on the backup session.
        Queue queue;

        // The primary connection is closed when this block exits, i.e. right after the crash
        // on the success path, and also when something in the block throws.
        try (Connection connPrimary = jbcfPrimary.createConnection()) {
            connPrimary.setExceptionListener(new MyExceptionListener());

            Session sessPrimary = connPrimary.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ClientSession coreSessionPrimary = ((ActiveMQSession) sessPrimary).getCoreSession();

            SimpleString jmsQueueName = SimpleString.of("myqueue");
            coreSessionPrimary.createQueue(QueueConfiguration.of(jmsQueueName).setRoutingType(RoutingType.ANYCAST));
            queue = sessPrimary.createQueue("myqueue");

            MessageProducer producerPrimary = sessPrimary.createProducer(queue);
            for (int i = 0; i < numMessages; i++) {
                producerPrimary.send(sessPrimary.createTextMessage("message" + i));
            }

            JMSUtil.crash(this.primaryServer, coreSessionPrimary);
        }

        try (Connection connBackup = jbcfBackup.createConnection()) {
            Session sessBackup = connBackup.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumerBackup = sessBackup.createConsumer(queue);
            connBackup.start();

            for (int i = 0; i < numMessages; i++) {
                TextMessage tm = (TextMessage) consumerBackup.receive(1000L);
                Assertions.assertNotNull(tm);
                Assertions.assertEquals("message" + i, tm.getText());
            }

            TextMessage tm = (TextMessage) consumerBackup.receiveNoWait();
            Assertions.assertNull(tm);
        }
    }

    /**
     * Sends 100 large text messages in a transacted session, then consumes 90 of them while a
     * background thread crashes the primary server part-way through the transfer.
     *
     * <p>An incoming interceptor counts {@link SessionReceiveContinuationMessage} packets and
     * releases the crashing thread when the counter reaches 300. Receive errors are retried up
     * to ten times per message; a failed commit is rolled back and the loop carries on.
     *
     * @throws Exception if messaging fails or a message cannot be received
     */
    @Test
    public void testSendReceiveLargeMessages() throws Exception {
        SimpleString queueName = SimpleString.of("somequeue");

        ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration[]{this.primarytc, this.backuptc});
        jbcf.setReconnectAttempts(-1);
        jbcf.setBlockOnDurableSend(true);
        jbcf.setBlockOnNonDurableSend(true);
        jbcf.setMinLargeMessageSize(1024);

        final CountDownLatch flagAlign = new CountDownLatch(1);
        final CountDownLatch waitToKill = new CountDownLatch(1);
        final AtomicBoolean killed = new AtomicBoolean(false);

        jbcf.getServerLocator().addIncomingInterceptor(new Interceptor() {
            int count = 0;

            @Override
            public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
                // compareAndSet guarantees the kill signal is raised at most once.
                if (packet instanceof SessionReceiveContinuationMessage && this.count++ == 300 && killed.compareAndSet(false, true)) {
                    waitToKill.countDown();
                }
                return true;
            }
        });

        Thread spoilerThread;
        ActiveMQConnection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
        try {
            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
            ClientSession coreSession = ((ActiveMQSession) sess).getCoreSession();

            spoilerThread = new Thread(() -> {
                flagAlign.countDown();
                try {
                    waitToKill.await(120L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // As before, the crash is still attempted after an interrupted or timed-out wait.
                    logger.warn("Interrupted while waiting for the kill signal", e);
                }
                try {
                    JMSUtil.crash(this.primaryServer, coreSession);
                } catch (Exception e) {
                    logger.warn("Failed to crash the primary server", e);
                }
            });

            coreSession.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
            Queue queue = sess.createQueue("somequeue");

            MessageProducer producer = sess.createProducer(queue);
            producer.setDeliveryMode(2); // 2 == jakarta.jms.DeliveryMode.PERSISTENT

            // Strings are immutable, so one payload instance can back every message.
            String payload = new String(new byte[10240]);
            for (int i = 0; i < 100; i++) {
                producer.send(sess.createTextMessage(payload));
                if (i % 10 == 0) {
                    sess.commit();
                }
            }
            sess.commit();

            conn.start();
            spoilerThread.start();
            Assertions.assertTrue(flagAlign.await(10L, TimeUnit.SECONDS));

            MessageConsumer consumer = sess.createConsumer(queue);
            for (int i = 0; i < 90; i++) {
                TextMessage message = null;
                // A JMSException triggers a retry; a null message fails the test immediately.
                for (int attempt = 1; attempt <= 10; attempt++) {
                    try {
                        message = (TextMessage) consumer.receive(5000L);
                        Assertions.assertNotNull(message);
                        break;
                    } catch (JMSException e) {
                        logger.warn("Exception on receive message", e);
                    }
                }
                Assertions.assertNotNull(message);

                try {
                    sess.commit();
                } catch (Exception e) {
                    logger.debug("Exception during commit", e);
                    sess.rollback();
                }
            }
        } finally {
            // Release the connection even when an assertion above fails.
            conn.close();
        }
        spoilerThread.join();
    }

    /**
     * Verifies that the connection factory can still open a new connection after the primary
     * has been crashed and the locator's topology array has been reset to the pre-crash
     * primary/backup pair.
     *
     * @throws Exception if connection set-up, the crash or the reflective topology change fails
     */
    @Test
    public void testCreateNewConnectionAfterFailover() throws Exception {
        ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration[]{this.primarytc});
        jbcf.setInitialConnectAttempts(5);
        jbcf.setRetryInterval(1000L);
        jbcf.setReconnectAttempts(-1);

        ActiveMQConnection conn1 = null;
        ActiveMQConnection conn2 = null;
        Connection conn3 = null;
        try {
            conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
            conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);

            Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ClientSession coreSession1 = ((ActiveMQSession) sess1).getCoreSession();
            ClientSession coreSession2 = ((ActiveMQSession) sess2).getCoreSession();

            Topology fullTopology = jbcf.getServerLocator().getTopology();
            Collection<?> members = fullTopology.getMembers();
            Assertions.assertEquals(1, members.size());

            // Capture the primary/backup pair before the crash.
            TopologyMemberImpl member = (TopologyMemberImpl) members.iterator().next();
            TransportConfiguration tcPrimary = member.getPrimary();
            TransportConfiguration tcBackup = member.getBackup();
            logger.debug("primary tc: {}", tcPrimary);
            logger.debug("Backup tc: {}", tcBackup);

            JMSUtil.crash(this.primaryServer, coreSession1, coreSession2);
            this.waitForServerToStart(this.backupServer);

            this.simulatePrimaryDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcPrimary, tcBackup);

            try {
                conn3 = jbcf.createConnection();
            } catch (Exception e) {
                // Pass the cause along so the failure report shows why the connection failed.
                Assertions.fail("The new connection should be established successfully after failover", e);
            }
        } finally {
            if (conn1 != null) {
                conn1.close();
            }
            if (conn2 != null) {
                conn2.close();
            }
            if (conn3 != null) {
                conn3.close();
            }
        }
    }

    /**
     * Reflectively overwrites the single entry of the locator's private {@code topologyArray}
     * so that it holds the given primary/backup pair again.
     *
     * @param locator   locator whose {@code topologyArray} field is modified
     * @param tcPrimary value stored as the pair's first element
     * @param tcBackup  value stored as the pair's second element
     * @throws NoSuchFieldException   if the locator class declares no {@code topologyArray} field
     * @throws IllegalAccessException if the field cannot be read or written
     */
    @SuppressWarnings("unchecked") // the reflective read cannot carry the array's generic element type
    private void simulatePrimaryDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcPrimary, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException {
        Field topologyArrayField = locator.getClass().getDeclaredField("topologyArray");
        topologyArrayField.setAccessible(true);

        Pair<TransportConfiguration, TransportConfiguration>[] topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[]) topologyArrayField.get(locator);
        Assertions.assertEquals(1, topologyArray.length);

        Pair<TransportConfiguration, TransportConfiguration> member = topologyArray[0];
        member.setA(tcPrimary);
        member.setB(tcBackup);

        // Writes back the same array instance; retained from the original implementation.
        topologyArrayField.set(locator, topologyArray);
    }

    /**
     * Runs the inherited set-up and then starts the backup/primary server pair.
     *
     * @throws Exception if the inherited set-up or the server start-up fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        startServers();
    }

    /**
     * Builds and starts a shared-store backup server followed by a shared-store primary
     * server. Both are created with the same {@link InVMNodeManager} instance and the same
     * journal, bindings, paging and large-message directories. The method returns after
     * {@code JMSUtil.waitForServer} has been called for the backup.
     *
     * @throws Exception if either server fails to start
     */
    protected void startServers() throws Exception {
        NodeManager nodeManager = new InVMNodeManager(false);

        this.backuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, this.backupParams);
        this.primarytc = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
        this.primaryAcceptortc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
        this.backupAcceptortc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, this.backupParams);

        // NOTE(review): the server id is stored on the shared map and again on each backup
        // transport configuration; presumably TransportConfiguration may keep its own copy
        // of the params - confirm before simplifying.
        this.backupParams.put("serverId", 1);
        this.backuptc.getParams().put("serverId", 1);
        this.backupAcceptortc.getParams().put("serverId", 1);

        this.backupConf = this.createBasicConfig()
            .addConnectorConfiguration(this.primarytc.getName(), this.primarytc)
            .addConnectorConfiguration(this.backuptc.getName(), this.backuptc)
            .setSecurityEnabled(false)
            .setJournalType(getDefaultJournalType())
            .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, this.backupParams))
            .setBindingsDirectory(this.getBindingsDir())
            .setJournalMinFiles(2)
            .setJournalDirectory(this.getJournalDir())
            .setPagingDirectory(this.getPageDir())
            .setLargeMessagesDirectory(this.getLargeMessagesDir())
            .setPersistenceEnabled(true)
            .setHAPolicyConfiguration(new SharedStoreBackupPolicyConfiguration())
            .addClusterConfiguration(basicClusterConnectionConfig(this.backuptc.getName(), this.primarytc.getName()));

        this.backupServer = this.addServer(new InVMNodeManagerServer(this.backupConf, nodeManager));
        this.backupJMSServer = new JMSServerManagerImpl(this.backupServer);
        this.backupJMSServer.setRegistry(new JndiBindingRegistry(this.ctx2));
        this.backupJMSServer.getActiveMQServer().setIdentity("JMSBackup");
        logger.debug("Starting backup");
        this.backupJMSServer.start();

        // The journal and bindings directories are each set once here; the original chain
        // repeated both setters with the same arguments.
        this.primaryConf = this.createBasicConfig()
            .setJournalDirectory(this.getJournalDir())
            .setBindingsDirectory(this.getBindingsDir())
            .setSecurityEnabled(false)
            .addAcceptorConfiguration(this.primaryAcceptortc)
            .setJournalType(getDefaultJournalType())
            .setJournalMinFiles(2)
            .setPagingDirectory(this.getPageDir())
            .setLargeMessagesDirectory(this.getLargeMessagesDir())
            .addConnectorConfiguration(this.primarytc.getName(), this.primarytc)
            .setPersistenceEnabled(true)
            .setHAPolicyConfiguration(new SharedStorePrimaryPolicyConfiguration())
            .addClusterConfiguration(basicClusterConnectionConfig(this.primarytc.getName(), new String[0]));

        this.primaryServer = this.addServer(new InVMNodeManagerServer(this.primaryConf, nodeManager));
        this.primaryJMSServer = new JMSServerManagerImpl(this.primaryServer);
        this.primaryJMSServer.setRegistry(new JndiBindingRegistry(this.ctx1));
        this.primaryJMSServer.getActiveMQServer().setIdentity("JMSPrimary");
        logger.debug("Starting primary");
        this.primaryJMSServer.start();

        JMSUtil.waitForServer(this.backupServer);
    }

    private static class MyExceptionListener
    implements ExceptionListener {
        volatile JMSException e;

        private MyExceptionListener() {
        }

        public void onException(JMSException e) {
            this.e = e;
        }
    }
}

