/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.plugin;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase;
import org.apache.activemq.artemis.tests.integration.federation.FederatedTestUtil;
import org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the federation callbacks of the broker plugin API.
 *
 * <p>A {@link MethodCalledVerifier} is registered on server 0 before every test and is
 * then asked to confirm how many times specific plugin callbacks (identified by method
 * name) have been invoked while federation between server 0 and server 1 is exercised.
 *
 * <p>NOTE(review): the two-broker topology and the {@code getServer(int)} /
 * {@code getName()} helpers come from {@link FederatedTestBase}, which is not visible
 * here; server 0 is presumably reachable through {@code vm://0} and server 1 through
 * {@code vm://1} (see {@link #getCF(int)}) - confirm against the base class.
 */
public class FederationBrokerPluginTest extends FederatedTestBase {

    /** Upper bound, in milliseconds, for waits on something that IS expected to happen. */
    private static final long TIMEOUT_MS = 5000L;

    /** Poll interval, in milliseconds, used while waiting. */
    private static final long POLL_MS = 500L;

    /** Upper bound, in milliseconds, for waits on remote bindings that must NOT appear. */
    private static final long NO_BINDING_TIMEOUT_MS = 2000L;

    /** Receive timeout, in milliseconds, for messages that must NOT arrive. */
    private static final long NO_MESSAGE_TIMEOUT_MS = 1000L;

    /** Backing map handed to the verifier; presumably keyed by plugin method name. */
    private final Map<String, AtomicInteger> methodCalls = new ConcurrentHashMap<>();

    /** Plugin registered on server 0 whose recorded invocations the tests assert on. */
    private final MethodCalledVerifier verifier0 = new MethodCalledVerifier(methodCalls);

    /**
     * Runs the base-class set-up and then registers the verifier plugin on server 0.
     *
     * @throws Exception if the base set-up fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        getServer(0).registerBrokerPlugin(verifier0);
    }

    /**
     * Deploying an address-upstream federation on server 0 must report exactly one
     * {@code federationStreamStarted}; stopping the federation manager must report
     * exactly one {@code federationStreamStopped}.
     */
    @Test
    public void testFederationStreamStartStop() throws Exception {
        String address = getName();
        deployFederation(0, FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address));
        assertPluginCalls(1, "federationStreamStarted");

        getServer(0).getFederationManager().stop();
        assertPluginCalls(1, "federationStreamStopped");
    }

    /** Address federation configured as an upstream on server 0. */
    @Test
    public void testFederationStreamConsumerAddressUpstream() throws Exception {
        String address = getName();
        deployFederation(0, FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address));
        testFederationStreamConsumerAddress(address);
    }

    /** Address federation configured as a downstream on server 1. */
    @Test
    public void testFederationStreamConsumerAddressDownstream() throws Exception {
        String address = getName();
        deployFederation(1, FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address, "server1"));
        testFederationStreamConsumerAddress(address);
    }

    /**
     * Shared scenario for federated addresses: a topic consumer on server 0 and a topic
     * producer on server 1. Verifies the create callbacks fire once when the consumer
     * appears, that a message sent on server 1 reaches the consumer on server 0, and that
     * the close and message-handled callbacks have each fired once after the consumer closes.
     *
     * @param address name of the federated address (used as the topic name on both servers)
     */
    private void testFederationStreamConsumerAddress(String address) throws Exception {
        ConnectionFactory cf1 = getCF(1);
        ConnectionFactory cf0 = getCF(0);
        try (Connection connection1 = cf1.createConnection();
             Connection connection0 = cf0.createConnection()) {
            connection1.start();
            connection0.start();

            Session session0 = connection0.createSession();
            Session session1 = connection1.createSession();
            Topic topic0 = session0.createTopic(address);
            Topic topic1 = session1.createTopic(address);
            MessageConsumer consumer0 = session0.createConsumer(topic0);
            MessageProducer producer1 = session1.createProducer(topic1);

            // Wait until server 1 shows exactly one binding on the address before sending.
            Assertions.assertTrue(Wait.waitFor(
                () -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.of(address)).getBindings().size() == 1,
                TIMEOUT_MS, POLL_MS));

            assertPluginCalls(1, "beforeCreateFederatedQueueConsumer", "afterCreateFederatedQueueConsumer", "federatedAddressConditionalCreateConsumer");
            assertPluginCalls(0, "beforeCloseFederatedQueueConsumer", "afterCloseFederatedQueueConsumer");

            producer1.send(session1.createTextMessage("hello"));
            Assertions.assertNotNull(consumer0.receive(TIMEOUT_MS));

            consumer0.close();
            assertPluginCalls(1, "beforeCloseFederatedQueueConsumer", "afterCloseFederatedQueueConsumer", "beforeFederatedQueueConsumerMessageHandled", "afterFederatedQueueConsumerMessageHandled");
        }
    }

    /** Queue federation configured as an upstream on server 0. */
    @Test
    public void testFederationStreamConsumerQueueUpstream() throws Exception {
        String queueName = getName();
        deployFederation(0, FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName));
        testFederationStreamConsumerQueue(queueName);
    }

    /** Queue federation configured as a downstream on server 1. */
    @Test
    public void testFederationStreamConsumerQueueDownstream() throws Exception {
        String queueName = getName();
        deployFederation(1, FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1"));
        testFederationStreamConsumerQueue(queueName);
    }

    /**
     * Shared scenario for federated queues: a message is sent to the queue on server 1
     * and then consumed from the same-named queue on server 0. Verifies the create
     * callbacks fire once, and that the close and message-handled callbacks have each
     * fired once after the consumer closes.
     *
     * @param queueName name of the federated queue on both servers
     */
    private void testFederationStreamConsumerQueue(String queueName) throws Exception {
        ConnectionFactory cf1 = getCF(1);
        ConnectionFactory cf0 = getCF(0);
        try (Connection connection1 = cf1.createConnection();
             Connection connection0 = cf0.createConnection()) {
            connection0.start();
            connection1.start();

            Session session0 = connection0.createSession();
            Session session1 = connection1.createSession();
            // Fully qualified: the simple name Queue is taken by the Artemis server Queue import.
            jakarta.jms.Queue queue0 = session0.createQueue(queueName);
            jakarta.jms.Queue queue1 = session1.createQueue(queueName);

            MessageProducer producer1 = session1.createProducer(queue1);
            producer1.send(session1.createTextMessage("hello"));

            MessageConsumer consumer0 = session0.createConsumer(queue0);
            // The message has to cross brokers before it can be received, so use the same
            // generous timeout as the address scenario; receive() returns as soon as the
            // message arrives, so the passing case is not slowed down.
            Assertions.assertNotNull(consumer0.receive(TIMEOUT_MS));

            assertPluginCalls(1, "beforeCreateFederatedQueueConsumer", "afterCreateFederatedQueueConsumer", "federatedQueueConditionalCreateConsumer");
            assertPluginCalls(0, "beforeCloseFederatedQueueConsumer", "afterCloseFederatedQueueConsumer");

            consumer0.close();
            assertPluginCalls(1, "beforeCloseFederatedQueueConsumer", "afterCloseFederatedQueueConsumer", "beforeFederatedQueueConsumerMessageHandled", "afterFederatedQueueConsumerMessageHandled");
        }
    }

    /**
     * When a plugin on server 0 answers {@code false} from
     * {@code federatedAddressConditionalCreateConsumer}, no binding may appear on server 1,
     * no federated consumer create/close callbacks may fire, and a message sent on
     * server 1 must not reach the consumer on server 0.
     */
    @Test
    public void testFederatedAddressConditional() throws Exception {
        String address = getName();
        // Must be registered before the federation is deployed so it sees the first decision.
        getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() {
            @Override
            public boolean federatedAddressConditionalCreateConsumer(Queue queue) {
                return false;
            }
        });
        deployFederation(0, FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address));

        ConnectionFactory cf1 = getCF(1);
        ConnectionFactory cf0 = getCF(0);
        try (Connection connection1 = cf1.createConnection();
             Connection connection0 = cf0.createConnection()) {
            connection1.start();
            connection0.start();

            Session session0 = connection0.createSession();
            Session session1 = connection1.createSession();
            Topic topic0 = session0.createTopic(address);
            Topic topic1 = session1.createTopic(address);
            MessageConsumer consumer0 = session0.createConsumer(topic0);
            MessageProducer producer1 = session1.createProducer(topic1);

            // Negative wait: the condition must stay false for the whole window.
            Assertions.assertFalse(Wait.waitFor(
                () -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.of(address)).getBindings().size() > 0,
                NO_BINDING_TIMEOUT_MS, POLL_MS));

            assertPluginCalls(1, "federatedAddressConditionalCreateConsumer");
            assertPluginCalls(0, "beforeCreateFederatedQueueConsumer", "afterCreateFederatedQueueConsumer");

            producer1.send(session1.createTextMessage("hello"));
            Assertions.assertNull(consumer0.receive(NO_MESSAGE_TIMEOUT_MS));

            consumer0.close();
            assertPluginCalls(0, "beforeCloseFederatedQueueConsumer", "afterCloseFederatedQueueConsumer");
        }
    }

    /**
     * When a plugin on server 0 answers {@code false} from
     * {@code federatedQueueConditionalCreateConsumer}, no federated consumer create/close
     * callbacks may fire and a message sent to the queue on server 1 must not reach the
     * consumer on server 0.
     */
    @Test
    public void testFederatedQueueConditional() throws Exception {
        String queueName = getName();
        // Must be registered before the federation is deployed so it sees the first decision.
        getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() {
            @Override
            public boolean federatedQueueConditionalCreateConsumer(ServerConsumer consumer) {
                return false;
            }
        });
        deployFederation(0, FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName));

        ConnectionFactory cf1 = getCF(1);
        ConnectionFactory cf0 = getCF(0);
        try (Connection connection1 = cf1.createConnection();
             Connection connection0 = cf0.createConnection()) {
            connection0.start();
            connection1.start();

            Session session0 = connection0.createSession();
            Session session1 = connection1.createSession();
            // Fully qualified: the simple name Queue is taken by the Artemis server Queue import.
            jakarta.jms.Queue queue0 = session0.createQueue(queueName);
            jakarta.jms.Queue queue1 = session1.createQueue(queueName);

            MessageProducer producer1 = session1.createProducer(queue1);
            producer1.send(session1.createTextMessage("hello"));

            MessageConsumer consumer0 = session0.createConsumer(queue0);
            Assertions.assertNull(consumer0.receive(NO_MESSAGE_TIMEOUT_MS));

            assertPluginCalls(1, "federatedQueueConditionalCreateConsumer");
            assertPluginCalls(0, "beforeCreateFederatedQueueConsumer", "afterCreateFederatedQueueConsumer");

            consumer0.close();
            assertPluginCalls(0, "beforeCloseFederatedQueueConsumer", "afterCloseFederatedQueueConsumer");
        }
    }

    /**
     * Creates a JMS connection factory for the in-VM acceptor with the given id.
     *
     * @param i id appended to the {@code vm://} URL
     * @return a new connection factory for {@code vm://<i>}
     * @throws Exception declared for subclasses that override this method
     */
    protected ConnectionFactory getCF(int i) throws Exception {
        return new ActiveMQConnectionFactory("vm://" + i);
    }

    /**
     * Adds a federation configuration to the given server and deploys it through that
     * server's federation manager.
     *
     * @param serverIndex   index of the server that owns the configuration
     * @param configuration federation configuration to add and deploy
     */
    private void deployFederation(int serverIndex, FederationConfiguration configuration) throws Exception {
        getServer(serverIndex).getConfiguration().getFederationConfigurations().add(configuration);
        getServer(serverIndex).getFederationManager().deploy();
    }

    /**
     * Asks the server-0 verifier to validate the invocation count of the named plugin
     * methods, using the class-wide timeout and poll interval.
     *
     * @param expectedCount invocation count passed to the verifier
     * @param pluginMethods names of the plugin callback methods to check
     */
    private void assertPluginCalls(int expectedCount, String... pluginMethods) throws Exception {
        verifier0.validatePluginMethodsEquals(expectedCount, TIMEOUT_MS, POLL_MS, pluginMethods);
    }
}

