/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.client;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionContinuationMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={ParameterizedTestExtension.class})
public class InterruptedLargeMessageTest
extends LargeMessageTestBase {
    /** Logger for this test class, resolved from the current lookup class. */
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    /** Wait-time constant (60000); presumably milliseconds - TODO confirm. Not referenced in the visible part of this class. */
    static final int RECEIVE_WAIT_TIME = 60000;
    /** Large-message body size (307200); the tests in this class create and read bodies of exactly this many bytes. */
    private final int LARGE_MESSAGE_SIZE = 307200;
    /** Locator assigned in {@link #setUp()} and used by the tests to create session factories. */
    protected ServerLocator locator;

    /**
     * Creates the test for one store type and forwards it to the base class.
     *
     * @param storeType store type handed to the {@code LargeMessageTestBase} constructor
     */
    public InterruptedLargeMessageTest(StoreConfiguration.StoreType storeType) {
        super(storeType);
    }

    /**
     * Per-test preparation: runs the base-class setup, re-arms the shared
     * interceptor state and creates the {@link #locator} used by the tests.
     *
     * @throws Exception if the base setup or the locator creation fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        final boolean netty = this.isNetty();
        this.locator = this.createFactory(netty);
    }

    /**
     * Tells the server and locator factory methods whether Netty should be used.
     *
     * @return always {@code false} in this class; protected, so a subclass may override it
     */
    protected boolean isNetty() {
        return false;
    }

    /**
     * Sends one large message while the interceptor registered on the server
     * rejects the final continuation packet, then fails and restarts the server
     * and calls {@code validateNoFilesOnLargeDir()}.
     *
     * @throws Exception if any server or client operation fails
     */
    @TestTemplate
    public void testInterruptLargeMessageSend() throws Exception {
        // Arm the interceptor before the server is created.
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();

        final ActiveMQServer server = this.createServer(true, this.isNetty());
        final String interceptorClass = LargeMessageTestInterceptorIgnoreLastPacket.class.getName();
        server.getConfiguration().getIncomingInterceptorClassNames().add(interceptorClass);
        server.start();

        // Non-blocking sends: the producer must not wait for a response to the send.
        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false);
        final ClientSessionFactory factory = this.createSessionFactory(this.locator);
        final ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(this.ADDRESS));

        final ClientProducer producer = session.createProducer(this.ADDRESS);
        final ClientMessage largeMessage = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
        largeMessage.setExpiration(System.currentTimeMillis());
        producer.send(largeMessage);

        // Give the asynchronous send some time before failing the server.
        Thread.sleep(500L);
        server.fail(false);
        ActiveMQTestBase.forceGC();

        server.start();
        server.stop();
        this.validateNoFilesOnLargeDir();
    }

    /**
     * Closes a consumer while a large message is still being delivered to it and
     * checks that the receiving thread sees exactly one {@link ActiveMQException}
     * and no other failure.
     *
     * <p>The message is produced while the interceptor is disabled. The interceptor
     * (added to the client locator as an incoming interceptor) is armed afterwards,
     * so it returns {@code false} for the final continuation packet of the delivery.
     *
     * @throws Exception if the server, the session or the worker thread fails unexpectedly
     */
    @TestTemplate
    public void testCloseConsumerDuringTransmission() throws Exception {
        ActiveMQServer server = this.createServer(true, this.isNetty());
        // Keep the interceptor passive while the message is being produced.
        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        server.start();
        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).addIncomingInterceptor(new LargeMessageTestInterceptorIgnoreLastPacket());
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(this.ADDRESS));
        ClientProducer producer = session.createProducer(this.ADDRESS);
        ClientMessage clientFile = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
        producer.send(clientFile);
        session.commit();

        // From here on the last packet of the delivery is rejected by the interceptor.
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        AtomicInteger unexpectedErrors = new AtomicInteger(0);
        AtomicInteger expectedErrors = new AtomicInteger(0);
        ClientConsumer cons = session.createConsumer(this.ADDRESS);
        session.start();
        Thread t = new Thread(() -> {
            try {
                logger.debug("Receiving message");
                ClientMessage msg = cons.receive(5000L);
                if (msg == null) {
                    logger.warn("Message not received");
                    unexpectedErrors.incrementAndGet();
                    return;
                }
                msg.checkCompletion();
            } catch (ActiveMQException e) {
                // Expected outcome: log through the logger instead of dumping a stack trace to stderr.
                logger.debug("This exception was ok as it was expected", e);
                expectedErrors.incrementAndGet();
            } catch (Throwable e) {
                // Anything else is recorded so the assertion below reports it.
                logger.warn("Captured unexpected exception", e);
                unexpectedErrors.incrementAndGet();
            }
        });
        t.start();

        // Wait until the interceptor has seen (and rejected) the last packet, then close the consumer.
        LargeMessageTestInterceptorIgnoreLastPacket.awaitInterrupt();
        cons.close();
        t.join();
        Assertions.assertEquals(0, unexpectedErrors.get());
        Assertions.assertEquals(1, expectedErrors.get());
        session.close();
        server.stop();
    }

    /**
     * Interrupts a thread that is blocked in a JMS {@code receive} and checks that
     * the interruption surfaces as exactly one {@link JMSException} and no other failure.
     *
     * <p>The connection factory and the connection are opened in a try-with-resources
     * statement so both are closed even when an assertion fails.
     *
     * @throws Exception if the server, the JMS resources or the worker thread fail unexpectedly
     */
    @TestTemplate
    public void testForcedInterruptUsingJMS() throws Exception {
        ActiveMQServer server = this.createServer(true, this.isNetty());
        server.start();
        SimpleString jmsAddress = SimpleString.of("Test");
        server.createQueue(QueueConfiguration.of(jmsAddress).setRoutingType(RoutingType.ANYCAST));
        AtomicInteger unexpectedErrors = new AtomicInteger(0);
        AtomicInteger expectedErrors = new AtomicInteger(0);
        try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
             Connection connection = cf.createConnection()) {
            // SESSION_TRANSACTED has the value 0, which is what was passed here before.
            Session session = connection.createSession(Session.SESSION_TRANSACTED);
            connection.start();
            MessageConsumer consumer = session.createConsumer(session.createQueue(jmsAddress.toString()));
            Thread t = new Thread(() -> {
                try {
                    logger.debug("Receiving message");
                    Message msg = consumer.receive(5000L);
                    if (msg == null) {
                        logger.warn("Message not received");
                        unexpectedErrors.incrementAndGet();
                        return;
                    }
                } catch (JMSException e) {
                    logger.debug("This exception was ok as it was expected", e);
                    expectedErrors.incrementAndGet();
                } catch (Throwable e) {
                    logger.warn("Captured unexpected exception", e);
                    unexpectedErrors.incrementAndGet();
                }
            });
            t.start();
            // No message is ever sent to the queue; the interrupt is what ends the receive call.
            t.interrupt();
            t.join();
            Assertions.assertEquals(0, unexpectedErrors.get());
            Assertions.assertEquals(1, expectedErrors.get());
            session.close();
        }
        server.stop();
    }

    /**
     * Sends ten large messages to a non-durable queue, then consumes, verifies
     * and rolls them back five times before failing and restarting the server
     * and calling {@code validateNoFilesOnLargeDir()}.
     *
     * @throws Exception if any server or client operation fails
     */
    @TestTemplate
    public void testSendNonPersistentQueue() throws Exception {
        final int messageCount = 10;
        final int rollbackRounds = 5;

        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        final ActiveMQServer server = this.createServer(true, this.isNetty());
        server.start();

        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        final ClientSessionFactory factory = this.createSessionFactory(this.locator);

        // Producer side: queue is created as non-durable.
        final ClientSession producerSession = factory.createSession(false, true, true);
        producerSession.createQueue(QueueConfiguration.of(this.ADDRESS).setDurable(Boolean.FALSE));
        final ClientProducer producer = producerSession.createProducer(this.ADDRESS);
        int sent = 0;
        while (sent < messageCount) {
            final ClientMessage largeMessage = this.createLargeClientMessageStreaming(producerSession, LARGE_MESSAGE_SIZE, true);
            producer.send(largeMessage);
            sent++;
        }
        producerSession.commit();
        producerSession.close();

        // Consumer side: every round reads all messages, checks each byte and rolls back.
        final ClientSession consumerSession = factory.createSession(false, false);
        final ClientConsumer consumer = consumerSession.createConsumer(this.ADDRESS);
        consumerSession.start();
        for (int round = 0; round < rollbackRounds; round++) {
            for (int received = 0; received < messageCount; received++) {
                final ClientMessage delivered = consumer.receive(5000L);
                Assertions.assertNotNull(delivered);
                for (int position = 0; position < LARGE_MESSAGE_SIZE; position++) {
                    Assertions.assertEquals((byte)ActiveMQTestBase.getSamplebyte(position), (byte)delivered.getBodyBuffer().readByte());
                }
                delivered.acknowledge();
            }
            consumerSession.rollback();
        }

        server.fail(false);
        server.start();
        server.stop();
        this.validateNoFilesOnLargeDir();
    }

    /**
     * Sends ten large messages to an address that has been put into paging, then
     * restarts the server five times; each round consumes and verifies all
     * messages, rolling back on every round except the last, which commits.
     * Finally the server is failed and restarted and
     * {@code validateNoFilesOnLargeDir()} is called.
     *
     * @throws Exception if any server or client operation fails
     */
    @TestTemplate
    public void testSendPaging() throws Exception {
        final int messageCount = 10;
        final int rounds = 5;

        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        final ActiveMQServer server = this.createServer(true, this.createDefaultConfig(this.isNetty()), 10000, 20000L, new HashMap<String, AddressSettings>());
        server.start();

        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory factory = this.createSessionFactory(this.locator);
        ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(this.ADDRESS));

        // Start paging explicitly on the address before anything is sent.
        server.getPagingManager().getPageStore(this.ADDRESS).startPaging();

        final ClientProducer producer = session.createProducer(this.ADDRESS);
        int sent = 0;
        while (sent < messageCount) {
            final ClientMessage largeMessage = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
            producer.send(largeMessage);
            sent++;
        }
        session.commit();

        // Presumably checks that exactly 10 files are present in the large-messages directory - confirm against the helper.
        this.validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 10);

        for (int round = 0; round < rounds; round++) {
            // Every round begins with a clean server restart.
            session.close();
            factory.close();
            server.stop();
            server.start();

            factory = this.createSessionFactory(this.locator);
            session = factory.createSession(false, false);
            final ClientConsumer consumer = session.createConsumer(this.ADDRESS);
            session.start();
            for (int received = 0; received < messageCount; received++) {
                final ClientMessage delivered = consumer.receive(5000L);
                Assertions.assertNotNull(delivered);
                for (int position = 0; position < LARGE_MESSAGE_SIZE; position++) {
                    Assertions.assertEquals((byte)ActiveMQTestBase.getSamplebyte(position), (byte)delivered.getBodyBuffer().readByte());
                }
                delivered.acknowledge();
            }

            // Only the final round keeps the acknowledgements.
            final boolean lastRound = round == rounds - 1;
            if (lastRound) {
                session.commit();
            } else {
                session.rollback();
            }
            session.close();
            factory.close();
        }

        server.fail(false);
        server.start();
        this.validateNoFilesOnLargeDir();
    }

    /**
     * Prepares two XA transactions of ten large messages each, fails the server
     * and then: commits the first transaction, consumes its messages twice
     * (rollback, then commit) across restarts, and finally rolls back the second
     * transaction before calling {@code validateNoFilesOnLargeDir()}.
     *
     * @throws Exception if any server or client operation fails
     */
    @TestTemplate
    public void testSendPreparedXA() throws Exception {
        // Literal XA flag values; they equal XAResource.TMNOFLAGS and XAResource.TMSUCCESS.
        final int xaNoFlags = 0;
        final int xaSuccess = 0x4000000;
        final int messageCount = 10;

        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        final ActiveMQServer server = this.createServer(true, this.createDefaultConfig(this.isNetty()), 10000, 20000L, new HashMap<String, AddressSettings>());
        server.getConfiguration().getIncomingInterceptorClassNames().add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
        server.start();

        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory factory = this.createSessionFactory(this.locator);
        ClientSession session = factory.createSession(true, false, false);
        final Xid firstXid = this.newXID();
        final Xid secondXid = this.newXID();
        session.createQueue(QueueConfiguration.of(this.ADDRESS));
        final ClientProducer producer = session.createProducer(this.ADDRESS);

        // First transaction: messages tagged txid=1.
        session.start(firstXid, xaNoFlags);
        for (int index = 0; index < messageCount; index++) {
            final ClientMessage largeMessage = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
            largeMessage.putIntProperty("txid", 1);
            producer.send(largeMessage);
        }
        session.end(firstXid, xaSuccess);
        session.prepare(firstXid);

        // Second transaction: messages tagged txid=2 plus their index.
        session.start(secondXid, xaNoFlags);
        for (int index = 0; index < messageCount; index++) {
            final ClientMessage largeMessage = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
            largeMessage.putIntProperty("txid", 2);
            largeMessage.putIntProperty("i", index);
            producer.send(largeMessage);
        }
        session.end(secondXid, xaSuccess);
        session.prepare(secondXid);

        session.close();
        factory.close();
        server.fail(false);
        server.start();

        for (int start = 0; start < 2; start++) {
            logger.debug("Start {}", start);
            factory = this.createSessionFactory(this.locator);
            if (start == 0) {
                // The prepared first transaction is committed only once, on the first pass.
                session = factory.createSession(true, false, false);
                session.commit(firstXid, false);
                session.close();
            }

            session = factory.createSession(false, false, false);
            final ClientConsumer consumer = session.createConsumer(this.ADDRESS);
            session.start();
            for (int received = 0; received < messageCount; received++) {
                logger.info("I = {}", received);
                final ClientMessage delivered = consumer.receive(5000L);
                Assertions.assertNotNull(delivered);
                Assertions.assertEquals(1, (int)delivered.getIntProperty("txid"));
                delivered.acknowledge();
            }

            // First pass rolls back so the messages are redelivered; second pass commits.
            if (start == 1) {
                session.commit();
            } else {
                session.rollback();
            }
            session.close();
            factory.close();
            server.stop();
            server.start();
        }

        server.stop();
        // Presumably expects the 10 files of the still-prepared second transaction - confirm against the helper.
        this.validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 10);

        server.start();
        factory = this.createSessionFactory(this.locator);
        session = factory.createSession(true, false, false);
        session.rollback(secondXid);
        factory.close();

        server.stop();
        server.start();
        server.stop();
        this.validateNoFilesOnLargeDir();
    }

    /**
     * Consumes and commits ten large messages on a server whose queues ignore
     * {@code postAcknowledge}, then fails and restarts the server and calls
     * {@code validateNoFilesOnLargeDir()}.
     *
     * <p>The queue factory is replaced by one that builds {@code NoPostACKQueue}
     * instances. The queue class is declared at test-method level so that every
     * factory method can reference it; a local class declared inside one factory
     * method is not in scope in the others.
     *
     * @throws Exception if any server or client operation fails
     */
    @TestTemplate
    public void testRestartBeforeDelete() throws Exception {
        /** Queue whose post-acknowledge and scheduled-delivery hooks do nothing. */
        class NoPostACKQueue
        extends QueueImpl {
            NoPostACKQueue(long id, SimpleString address, SimpleString name, Filter filter, SimpleString user, PageSubscription pageSubscription, boolean durable, boolean temporary, boolean autoCreated, ScheduledExecutorService scheduledExecutor, PostOffice postOffice, StorageManager storageManager, HierarchicalRepository<AddressSettings> addressSettingsRepository, ActiveMQServer server, ArtemisExecutor executor) {
                super(id, address, name, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, null);
            }

            public void postAcknowledge(MessageReference ref, AckReason reason) {
                logger.debug("Ignoring postACK on message {}", (Object)ref);
            }

            public void deliverScheduledMessages() {
            }
        }

        /** Factory that creates {@code NoPostACKQueue} instances from the server components it was given. */
        final class NoPostACKQueueFactory
        implements QueueFactory {
            final StorageManager storageManager;
            final PostOffice postOffice;
            final ScheduledExecutorService scheduledExecutor;
            final HierarchicalRepository<AddressSettings> addressSettingsRepository;
            final ExecutorFactory execFactory;
            final ActiveMQServer server;

            NoPostACKQueueFactory(ActiveMQServer server, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutor, HierarchicalRepository<AddressSettings> addressSettingsRepository, ExecutorFactory execFactory) {
                this.storageManager = storageManager;
                this.postOffice = postOffice;
                this.scheduledExecutor = scheduledExecutor;
                this.addressSettingsRepository = addressSettingsRepository;
                this.execFactory = execFactory;
                this.server = server;
            }

            public Queue createQueueWith(QueueConfig config) {
                return new NoPostACKQueue(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), this.scheduledExecutor, this.postOffice, this.storageManager, this.addressSettingsRepository, this.server, this.execFactory.getExecutor());
            }

            public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception {
                return new NoPostACKQueue(config.getId(), config.getAddress(), config.getName(), filter, config.getUser(), QueueFactoryImpl.getPageSubscription(config, pagingManager, filter), config.isDurable(), config.isTemporary(), config.isAutoCreated(), this.scheduledExecutor, this.postOffice, this.storageManager, this.addressSettingsRepository, this.server, this.execFactory.getExecutor());
            }

            @Deprecated
            public Queue createQueue(long persistenceID, SimpleString address, SimpleString name, Filter filter, PageSubscription pageSubscription, SimpleString user, boolean durable, boolean temporary, boolean autoCreated) {
                return new NoPostACKQueue(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, this.scheduledExecutor, this.postOffice, this.storageManager, this.addressSettingsRepository, this.server, this.execFactory.getExecutor());
            }

            public void setPostOffice(PostOffice postOffice) {
                // Intentionally empty: the post office passed to the constructor is kept.
            }
        }

        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        ActiveMQServer server = this.createServer(true, this.isNetty());
        server.start();

        // Remember the regular factory so it can be put back before the server is failed.
        QueueFactory original = server.getQueueFactory();
        ((ActiveMQServerImpl)server).replaceQueueFactory(new NoPostACKQueueFactory(server, server.getStorageManager(), server.getPostOffice(), server.getScheduledPool(), (HierarchicalRepository<AddressSettings>)server.getAddressSettingsRepository(), server.getExecutorFactory()));

        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory sf = this.createSessionFactory(this.locator);
        ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(this.ADDRESS));
        ClientProducer producer = session.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; ++i) {
            ClientMessage clientFile = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
            producer.send(clientFile);
        }
        session.commit();
        session.close();

        session = sf.createSession(false, false);
        ClientConsumer cons = session.createConsumer(this.ADDRESS);
        session.start();
        for (int i = 0; i < 10; ++i) {
            ClientMessage msg = cons.receive(5000L);
            Assertions.assertNotNull(msg);
            // Write the body into a sink that discards every byte.
            msg.saveToOutputStream(new OutputStream(){

                @Override
                public void write(int b) throws IOException {
                }
            });
            msg.acknowledge();
            session.commit();
        }

        ((ActiveMQServerImpl)server).replaceQueueFactory(original);
        server.fail(false);
        server.start();
        server.stop();
        this.validateNoFilesOnLargeDir();
    }

    /**
     * Sends ten large messages, restarts the server cleanly, consumes and commits
     * every message, then fails and restarts the server and calls
     * {@code validateNoFilesOnLargeDir()}.
     *
     * @throws Exception if any server or client operation fails
     */
    @TestTemplate
    public void testConsumeAfterRestart() throws Exception {
        final int messageCount = 10;

        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        final ActiveMQServer server = this.createServer(true, this.isNetty());
        server.start();
        final QueueFactory originalFactory = server.getQueueFactory();

        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory factory = this.createSessionFactory(this.locator);
        ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(this.ADDRESS));

        final ClientProducer producer = session.createProducer(this.ADDRESS);
        int sent = 0;
        while (sent < messageCount) {
            final ClientMessage largeMessage = this.createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true);
            producer.send(largeMessage);
            sent++;
        }
        session.commit();
        session.close();
        factory.close();

        // Clean restart between producing and consuming.
        server.stop();
        server.start();

        factory = this.createSessionFactory(this.locator);
        session = factory.createSession(false, false);
        final ClientConsumer consumer = session.createConsumer(this.ADDRESS);
        session.start();
        for (int received = 0; received < messageCount; received++) {
            final ClientMessage delivered = consumer.receive(5000L);
            Assertions.assertNotNull(delivered);
            // A fresh sink per message; every byte written to it is discarded.
            final OutputStream discardingSink = new OutputStream(){

                @Override
                public void write(int b) throws IOException {
                }
            };
            delivered.saveToOutputStream(discardingSink);
            delivered.acknowledge();
            session.commit();
        }

        // NOTE(review): this test never installs a custom queue factory, so putting back the
        // one captured before the restart looks like a leftover - confirm before removing.
        ((ActiveMQServerImpl)server).replaceQueueFactory(originalFactory);
        server.fail(false);
        server.start();
        server.stop();
        this.validateNoFilesOnLargeDir();
    }

    public static class LargeMessageTestInterceptorIgnoreLastPacket
    implements Interceptor {
        private static boolean intMessages = false;
        private static CountDownLatch latch = new CountDownLatch(1);

        public static void clearInterrupt() {
            intMessages = true;
            latch = new CountDownLatch(1);
        }

        public static void disableInterrupt() {
            intMessages = false;
        }

        public static void awaitInterrupt() throws Exception {
            latch.await();
        }

        public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
            SessionContinuationMessage msg;
            if (packet instanceof SessionContinuationMessage && !(msg = (SessionContinuationMessage)packet).isContinues() && intMessages) {
                logger.debug("Ignored a message");
                latch.countDown();
                return false;
            }
            return true;
        }
    }
}

