/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.paging;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for message-count based limits on an Artemis broker.
 *
 * <p>The tests in this class configure a global max-messages value and/or a
 * per-address max-size-messages value and then check how the broker reacts
 * once the limit is reached: entering paging, or applying the FAIL, BLOCK or
 * DROP address-full policy for CORE, AMQP and OPENWIRE clients.
 */
public class MaxMessagesPagingTest
extends ActiveMQTestBase {
    // Logger bound to this class via a MethodHandles lookup.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // 102400 bytes (100 KiB); the same value the paging tests hand to createServer as the max address size.
    protected static final int PAGE_MAX = 102400;
    // 10240 bytes (10 KiB); the same value the paging tests hand to createServer as the page size.
    protected static final int PAGE_SIZE = 10240;
    // Broker instance created and started by the FAIL / BLOCK / DROP policy helpers.
    protected ActiveMQServer server;

    /**
     * Exercises the global max-messages limit (set to 100) against a single address.
     *
     * <p>Phase 1 sends 30 durable messages whose body is 1024 raw bytes followed by
     * 1024 ints, and expects the queue to start paging; after all 30 are consumed and
     * the session is committed the queue must leave paging again.
     *
     * <p>Phase 2 sends 102 one-byte messages. The queue must still not be paging right
     * after the 31st send, but must be paging once all of them are sent, with the
     * global metric reporting "over elements" and not "over size".
     *
     * @throws Exception if the broker or the client fails
     */
    @Test
    public void testGlobalMaxMessages() throws Exception {
        final SimpleString address = SimpleString.of("testGlobalMaxMessages");
        final int firstBatch = 30;
        final int secondBatch = 102;

        clearDataRecreateServerDirs();
        Configuration config = createDefaultInVMConfig();
        // Use the class-level constants instead of shadowing them with dead locals.
        ActiveMQServer broker = createServer(true, config, PAGE_SIZE, PAGE_MAX);
        broker.getConfiguration().setGlobalMaxMessages(100L);
        broker.start();

        ServerLocator locator = createInVMNonHALocator();
        locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
        ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
        session.createQueue(QueueConfiguration.of(address).setAddress(address));
        ClientProducer producer = session.createProducer(address);

        // Phase 1: large messages.
        final int largeBodySize = 1024;
        for (int i = 0; i < firstBatch; i++) {
            ClientMessage message = session.createMessage(true);
            message.getBodyBuffer().writerIndex(0);
            message.getBodyBuffer().writeBytes(new byte[largeBodySize]);
            for (int j = 1; j <= largeBodySize; j++) {
                message.getBodyBuffer().writeInt(j);
            }
            producer.send(message);
        }

        Queue queue = broker.locateQueue(address);
        Wait.assertTrue(() -> queue.getPagingStore().isPaging());

        ClientConsumer consumer = session.createConsumer(address);
        session.start();
        for (int i = 0; i < firstBatch; i++) {
            ClientMessage received = consumer.receive(5000L);
            Assertions.assertNotNull(received);
            received.acknowledge();
        }
        session.commit();
        Wait.assertFalse(() -> queue.getPagingStore().isPaging());

        // Phase 2: tiny messages, so only the message count can trigger paging.
        final int tinyBodySize = 1;
        for (int i = 0; i < secondBatch; i++) {
            ClientMessage message = session.createMessage(true);
            message.getBodyBuffer().writerIndex(0);
            message.getBodyBuffer().writeBytes(new byte[tinyBodySize]);
            producer.send(message);
            if (i == 30) {
                // 31 messages sent so far: still below the global limit of 100.
                Wait.assertFalse(() -> queue.getPagingStore().isPaging());
            }
        }
        Wait.assertTrue(() -> queue.getPagingStore().isPaging());

        SizeAwareMetric globalSizeMetric = PagingManagerTestAccessor.globalSizeAwareMetric(broker.getPagingManager());
        Assertions.assertTrue(globalSizeMetric.isOverElements());
        Assertions.assertFalse(globalSizeMetric.isOverSize());
        session.close();
    }

    /**
     * Checks that the global max-messages limit (set to 50) is accounted across
     * addresses: 30 one-byte messages are sent to each of two addresses. The first
     * address must not be paging after its 30 messages, the second one must be, and
     * the global metric must then report "over elements" but not "over size".
     *
     * @throws Exception if the broker or the client fails
     */
    @Test
    public void testGlobalMaxMessagesMultipleQueues() throws Exception {
        final String baseAddress = "testGlobal";
        final int addressCount = 2;
        final int messagesPerAddress = 30;

        clearDataRecreateServerDirs();
        Configuration config = createDefaultInVMConfig();
        // Use the class-level constants instead of shadowing them with dead locals.
        ActiveMQServer broker = createServer(true, config, PAGE_SIZE, PAGE_MAX);
        broker.getConfiguration().setGlobalMaxMessages(50L);
        broker.start();

        ServerLocator locator = createInVMNonHALocator();
        locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
        ClientSession session = sf.createSession(null, null, false, true, true, false, 0);

        // Create every queue up front, before any message is sent.
        for (int adr = 1; adr <= addressCount; adr++) {
            SimpleString address = SimpleString.of(baseAddress + adr);
            session.createQueue(QueueConfiguration.of(address).setAddress(address));
        }

        for (int adr = 1; adr <= addressCount; adr++) {
            SimpleString address = SimpleString.of(baseAddress + adr);
            ClientProducer producer = session.createProducer(address);
            for (int i = 0; i < messagesPerAddress; i++) {
                ClientMessage message = session.createMessage(true);
                message.getBodyBuffer().writerIndex(0);
                message.getBodyBuffer().writeBytes(new byte[1]);
                producer.send(message);
            }

            Queue queue = broker.locateQueue(address);
            if (adr == 1) {
                // 30 messages in total: below the global limit of 50.
                Wait.assertFalse(() -> queue.getPagingStore().isPaging());
            } else {
                // 60 messages in total: the global limit has been crossed.
                Wait.assertTrue(() -> queue.getPagingStore().isPaging());
            }
        }

        SizeAwareMetric globalSizeMetric = PagingManagerTestAccessor.globalSizeAwareMetric(broker.getPagingManager());
        Assertions.assertTrue(globalSizeMetric.isOverElements());
        Assertions.assertFalse(globalSizeMetric.isOverSize());
        session.close();
    }

    /**
     * Checks the per-address max-size-messages setting (5, matched on "#") with a
     * global limit of 50 that is never reached: ten one-byte messages are sent to the
     * first address, and the queue must not be paging after each of the first four
     * sends but must be paging after the fifth and every later one.
     *
     * @throws Exception if the broker or the client fails
     */
    @Test
    public void testMaxOnAddress() throws Exception {
        final String baseAddress = "testMaxOnAddress";

        clearDataRecreateServerDirs();
        Configuration config = createDefaultInVMConfig();
        // Use the class-level constants instead of shadowing them with dead locals.
        ActiveMQServer broker = createServer(true, config, PAGE_SIZE, PAGE_MAX);
        broker.getConfiguration().setGlobalMaxMessages(50L);
        broker.start();

        AddressSettings max5 = new AddressSettings().setMaxSizeMessages(5L);
        broker.getAddressSettingsRepository().addMatch("#", max5);

        ServerLocator locator = createInVMNonHALocator();
        locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
        ClientSession session = sf.createSession(null, null, false, true, true, false, 0);

        for (int adr = 1; adr <= 2; adr++) {
            SimpleString address = SimpleString.of(baseAddress + adr);
            session.createQueue(QueueConfiguration.of(address).setAddress(address));
        }

        // NOTE(review): two queues are created above but only address 1 is exercised
        // here; the loop bound is kept as-is to preserve the test's behavior.
        for (int adr = 1; adr <= 1; adr++) {
            SimpleString address = SimpleString.of(baseAddress + adr);
            ClientProducer producer = session.createProducer(address);
            Queue queue = broker.locateQueue(address);
            for (int i = 0; i < 10; i++) {
                ClientMessage message = session.createMessage(true);
                message.getBodyBuffer().writerIndex(0);
                message.getBodyBuffer().writeBytes(new byte[1]);
                producer.send(message);
                if (i >= 4) {
                    // Fifth message onwards: the per-address limit of 5 is reached.
                    Wait.assertTrue(() -> queue.getPagingStore().isPaging());
                } else {
                    Assertions.assertFalse(queue.getPagingStore().isPaging());
                }
            }
        }
    }

    /**
     * Checks that the global max-messages limit (40) kicks in even though no single
     * address reaches its own limit (5, PAGE policy): four one-byte messages are sent
     * to each of eleven addresses. Addresses 0..8 must not be paging after their
     * sends; addresses 9 and 10 (40 and 44 messages in total) must be paging.
     *
     * @throws Exception if the broker or the client fails
     */
    @Test
    public void testMaxOnAddressHitGlobal() throws Exception {
        final String baseAddress = "testMaxOnAddress";
        final int addressCount = 11;
        final int messagesPerAddress = 4;

        clearDataRecreateServerDirs();
        Configuration config = createDefaultInVMConfig();
        // Use the class-level constants instead of shadowing them with dead locals.
        ActiveMQServer broker = createServer(true, config, PAGE_SIZE, PAGE_MAX);
        broker.getConfiguration().setGlobalMaxMessages(40L);
        broker.start();

        AddressSettings max5 = new AddressSettings()
            .setMaxSizeMessages(5L)
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
        broker.getAddressSettingsRepository().addMatch("#", max5);

        ServerLocator locator = createInVMNonHALocator();
        locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
        ClientSession session = sf.createSession(null, null, false, true, true, false, 0);

        for (int adr = 0; adr < addressCount; adr++) {
            SimpleString address = SimpleString.of(baseAddress + adr);
            session.createQueue(QueueConfiguration.of(address).setAddress(address));
            ClientProducer producer = session.createProducer(address);
            Queue queue = broker.locateQueue(address);

            for (int i = 0; i < messagesPerAddress; i++) {
                ClientMessage message = session.createMessage(true);
                message.getBodyBuffer().writerIndex(0);
                message.getBodyBuffer().writeBytes(new byte[1]);
                producer.send(message);
            }

            if (adr >= 9) {
                // (adr + 1) * 4 >= 40: the global limit has been reached.
                Wait.assertTrue(() -> queue.getPagingStore().isPaging());
            } else {
                Assertions.assertFalse(queue.getPagingStore().isPaging());
            }
        }
    }

    /**
     * Runs the FAIL-policy scenario with the limit configured per address.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testFailMaxMessage() throws Exception {
        final boolean useGlobalLimit = false;
        internalFailMaxMessge(useGlobalLimit);
    }

    /**
     * Runs the FAIL-policy scenario with the limit configured globally.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testFailMaxMessageGlobal() throws Exception {
        final boolean useGlobalLimit = true;
        internalFailMaxMessge(useGlobalLimit);
    }

    /**
     * Starts a fresh broker and runs the FAIL-policy check once per protocol
     * (CORE, then AMQP, then OPENWIRE).
     *
     * @param global when {@code true} the broker is configured with a global
     *               max-messages value of 10 before it is created
     * @throws Exception if the broker cannot be started or a check fails
     */
    private void internalFailMaxMessge(boolean global) throws Exception {
        clearDataRecreateServerDirs();

        final Configuration brokerConfig = createDefaultConfig(true);
        if (global) {
            brokerConfig.setGlobalMaxMessages(10L);
        }

        server = createServer(true, brokerConfig, 1024, 5120L, new HashMap<String, AddressSettings>());
        server.start();

        final String[] protocols = {"CORE", "AMQP", "OPENWIRE"};
        for (String protocol : protocols) {
            internalFailMaxMessages(protocol, server, global);
        }
    }

    /**
     * Verifies the FAIL address-full policy for one client protocol.
     *
     * <p>An ANYCAST address/queue named {@code FAIL_MAX_MESSAGES_<protocol>} is created
     * with the FAIL policy. Five rounds are executed, alternating persistent and
     * non-persistent delivery. Each round sends ten messages, waits for the queue to
     * hold exactly ten, attempts one more send that must be rejected, and then
     * consumes the ten accepted messages and checks nothing else is left.
     *
     * @param protocol protocol name handed to {@code CFUtil.createConnectionFactory}
     * @param server   running broker to configure and inspect
     * @param global   when {@code true} the address itself has no message limit (-1),
     *                 so the limit is expected to come from the broker's global
     *                 setting; otherwise the address limit is ten messages
     * @throws Exception if the broker or the JMS client fails unexpectedly
     */
    private void internalFailMaxMessages(String protocol, ActiveMQServer server, boolean global) throws Exception {
        final String address = "FAIL_MAX_MESSAGES_" + protocol;
        final int messageCount = 10;
        final int rounds = 5;

        AddressSettings settings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
        settings.setMaxSizeBytes(-1L).setMaxSizeMessages(global ? -1L : (long) messageCount);
        server.getAddressSettingsRepository().addMatch(address, settings);
        server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.ANYCAST));
        server.createQueue(QueueConfiguration.of(address).setRoutingType(RoutingType.ANYCAST));

        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
        Connection conn = factory.createConnection();
        // Safety net: the connection is also closed explicitly at the end of the happy path.
        runAfter(() -> conn.close());
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        conn.start();
        Queue queue = server.locateQueue(address);

        for (int repeat = 0; repeat < rounds; repeat++) {
            final boolean durable = repeat % 2 == 0;
            MessageProducer producer = session.createProducer(session.createQueue(address));
            producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

            for (int i = 0; i < messageCount; i++) {
                producer.send(session.createTextMessage("OK"));
            }
            Wait.assertEquals((long) messageCount, () -> queue.getMessageCount());

            try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
                producer.send(session.createTextMessage("should fail"));
                if (durable) {
                    // A persistent send is expected to throw, so reaching this line is a failure.
                    // Assertions.fail raises an Error, which the catch below does not intercept.
                    Assertions.fail("supposed to fail");
                } else {
                    // The non-persistent send may not surface the error to the client,
                    // so the rejection is looked up in the captured log traces instead.
                    Wait.assertTrue(() -> loggerHandler.findTrace("is full"));
                }
            } catch (Exception expected) {
                // Intentionally ignored: the broker rejecting the extra message is the
                // outcome this test wants.
            }

            MessageConsumer consumer = session.createConsumer(session.createQueue(address));
            for (int i = 0; i < messageCount; i++) {
                TextMessage message = (TextMessage) consumer.receive(1000L);
                Assertions.assertNotNull(message);
                Assertions.assertEquals("OK", message.getText());
            }
            // The rejected message must never have reached the queue.
            Assertions.assertNull(consumer.receiveNoWait());
            consumer.close();
            producer.close();
        }
        conn.close();
    }

    /**
     * Runs the BLOCK-policy scenario with the limit configured per address.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testBlockMaxMessage() throws Exception {
        final boolean useGlobalLimit = false;
        internalBlockMaxMessge(useGlobalLimit);
    }

    /**
     * Runs the BLOCK-policy scenario with the limit configured globally.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testBlockMaxMessageGlobal() throws Exception {
        final boolean useGlobalLimit = true;
        internalBlockMaxMessge(useGlobalLimit);
    }

    /**
     * Starts a fresh broker and runs the BLOCK-policy check for every
     * sender/receiver protocol combination of AMQP, CORE and OPENWIRE.
     *
     * @param global when {@code true} the broker is configured with a global
     *               max-messages value of 10 before it is created
     * @throws Exception if the broker cannot be started or a check fails
     */
    private void internalBlockMaxMessge(boolean global) throws Exception {
        clearDataRecreateServerDirs();

        final Configuration brokerConfig = createDefaultConfig(true);
        if (global) {
            brokerConfig.setGlobalMaxMessages(10L);
        }

        server = createServer(true, brokerConfig, 1024, 5120L, new HashMap<String, AddressSettings>());
        server.start();

        // {sender protocol, receiver protocol}, executed in exactly this order.
        final String[][] senderReceiverPairs = {
            {"AMQP", "CORE"},
            {"AMQP", "OPENWIRE"},
            {"AMQP", "AMQP"},
            {"CORE", "CORE"},
            {"CORE", "AMQP"},
            {"CORE", "OPENWIRE"},
            {"OPENWIRE", "OPENWIRE"},
            {"OPENWIRE", "AMQP"},
            {"OPENWIRE", "CORE"},
        };
        for (String[] pair : senderReceiverPairs) {
            internalBlockMaxMessages(pair[0], pair[1], server, global);
        }
    }

    /**
     * Verifies the BLOCK address-full policy for one sender/receiver protocol pair.
     *
     * <p>A background thread tries to send 1200 messages to an ANYCAST queue whose
     * address uses the BLOCK policy. The test first waits for log code
     * {@code AMQ222183} to appear, checks that {@code AMQ221046} has not been logged
     * and that the sender has not finished within 200 ms. It then consumes all 1200
     * messages in order and finally waits for {@code AMQ221046} to be logged.
     *
     * <p>NOTE(review): AMQ222183 / AMQ221046 are presumably the broker's "blocking" /
     * "unblocking message production" log entries - confirm against the broker's
     * log catalogue.
     *
     * @param protocolSend    protocol used by the producing connection
     * @param protocolReceive protocol used by the consuming connection
     * @param server          running broker to configure
     * @param global          when {@code true} the address itself has no message limit
     *                        (-1), so the limit is expected to come from the broker's
     *                        global setting; otherwise the address limit is ten messages
     * @throws Exception if the broker or a JMS client fails unexpectedly
     */
    private void internalBlockMaxMessages(String protocolSend, String protocolReceive, ActiveMQServer server, boolean global) throws Exception {
        final int messages = 1200;
        final String banner = "*".repeat(80);
        logger.info("\n{}\nSending {}, Receiving {}\n{}", banner, protocolSend, protocolReceive, banner);

        final String address = "FAIL_MAX_MESSAGES_" + protocolSend + "_" + protocolReceive;
        AddressSettings settings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
        settings.setMaxSizeBytes(-1L).setMaxSizeMessages(global ? -1L : 10L);
        server.getAddressSettingsRepository().addMatch(address, settings);
        server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.ANYCAST));
        server.createQueue(QueueConfiguration.of(address).setRoutingType(RoutingType.ANYCAST));

        ConnectionFactory factorySend = CFUtil.createConnectionFactory(protocolSend, "tcp://localhost:61616");
        Connection connSend = factorySend.createConnection();
        ConnectionFactory factoryReceive = CFUtil.createConnectionFactory(protocolReceive, "tcp://localhost:61616");
        Connection connReceive = factoryReceive.createConnection();
        connReceive.start();
        runAfter(() -> connSend.close());
        runAfter(() -> connReceive.close());

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        runAfter(executorService::shutdownNow);

        // Stage 1: the producer must get blocked once the limit is reached.
        try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
            CountDownLatch done = new CountDownLatch(1);
            executorService.execute(() -> {
                try {
                    Session sendSession = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = sendSession.createProducer(sendSession.createQueue(address));
                    producer.setDeliveryMode("OPENWIRE".equals(protocolSend) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
                    for (int i = 0; i < messages; i++) {
                        logger.debug("Sending {} protocol {}", i, protocolSend);
                        producer.send(sendSession.createTextMessage("OK!" + i));
                    }
                    sendSession.close();
                } catch (Exception e) {
                    // Surface sender failures in the log instead of hiding them; the
                    // consuming side will then fail on the missing messages.
                    logger.warn("Sender using protocol {} failed", protocolSend, e);
                } finally {
                    // Always release the latch, even if the sender dies with an Error.
                    done.countDown();
                }
            });
            Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000L, 10L);
            Assertions.assertFalse(loggerHandler.findText("AMQ221046"));
            // The sender must still be stuck: the latch may not open within 200 ms.
            Assertions.assertFalse(done.await(200L, TimeUnit.MILLISECONDS));
        }

        // Stage 2: draining the queue must let the producer finish.
        // A fresh handler is used so only entries logged from here on are inspected.
        try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
            Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(address));
            for (int i = 0; i < messages; i++) {
                TextMessage message = (TextMessage) consumer.receive(5000L);
                Assertions.assertNotNull(message);
                Assertions.assertEquals("OK!" + i, message.getText());
            }
            sessionReceive.close();
            Wait.assertTrue(() -> loggerHandler.findText("AMQ221046"), 5000L, 10L);
        }
    }

    /**
     * Runs the DROP-policy scenario with the limit configured per address.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testDropMaxMessage() throws Exception {
        final boolean useGlobalLimit = false;
        internalDropMaxMessge(useGlobalLimit);
    }

    /**
     * Runs the DROP-policy scenario with the limit configured globally.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testDropMaxMessageGlobal() throws Exception {
        final boolean useGlobalLimit = true;
        internalDropMaxMessge(useGlobalLimit);
    }

    /**
     * Starts a fresh broker and runs the DROP-policy check for every
     * sender/receiver protocol combination of AMQP, CORE and OPENWIRE.
     *
     * @param global when {@code true} the broker is configured with a global
     *               max-messages value of 10 before it is created
     * @throws Exception if the broker cannot be started or a check fails
     */
    private void internalDropMaxMessge(boolean global) throws Exception {
        clearDataRecreateServerDirs();

        final Configuration brokerConfig = createDefaultConfig(true);
        if (global) {
            brokerConfig.setGlobalMaxMessages(10L);
        }

        server = createServer(true, brokerConfig, 1024, 5120L, new HashMap<String, AddressSettings>());
        server.start();

        // {sender protocol, receiver protocol}, executed in exactly this order.
        final String[][] senderReceiverPairs = {
            {"AMQP", "CORE"},
            {"AMQP", "OPENWIRE"},
            {"AMQP", "AMQP"},
            {"CORE", "CORE"},
            {"CORE", "AMQP"},
            {"CORE", "OPENWIRE"},
            {"OPENWIRE", "OPENWIRE"},
            {"OPENWIRE", "AMQP"},
            {"OPENWIRE", "CORE"},
        };
        for (String[] pair : senderReceiverPairs) {
            internalDropMaxMessages(pair[0], pair[1], server, global);
        }
    }

    /**
     * Verifies the DROP address-full policy for one sender/receiver protocol pair.
     *
     * <p>Five rounds are executed against an ANYCAST queue whose address uses the
     * DROP policy. Each round sends 20 persistent messages and then expects exactly
     * ten of them to be consumable, with nothing left afterwards. In the first round
     * the test additionally requires log code {@code AMQ222039} to have been logged.
     *
     * <p>NOTE(review): AMQ222039 is presumably the broker's "messages are being
     * dropped" warning, and is only asserted in round 0 presumably because it is
     * logged once per address - confirm against the broker.
     *
     * @param protocolSend    protocol used by the producing connection
     * @param protocolReceive protocol used by the consuming connection
     * @param server          running broker to configure
     * @param global          when {@code true} the address itself has no message limit
     *                        (-1), so the limit is expected to come from the broker's
     *                        global setting; otherwise the address limit is ten messages
     * @throws Exception if the broker or a JMS client fails unexpectedly
     */
    private void internalDropMaxMessages(String protocolSend, String protocolReceive, ActiveMQServer server, boolean global) throws Exception {
        final int messagesSent = 20;
        final int messagesKept = 10;
        final int rounds = 5;
        final String banner = "*".repeat(80);
        // Four placeholders for four arguments, so the closing banner is actually printed.
        logger.info("\n{}\nSending {}, Receiving {}\n{}", banner, protocolSend, protocolReceive, banner);

        final String address = "FAIL_MAX_MESSAGES_" + protocolSend + "_" + protocolReceive;
        AddressSettings settings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
        settings.setMaxSizeBytes(-1L).setMaxSizeMessages(global ? -1L : 10L);
        server.getAddressSettingsRepository().addMatch(address, settings);
        server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.ANYCAST));
        server.createQueue(QueueConfiguration.of(address).setRoutingType(RoutingType.ANYCAST));

        ConnectionFactory factorySend = CFUtil.createConnectionFactory(protocolSend, "tcp://localhost:61616");
        Connection connSend = factorySend.createConnection();
        ConnectionFactory factoryReceive = CFUtil.createConnectionFactory(protocolReceive, "tcp://localhost:61616");
        Connection connReceive = factoryReceive.createConnection();
        connReceive.start();
        runAfter(() -> connSend.close());
        runAfter(() -> connReceive.close());

        for (int repeat = 0; repeat < rounds; repeat++) {
            try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
                Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(session.createQueue(address));
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 0; i < messagesSent; i++) {
                    producer.send(session.createTextMessage("OK!" + i));
                }
                session.close();

                if (repeat == 0) {
                    Assertions.assertTrue(loggerHandler.findText("AMQ222039"));
                }

                Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(address));
                for (int i = 0; i < messagesKept; i++) {
                    TextMessage message = (TextMessage) consumer.receive(5000L);
                    Assertions.assertNotNull(message);
                }
                // Everything beyond the limit must have been dropped by the broker.
                Assertions.assertNull(consumer.receiveNoWait());
                sessionReceive.close();
            }
        }
    }
}

