/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class AMQPMirrorFastACKTest
extends AmqpClientTestSupport {
    /** Name set on the configuration of the second, deliberately slow broker. */
    private static final String SLOW_SERVER_NAME = "slow";
    /** Port of the slow broker; 5673 is both its acceptor port and the target of the mirror connection. */
    private static final int SLOW_SERVER_PORT = 5673;
    /** Presumably the per-message encode delay in milliseconds; matches the 10 ms pause in {@code SlowMessagePersister.encode}. */
    private static final int ENCODE_DELAY = 10;
    /** The slow mirror-target broker: created in {@code setUp()}, started by the test body, stopped in {@code tearDown()}. */
    private ActiveMQServer slowServer;

    /**
     * Supplies the comma-separated protocol list used by this test.
     *
     * @return the fixed list {@code "AMQP,CORE,OPENWIRE"}
     */
    @Override
    protected String getConfiguredProtocols() {
        final String protocols = "AMQP,CORE,OPENWIRE";
        return protocols;
    }

    /**
     * Runs the inherited setup and then builds the slow broker.
     * The slow broker is only constructed here; it is started by the test body.
     *
     * @throws Exception if the parent setup or the broker construction fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        final ActiveMQServer created = createSlowServer();
        slowServer = created;
    }

    /**
     * Stops the slow broker, if one was created, and always runs the inherited teardown
     * afterwards, even when stopping the broker throws.
     *
     * @throws Exception if stopping the broker or the parent teardown fails
     */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        final ActiveMQServer slow = slowServer;
        try {
            if (slow != null) {
                slow.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Runs the fast-acknowledge mirror scenario with a client created for the {@code "CORE"} protocol.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testMirrorTargetFastACKCore() throws Exception {
        final String protocol = "CORE";
        doTestMirrorTargetFastACK(protocol);
    }

    /**
     * Runs the fast-acknowledge mirror scenario with a client created for the {@code "AMQP"} protocol.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testMirrorTargetFastACKAMQP() throws Exception {
        final String protocol = "AMQP";
        doTestMirrorTargetFastACK(protocol);
    }

    /**
     * Sends persistent messages to a queue on the source broker, acknowledges each one as soon
     * as it is delivered, and then verifies that the mirrored queue on the slow broker received
     * every message and ends up empty.
     *
     * @param protocol protocol name handed to {@code CFUtil.createConnectionFactory} (for example {@code "CORE"} or {@code "AMQP"})
     * @throws Exception if broker startup, messaging, or any of the assertions fail
     */
    private void doTestMirrorTargetFastACK(String protocol) throws Exception {
        final int numberOfMessages = 10;
        final CountDownLatch done = new CountDownLatch(numberOfMessages);
        // Remembers the first acknowledge failure so a latch timeout can be explained.
        final AtomicReference<Exception> listenerFailure = new AtomicReference<>();

        AMQPMirrorBrokerConnectionElement replication = this.configureMirrorTowardsSlow(this.server);
        this.slowServer.start();
        this.server.start();
        this.waitForServerToStart(this.slowServer);
        this.waitForServerToStart(this.server);

        final String queueName = this.getQueueName();
        this.server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        this.server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setAutoCreated(false));

        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
        try (Connection connection = factory.createConnection()) {
            Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(queueName));
            Session producerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(producerSession.createQueue(queueName));
            connection.start();
            consumer.setMessageListener(message -> {
                try {
                    // Acknowledge immediately on delivery.
                    message.acknowledge();
                    done.countDown();
                } catch (Exception e) {
                    // Not rethrown (same as before); recorded so the assertion below can report it.
                    listenerFailure.compareAndSet(null, e);
                }
            });
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < numberOfMessages; ++i) {
                producer.send(producerSession.createTextMessage("i=" + i));
            }
            boolean allAcknowledged = done.await(5000L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue(allAcknowledged, "Not all messages were acknowledged in time; first listener failure: " + listenerFailure.get());
        }

        Queue snf = this.server.locateQueue(replication.getMirrorSNF());
        // Give the store-and-forward queue time to drain; the outcome is not asserted here,
        // the check on the target queue below decides the test.
        Wait.waitFor(() -> snf.getMessageCount() == 0L && snf.getMessagesAdded() > numberOfMessages);

        // The mirrored queue may not exist on the target yet; wait for it instead of risking a null dereference.
        Wait.assertTrue("Expected mirrored target queue " + queueName + " to exist", () -> this.slowServer.locateQueue(queueName) != null);
        Queue queue = this.slowServer.locateQueue(queueName);
        Wait.assertTrue("Expected mirrored target queue " + queueName + " to be empty", () -> queue.getMessageCount() == 0L && queue.getMessagesAdded() == numberOfMessages);
    }

    /**
     * Creates the source broker by delegating to the two-argument {@code createServer}
     * with port 5672 and {@code false}.
     * NOTE(review): the boolean presumably means "do not start yet" (the test starts the
     * server explicitly) - confirm against the parent class.
     *
     * @return the broker returned by the delegate
     * @throws Exception if the delegate fails
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        final int port = 5672;
        return createServer(port, false);
    }

    /**
     * Registers an AMQP broker connection named {@code "mirror"} on the given broker's
     * configuration, pointing at the slow broker's port and carrying a durable mirror element.
     *
     * @param source broker whose configuration receives the mirror connection
     * @return the mirror element that was added, so callers can look up its SNF queue
     */
    private AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow(ActiveMQServer source) {
        // Derive the URI from the shared constant so it cannot drift from the slow broker's acceptor port.
        final String slowBrokerUri = "tcp://localhost:" + SLOW_SERVER_PORT;
        AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", slowBrokerUri).setReconnectAttempts(-1).setRetryInterval(100);
        AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true);
        connection.addElement(replication);
        source.getConfiguration().addAMQPConnection(connection);
        return replication;
    }

    /**
     * Builds, without starting, the slow mirror-target broker. Its storage manager comes from
     * this test's {@code createStorageManager}; its only acceptor is the one configured for
     * {@code SLOW_SERVER_PORT}.
     *
     * @return the configured, not yet started broker
     * @throws Exception if building the configuration or the broker fails
     */
    private ActiveMQServer createSlowServer() throws Exception {
        ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
        ActiveMQServer server = new ActiveMQServerImpl(this.createBasicConfig(SLOW_SERVER_PORT), this.mBeanServer, securityManager) {

            @Override
            protected StorageManager createStorageManager() {
                return AMQPMirrorFastACKTest.this.createStorageManager(this.getConfiguration(), this.getCriticalAnalyzer(), this.executorFactory, this.scheduledPool, this.ioExecutorFactory, this.ioCriticalErrorListener);
            }
        };

        Configuration configuration = server.getConfiguration();
        configuration.setName(SLOW_SERVER_NAME);
        configuration.getAcceptorConfigurations().clear();
        // Pass the broker being built: the slowServer field is still null at this point,
        // because setUp() assigns it only after this method returns.
        configuration.getAcceptorConfigurations().add(this.addAcceptorConfiguration(server, SLOW_SERVER_PORT));
        configuration.setJMXManagementEnabled(true);
        configuration.setMessageExpiryScanPeriod(100L);

        this.configureAddressPolicy(server);
        this.configureBrokerSecurity(server);
        return server;
    }

    /**
     * Creates a journal storage manager whose message journal wraps the persister in a
     * {@code SlowMessagePersister} for every transactional add of an {@code AMQPStandardMessage}.
     * All other records are appended with the persister they arrived with.
     *
     * @param configuration           passed to the {@code JournalStorageManager} constructor
     * @param criticalAnalyzer        passed to the {@code JournalStorageManager} constructor
     * @param executorFactory         passed to the {@code JournalStorageManager} constructor
     * @param scheduledPool           passed to the {@code JournalStorageManager} constructor
     * @param ioExecutorFactory       passed to the {@code JournalStorageManager} constructor
     * @param ioCriticalErrorListener passed to the {@code JournalStorageManager} constructor
     * @return the customised storage manager
     */
    private StorageManager createStorageManager(Configuration configuration, CriticalAnalyzer criticalAnalyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledPool, ExecutorFactory ioExecutorFactory, IOCriticalErrorListener ioCriticalErrorListener) {
        return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {

            @Override
            protected Journal createMessageJournal(Configuration config, IOCriticalErrorListener criticalErrorListener, int fileSize) {
                return new JournalImpl(
                        this.ioExecutorFactory,
                        fileSize,
                        config.getJournalMinFiles(),
                        config.getJournalPoolFiles(),
                        config.getJournalCompactMinFiles(),
                        config.getJournalCompactPercentage(),
                        config.getJournalFileOpenTimeout(),
                        this.journalFF,
                        "activemq-data",
                        "amq",
                        this.journalFF.getMaxIO(),
                        0,
                        criticalErrorListener,
                        config.getJournalMaxAtticFiles()) {

                    @Override
                    public void appendAddRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception {
                        // Only AMQP standard messages get the slowed-down persister.
                        Persister effectivePersister = persister;
                        if (record instanceof AMQPStandardMessage) {
                            effectivePersister = new SlowMessagePersister(persister);
                        }
                        super.appendAddRecordTransactional(txID, id, recordType, effectivePersister, record);
                    }
                };
            }
        };
    }

    static class SlowMessagePersister<T>
    implements Persister<T> {
        private final Persister<T> delegate;

        SlowMessagePersister(Persister<T> delegate) {
            this.delegate = delegate;
        }

        public byte getID() {
            return this.delegate.getID();
        }

        public int getEncodeSize(T record) {
            return this.delegate.getEncodeSize(record);
        }

        public void encode(ActiveMQBuffer buffer, T record) {
            try {
                Thread.sleep(10L);
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.delegate.encode(buffer, record);
        }

        public T decode(ActiveMQBuffer buffer, T record, CoreMessageObjectPools pool) {
            return (T)this.delegate.decode(buffer, record, pool);
        }
    }
}

