/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.plugin;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.integration.plugin.ConfigurationVerifier;
import org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CorePluginTest
extends JMSTestBase {
    private Queue queue;
    private final Map<String, AtomicInteger> methodCalls = new ConcurrentHashMap<String, AtomicInteger>();
    private final MethodCalledVerifier verifier = new MethodCalledVerifier(this.methodCalls);
    private final ConfigurationVerifier configurationVerifier = new ConfigurationVerifier();
    public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();

    @Override
    protected Configuration createDefaultConfig(boolean netty) throws Exception {
        Configuration config = super.createDefaultConfig(netty);
        config.registerBrokerPlugin((ActiveMQServerBasePlugin)this.verifier);
        HashMap<String, String> props = new HashMap<String, String>(1);
        props.put("property1", "val_1");
        this.configurationVerifier.init(props);
        config.registerBrokerPlugin((ActiveMQServerBasePlugin)this.configurationVerifier);
        config.setMessageExpiryScanPeriod(0L);
        return config;
    }

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.queue = this.createQueue("queue1");
    }

    @Test
    public void testSendReceive() throws Exception {
        AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
            CorePluginTest.assertEquals((Object)AckReason.NORMAL, (Object)reason);
            CorePluginTest.assertNotNull((Object)consumer);
        });
        this.server.registerBrokerPlugin((ActiveMQServerBasePlugin)ackVerifier);
        this.conn = this.cf.createConnection();
        this.conn.start();
        Session sess = this.conn.createSession(false, 1);
        MessageProducer prod = sess.createProducer((Destination)this.queue);
        MessageConsumer cons = sess.createConsumer((Destination)this.queue);
        TextMessage msg1 = sess.createTextMessage("test");
        prod.send((Message)msg1);
        TextMessage received1 = (TextMessage)cons.receive(1000L);
        CorePluginTest.assertNotNull((Object)received1);
        this.conn.close();
        this.verifier.validatePluginMethodsEquals(0, "messageExpired", "beforeDeployBridge", "afterDeployBridge", "beforeDestroyQueue", "afterDestroyQueue", "beforeUpdateAddress", "afterUpdateAddress", "beforeRemoveAddress", "afterRemoveAddress", "beforeRemoveBinding", "afterRemoveBinding");
        this.verifier.validatePluginMethodsEquals(2, "beforeCreateSession", "afterCreateSession", "beforeCloseSession", "afterCloseSession", "beforeAddAddress", "afterAddAddress");
        this.verifier.validatePluginMethodsAtLeast(1, "beforeMessageRoute", "afterMessageRoute");
        CorePluginTest.assertEquals((String)"configurationVerifier is invoked", (long)1L, (long)this.configurationVerifier.afterSendCounter.get());
        CorePluginTest.assertEquals((String)"configurationVerifier is invoked", (long)1L, (long)this.configurationVerifier.successRoutedCounter.get());
        CorePluginTest.assertEquals((String)"configurationVerifier config set", (Object)"val_1", (Object)this.configurationVerifier.value1);
        CorePluginTest.assertFalse((String)ackVerifier.getErrorMsg(), (boolean)ackVerifier.hasError());
    }

    @Test
    public void testDestroyQueue() throws Exception {
        this.conn = this.cf.createConnection();
        this.conn.start();
        Session sess = this.conn.createSession(false, 1);
        sess.createProducer((Destination)this.queue);
        this.conn.close();
        this.server.destroyQueue(new SimpleString(this.queue.getQueueName()));
        this.verifier.validatePluginMethodsEquals(2, "beforeAddAddress", "afterAddAddress");
        this.verifier.validatePluginMethodsEquals(1, "beforeCreateQueue", "afterCreateQueue", "beforeDestroyQueue", "afterDestroyQueue", "beforeRemoveBinding", "afterRemoveBinding");
    }

    @Test
    public void testAutoCreateQueue() throws Exception {
        this.conn = this.cf.createConnection();
        this.conn.start();
        Session sess = this.conn.createSession(false, 1);
        Queue autoCreatedQueue = sess.createQueue("autoCreatedQueue");
        sess.createConsumer((Destination)autoCreatedQueue);
        this.conn.close();
        this.verifier.validatePluginMethodsEquals(1, "beforeDestroyQueue", "afterDestroyQueue", "beforeRemoveAddress", "afterRemoveAddress");
        this.verifier.validatePluginMethodsEquals(3, "beforeAddAddress", "afterAddAddress");
        this.verifier.validatePluginMethodsEquals(2, "beforeCreateQueue", "afterCreateQueue", "beforeAddBinding", "afterAddBinding");
    }

    @Test
    public void testAutoCreateTopic() throws Exception {
        this.conn = this.cf.createConnection();
        this.conn.start();
        Session sess = this.conn.createSession(false, 1);
        Topic autoCreatedTopic = sess.createTopic("autoCreatedTopic");
        sess.createConsumer((Destination)autoCreatedTopic);
        this.conn.close();
        this.verifier.validatePluginMethodsEquals(1, "beforeDestroyQueue", "afterDestroyQueue", "beforeRemoveAddress", "afterRemoveAddress", "beforeRemoveBinding", "afterRemoveBinding");
        this.verifier.validatePluginMethodsEquals(3, "beforeAddAddress", "afterAddAddress");
        this.verifier.validatePluginMethodsEquals(2, "beforeCreateQueue", "afterCreateQueue", "beforeAddBinding", "afterAddBinding");
    }

    @Test
    public void testMessageExpireServer() throws Exception {
        AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> CorePluginTest.assertEquals((Object)AckReason.EXPIRED, (Object)reason));
        this.server.registerBrokerPlugin((ActiveMQServerBasePlugin)expiredVerifier);
        this.conn = this.cf.createConnection();
        this.conn.setClientID("test");
        this.conn.start();
        Session sess = this.conn.createSession(false, 1);
        MessageProducer prod = sess.createProducer((Destination)this.queue);
        prod.setTimeToLive(1L);
        MessageConsumer cons = sess.createConsumer((Destination)this.queue);
        Thread.sleep(100L);
        TextMessage msg1 = sess.createTextMessage("test");
        prod.send((Message)msg1);
        Thread.sleep(100L);
        CorePluginTest.assertNull((Object)cons.receive(100L));
        this.conn.close();
        this.verifier.validatePluginMethodsEquals(0, "beforeDeployBridge", "afterDeployBridge", "beforeRemoveAddress", "afterRemoveAddress", "beforeRemoveBinding", "afterRemoveBinding");
        this.verifier.validatePluginMethodsAtLeast(1, "afterCreateConnection", "afterDestroyConnection", "beforeCreateConsumer", "afterCreateConsumer", "beforeCloseConsumer", "afterCloseConsumer", "beforeSessionMetadataAdded", "afterSessionMetadataAdded", "beforeAddAddress", "afterAddAddress", "beforeCreateQueue", "afterCreateQueue", "messageAcknowledged", "beforeSend", "afterSend", "beforeMessageRoute", "afterMessageRoute", "messageExpired", "beforeAddAddress", "afterAddAddress", "beforeAddBinding", "afterAddBinding");
        this.verifier.validatePluginMethodsEquals(2, "beforeCreateSession", "afterCreateSession", "beforeCloseSession", "afterCloseSession");
        CorePluginTest.assertFalse((String)expiredVerifier.getErrorMsg(), (boolean)expiredVerifier.hasError());
    }

    @Test
    public void testMessageExpireClient() throws Exception {
        AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> CorePluginTest.assertEquals((Object)AckReason.EXPIRED, (Object)reason));
        this.server.registerBrokerPlugin((ActiveMQServerBasePlugin)expiredVerifier);
        this.conn = this.cf.createConnection();
        this.conn.start();
        Session sess = this.conn.createSession(false, 1);
        MessageProducer prod = sess.createProducer((Destination)this.queue);
        prod.setTimeToLive(500L);
        MessageConsumer cons = sess.createConsumer((Destination)this.queue);
        for (int i = 0; i < 10; ++i) {
            TextMessage msg1 = sess.createTextMessage("test");
            prod.send((Message)msg1);
        }
        Thread.sleep(500L);
        CorePluginTest.assertNull((Object)cons.receive(500L));
        this.conn.close();
        this.verifier.validatePluginMethodsEquals(0, "beforeDeployBridge", "afterDeployBridge", "beforeRemoveAddress", "afterRemoveAddress", "beforeRemoveBinding", "afterRemoveBinding");
        this.verifier.validatePluginMethodsAtLeast(1, "afterCreateConnection", "afterDestroyConnection", "beforeCreateConsumer", "afterCreateConsumer", "beforeCloseConsumer", "afterCloseConsumer", "beforeCreateQueue", "afterCreateQueue", "messageAcknowledged", "beforeSend", "afterSend", "beforeMessageRoute", "afterMessageRoute", "beforeDeliver", "afterDeliver", "messageExpired", "beforeAddAddress", "afterAddAddress", "beforeAddBinding", "afterAddBinding");
        this.verifier.validatePluginMethodsEquals(2, "beforeCreateSession", "afterCreateSession", "beforeCloseSession", "afterCloseSession");
        CorePluginTest.assertFalse((String)expiredVerifier.getErrorMsg(), (boolean)expiredVerifier.hasError());
    }

    @Test
    public void testSimpleBridge() throws Exception {
        ClientMessage message;
        int i;
        this.server.stop();
        HashMap server0Params = new HashMap();
        ActiveMQServer server0 = this.createClusteredServerWithParams(false, 0, false, server0Params);
        server0.registerBrokerPlugin((ActiveMQServerBasePlugin)this.verifier);
        HashMap<String, Integer> server1Params = new HashMap<String, Integer>();
        server1Params.put("serverId", 1);
        ActiveMQServer server1 = this.createClusteredServerWithParams(false, 1, false, server1Params);
        String testAddress = "testAddress";
        String queueName0 = "queue0";
        String forwardAddress = "forwardAddress";
        String queueName1 = "queue1";
        TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
        HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);
        int messageSize = 1024;
        int numMessages = 10;
        ArrayList<String> connectorConfig = new ArrayList<String>();
        connectorConfig.add(server1tc.getName());
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(5120).setStaticConnectors(connectorConfig);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        QueueConfiguration queueConfig0 = new QueueConfiguration("queue0").setAddress("testAddress");
        ArrayList<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
        queueConfigs0.add(queueConfig0);
        server0.getConfiguration().setQueueConfigs(queueConfigs0);
        QueueConfiguration queueConfig1 = new QueueConfiguration("queue1").setAddress("forwardAddress");
        ArrayList<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
        queueConfigs1.add(queueConfig1);
        server1.getConfiguration().setQueueConfigs(queueConfigs1);
        server1.start();
        server0.start();
        ServerLocator locator = this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA((TransportConfiguration[])new TransportConfiguration[]{server0tc, server1tc}));
        ClientSessionFactory sf0 = this.addSessionFactory(locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = this.addSessionFactory(locator.createSessionFactory(server1tc));
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(new SimpleString("testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();
        byte[] bytes = new byte[1024];
        SimpleString propKey = new SimpleString("testkey");
        for (i = 0; i < 10; ++i) {
            message = session0.createMessage(true);
            message.putIntProperty(propKey, i);
            message.getBodyBuffer().writeBytes(bytes);
            producer0.send((org.apache.activemq.artemis.api.core.Message)message);
        }
        for (i = 0; i < 10; ++i) {
            message = consumer1.receive(5000L);
            Assert.assertNotNull((Object)message);
            Assert.assertEquals((Object)i, (Object)message.getObjectProperty(propKey));
            message.acknowledge();
        }
        Assert.assertNull((Object)consumer1.receiveImmediate());
        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        CorePluginTest.assertEquals((long)1L, (long)server0.getClusterManager().getBridges().size());
        BridgeMetrics bridgeMetrics = ((Bridge)server0.getClusterManager().getBridges().get("bridge1")).getMetrics();
        CorePluginTest.assertEquals((long)10L, (long)bridgeMetrics.getMessagesPendingAcknowledgement());
        CorePluginTest.assertEquals((long)10L, (long)bridgeMetrics.getMessagesAcknowledged());
        this.verifier.validatePluginMethodsEquals(1, "beforeDeployBridge", "afterDeployBridge");
        this.verifier.validatePluginMethodsEquals(10, "beforeDeliverBridge", "afterDeliverBridge", "afterAcknowledgeBridge");
        server0.stop();
        server1.stop();
    }

    @Test
    public void testUpdateAddress() throws Exception {
        this.server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString((String)"test"), RoutingType.ANYCAST));
        this.server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString((String)"test"), RoutingType.MULTICAST));
        this.verifier.validatePluginMethodsEquals(1, "beforeUpdateAddress", "afterUpdateAddress");
    }

    private class AckPluginVerifier
    implements ActiveMQServerPlugin {
        private BiConsumer<ServerConsumer, AckReason> assertion;
        private Throwable error;

        AckPluginVerifier(BiConsumer<ServerConsumer, AckReason> assertion) {
            this.assertion = assertion;
        }

        public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
            try {
                this.assertion.accept(consumer, reason);
            }
            catch (Throwable e) {
                this.error = e;
                throw e;
            }
        }

        private boolean hasError() {
            return this.error != null;
        }

        private String getErrorMsg() {
            return this.hasError() ? this.error.getMessage() : "";
        }
    }
}

