/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.paging;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for {@link PageSubscriptionCounter}: transactional and
 * non-transactional increments, concurrent updates, and the counter value
 * observed after the server is stopped and started again.
 *
 * <p>Every test works against a single ANYCAST queue named {@code A1} on an
 * address of the same name.
 */
public class PagingCounterTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Name shared by the address and the queue used in every test. */
    private static final SimpleString ADDRESS = SimpleString.of("A1");

    /** Number of worker threads used by the multi-threaded tests. */
    private static final int THREADS = 10;

    /** Number of iterations each worker thread performs. */
    private static final int BUMPS = 2000;

    /** Number of increments performed by the cleanup tests. */
    private static final int CLEANUP_INCREMENTS = 2100;

    /** The cleanup tests commit whenever the loop index is a multiple of this value. */
    private static final int CLEANUP_COMMIT_INTERVAL = 200;

    /** Persistent size passed along with every single increment in the non-concurrent tests. */
    private static final long INCREMENT_SIZE = 1000L;

    private ActiveMQServer server;
    private ServerLocator sl;
    private AssertionLoggerHandler loggerHandler;

    /** One step executed repeatedly by each worker in {@link #runConcurrentBumps(BumpAction)}. */
    @FunctionalInterface
    private interface BumpAction {
        void bump() throws Exception;
    }

    /**
     * Starts a fresh server, creates an in-VM locator, and installs the log
     * handler inspected by {@link #checkLoggerEnd()}.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        server = newActiveMQServer();
        server.start();
        sl = createInVMNonHALocator();
        loggerHandler = new AssertionLoggerHandler();
    }

    /**
     * Fails the test if log code {@code 222214} or {@code 222215} was emitted,
     * and always closes the log handler.
     */
    @AfterEach
    public void checkLoggerEnd() throws Exception {
        if (loggerHandler == null) {
            // setUp failed before the handler was installed; nothing to check.
            return;
        }
        try {
            Assertions.assertFalse(loggerHandler.findText("222214"));
            Assertions.assertFalse(loggerHandler.findText("222215"));
        } finally {
            loggerHandler.close();
        }
    }

    /**
     * A transactional increment must not be visible before commit and must be
     * visible (value and persistent size) after commit.
     */
    @Test
    public void testCounter() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            Queue queue = registerAddressAndCreateQueue();
            PageSubscriptionCounter counter = locateCounter(queue);

            Transaction tx = new TransactionImpl(server.getStorageManager());
            counter.increment(tx, 1, INCREMENT_SIZE);
            Wait.assertEquals(0L, counter::getValue);
            Wait.assertEquals(0L, counter::getPersistentSize);

            tx.commit();
            Wait.assertEquals(1L, counter::getValue);
            Wait.assertEquals(INCREMENT_SIZE, counter::getPersistentSize);
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Concurrent mix of increments and decrements, with and without a
     * transaction. Each iteration nets +1, so the expected total is
     * {@code THREADS * BUMPS}, both before and after an in-place restart with
     * counter rebuild disabled.
     */
    @Test
    public void testMultiThreadUpdates() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            Queue queue = registerAddressAndCreateQueue();
            PageSubscriptionCounter counter = locateCounter(queue);
            Assertions.assertEquals(0L, counter.getValue());

            runConcurrentBumps(() -> {
                counter.increment(null, 2, 1L);
                Transaction tx = new TransactionImpl(server.getStorageManager());
                counter.increment(tx, 1, 1L);
                tx.commit();
                counter.increment(null, -1, -1L);
                tx = new TransactionImpl(server.getStorageManager());
                counter.increment(tx, -1, -1L);
                tx.commit();
            });

            // +2 +1 -1 -1 per iteration
            Long expectedTotal = Long.valueOf((long) THREADS * BUMPS);
            Wait.assertEquals(expectedTotal, counter::getValue, 5000L, 100L);

            PageSubscriptionCounter counterAfterRestart = locateCounter(restartSameServerWithoutRebuild());
            Wait.assertEquals(expectedTotal, counterAfterRestart::getValue, 5000L, 100L);
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Concurrent increments, one non-transactional and one transactional per
     * iteration. The expected total is {@code 2 * THREADS * BUMPS}, both before
     * and after an in-place restart with counter rebuild disabled.
     */
    @Test
    public void testMultiThreadCounter() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            Queue queue = registerAddressAndCreateQueue();
            PageSubscriptionCounter counter = locateCounter(queue);
            Assertions.assertEquals(0L, counter.getValue());

            runConcurrentBumps(() -> {
                counter.increment(null, 1, 1L);
                Transaction tx = new TransactionImpl(server.getStorageManager());
                counter.increment(tx, 1, 1L);
                tx.commit();
            });

            // +1 +1 per iteration
            Long expectedTotal = Long.valueOf(2L * THREADS * BUMPS);
            Wait.assertEquals(expectedTotal, counter::getValue, 5000L, 100L);

            PageSubscriptionCounter counterAfterRestart = locateCounter(restartSameServerWithoutRebuild());
            Wait.assertEquals(expectedTotal, counterAfterRestart::getValue, 5000L, 100L);
            Assertions.assertEquals(expectedTotal.longValue(), counterAfterRestart.getValue());
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Performs many transactional increments with periodic commits, then
     * verifies the totals survive a restart on a new server instance with
     * counter rebuild disabled, and drop to zero once a rebuild is requested
     * explicitly.
     */
    @Test
    public void testCleanupCounter() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            Queue queue = registerAddressAndCreateQueue();
            PageSubscriptionCounter counter = locateCounter(queue);
            StorageManager storage = server.getStorageManager();

            Transaction tx = new TransactionImpl(server.getStorageManager());
            for (int i = 0; i < CLEANUP_INCREMENTS; i++) {
                counter.increment(tx, 1, INCREMENT_SIZE);
                if (i % CLEANUP_COMMIT_INTERVAL == 0) {
                    tx.commit();
                    storage.waitOnOperations();
                    Wait.assertEquals(i + 1L, counter::getValue);
                    Wait.assertEquals((i + 1) * INCREMENT_SIZE, counter::getPersistentSize);
                    tx = new TransactionImpl(server.getStorageManager());
                }
            }
            tx.commit();

            long expectedCount = CLEANUP_INCREMENTS;
            long expectedSize = CLEANUP_INCREMENTS * INCREMENT_SIZE;
            Wait.assertEquals(expectedCount, counter::getValue);
            Wait.assertEquals(expectedSize, counter::getPersistentSize);

            // A separate local is required here: a lambda may only capture
            // effectively final variables, so the original counter cannot be reassigned.
            PageSubscriptionCounter counterAfterRestart = locateCounter(replaceServer(true));
            Assertions.assertEquals(expectedCount, counterAfterRestart.getValue());
            Assertions.assertEquals(expectedSize, counterAfterRestart.getPersistentSize());

            server.getPagingManager().rebuildCounters(null);
            Wait.assertEquals(0L, counterAfterRestart::getValue);
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Same increment pattern as {@link #testCleanupCounter()}, but the server
     * is replaced without disabling counter rebuild; the counter is then
     * expected to read zero.
     */
    @Test
    public void testCleanupCounterNonPersistent() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            Queue queue = registerAddressAndCreateQueue();
            PageSubscriptionCounter counter = locateCounter(queue);
            StorageManager storage = server.getStorageManager();

            Transaction tx = new TransactionImpl(server.getStorageManager());
            for (int i = 0; i < CLEANUP_INCREMENTS; i++) {
                counter.increment(tx, 1, INCREMENT_SIZE);
                if (i % CLEANUP_COMMIT_INTERVAL == 0) {
                    tx.commit();
                    storage.waitOnOperations();
                    Assertions.assertEquals(i + 1L, counter.getValue());
                    Assertions.assertEquals((i + 1) * INCREMENT_SIZE, counter.getPersistentSize());
                    tx = new TransactionImpl(server.getStorageManager());
                }
            }
            tx.commit();
            storage.waitOnOperations();
            Assertions.assertEquals((long) CLEANUP_INCREMENTS, counter.getValue());
            Assertions.assertEquals(CLEANUP_INCREMENTS * INCREMENT_SIZE, counter.getPersistentSize());

            PageSubscriptionCounter counterAfterRestart = locateCounter(replaceServer(false));
            Assertions.assertEquals(0L, counterAfterRestart.getValue());
            Assertions.assertEquals(0L, counterAfterRestart.getPersistentSize());
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Verifies a committed increment is still present after a restart with
     * counter rebuild disabled, that increments made between
     * {@code markRebuilding()} and {@code finishRebuild()} are accounted for,
     * and that an explicit rebuild resets the counter to zero.
     */
    @Test
    public void testRestartCounter() throws Exception {
        Queue queue = registerAddressAndCreateQueue();
        PageSubscriptionCounter counter = locateCounter(queue);

        Transaction tx = new TransactionImpl(server.getStorageManager());
        counter.increment(tx, 1, INCREMENT_SIZE);
        Assertions.assertEquals(0L, counter.getValue());
        Assertions.assertEquals(0L, counter.getPersistentSize());

        tx.commit();
        Wait.assertEquals(1L, counter::getValue);
        Wait.assertEquals(INCREMENT_SIZE, counter::getPersistentSize);

        sl.close();
        PageSubscriptionCounter counterAfterRestart = locateCounter(replaceServer(true));
        Wait.assertEquals(1L, counterAfterRestart::getValue);
        Wait.assertEquals(INCREMENT_SIZE, counterAfterRestart::getPersistentSize);

        counterAfterRestart.markRebuilding();
        Wait.assertEquals(1L, counterAfterRestart::getValue);

        tx = new TransactionImpl(server.getStorageManager());
        counterAfterRestart.increment(tx, 10, 10 * INCREMENT_SIZE);
        tx.commit();
        Wait.assertEquals(11L, counterAfterRestart::getValue);
        Wait.assertEquals(11 * INCREMENT_SIZE, counterAfterRestart::getPersistentSize);

        counterAfterRestart.finishRebuild();
        server.getPagingManager().rebuildCounters(null);
        Wait.assertEquals(0L, counterAfterRestart::getValue);
        Wait.assertEquals(0L, counterAfterRestart::getPersistentSize);
    }

    /**
     * Looks up the page subscription counter of {@code queue} through the page
     * store registered for address {@code A1}.
     *
     * @param queue queue whose ID identifies the subscription
     * @return the counter of the matching subscription
     */
    private PageSubscriptionCounter locateCounter(Queue queue) throws Exception {
        PageSubscription subscription = server.getPagingManager()
            .getPageStore(ADDRESS)
            .getCursorProvider()
            .getSubscription(queue.getID());
        return subscription.getCounter();
    }

    /**
     * Increments the counter 2000 times inside one XID-bound transaction and
     * verifies the total is invisible before commit, visible after commit, and
     * still present after a restart with counter rebuild disabled.
     */
    @Test
    public void testCommitCounter() throws Exception {
        final int increments = 2000;
        Xid xid = newXID();
        Queue queue = createTestQueue();
        PageSubscriptionCounter counter = locateCounter(queue);
        StorageManager storage = server.getStorageManager();

        Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300);
        for (int i = 0; i < increments; i++) {
            counter.increment(tx, 1, INCREMENT_SIZE);
        }
        Assertions.assertEquals(0L, counter.getValue());

        tx.commit();
        storage.waitOnOperations();
        Assertions.assertEquals((long) increments, counter.getValue());

        // Separate final local so it can be captured by the method reference below.
        PageSubscriptionCounter counterAfterRestart = locateCounter(replaceServer(true));
        Wait.assertEquals((long) increments, counterAfterRestart::getValue);
    }

    /**
     * Sends 3000 messages in one transacted JMS session to a queue that is
     * paging, then verifies both the subscription counter and the queue
     * message count after a restart with counter rebuild disabled.
     */
    @Test
    public void testSendNoRebuild() throws Exception {
        final int messages = 3000;
        Queue queue = createTestQueue();
        queue.getPagingStore().startPaging();

        ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
        try (Connection connection = cf.createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue("A1"));
            for (int i = 0; i < messages; i++) {
                producer.send(session.createTextMessage("i" + i));
            }
            session.commit();
        }

        // Separate final locals so they can be captured by the method references below.
        Queue queueAfterRestart = replaceServer(true);
        PageSubscriptionCounter counterAfterRestart = locateCounter(queueAfterRestart);
        logger.debug("Counter:: {}", queueAfterRestart.getMessageCount());
        Wait.assertEquals((long) messages, counterAfterRestart::getValue);
        Wait.assertEquals(Long.valueOf(messages), queueAfterRestart::getMessageCount, 1000L, 100L);
    }

    /**
     * Creates (but does not start) a server with paging-friendly address
     * settings applied to every address ({@code "#"}): page size 10240 bytes,
     * max size 20480 bytes, and max read page messages 10.
     */
    private ActiveMQServer newActiveMQServer() throws Exception {
        OperationContextImpl.clearContext();
        ActiveMQServer newServer = createServer(true, true);
        AddressSettings defaultSetting = new AddressSettings()
            .setPageSizeBytes(10240)
            .setMaxSizeBytes(20480L)
            .setMaxReadPageMessages(10);
        newServer.getAddressSettingsRepository().addMatch("#", defaultSetting);
        return newServer;
    }

    /** Creates the ANYCAST queue {@code A1} on the current server. */
    private Queue createTestQueue() throws Exception {
        return server.createQueue(QueueConfiguration.of(ADDRESS).setRoutingType(RoutingType.ANYCAST));
    }

    /** Registers the ANYCAST address {@code A1} explicitly, then creates the queue on it. */
    private Queue registerAddressAndCreateQueue() throws Exception {
        server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
        return createTestQueue();
    }

    /**
     * Stops the current server and starts a newly created one in its place.
     *
     * @param disableCounterRebuild when {@code true}, calls
     *                              {@code setRebuildCounters(false)} before starting
     * @return the queue {@code A1} as located on the new server (asserted non-null)
     */
    private Queue replaceServer(boolean disableCounterRebuild) throws Exception {
        server.stop();
        server = newActiveMQServer();
        if (disableCounterRebuild) {
            server.setRebuildCounters(false);
        }
        server.start();
        Queue queue = server.locateQueue(ADDRESS);
        Assertions.assertNotNull(queue);
        return queue;
    }

    /**
     * Stops and starts the same server instance with counter rebuild disabled.
     *
     * @return the queue {@code A1} as located after the restart
     */
    private Queue restartSameServerWithoutRebuild() throws Exception {
        server.stop();
        server.setRebuildCounters(false);
        server.start();
        return server.locateQueue(ADDRESS);
    }

    /**
     * Runs {@code action} {@link #BUMPS} times on each of {@link #THREADS}
     * worker threads that are released together through a barrier, and waits
     * up to one minute for all of them.
     *
     * <p>Fails the test if the workers do not finish in time or if any worker
     * threw an exception, so that such failures are reported directly instead
     * of surfacing later as an unexplained counter mismatch.
     *
     * @param action step executed repeatedly by every worker
     */
    private void runConcurrentBumps(BumpAction action) throws Exception {
        CyclicBarrier flagStart = new CyclicBarrier(THREADS);
        CountDownLatch done = new CountDownLatch(THREADS);
        AtomicInteger errors = new AtomicInteger(0);

        ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
        runAfter(executorService::shutdownNow);

        for (int i = 0; i < THREADS; i++) {
            executorService.execute(() -> {
                try {
                    flagStart.await(10L, TimeUnit.SECONDS);
                    for (int repeat = 0; repeat < BUMPS; repeat++) {
                        action.bump();
                    }
                } catch (Exception e) {
                    logger.warn(e.getMessage(), e);
                    errors.incrementAndGet();
                } finally {
                    done.countDown();
                }
            });
        }

        Assertions.assertTrue(done.await(1L, TimeUnit.MINUTES), "worker threads did not finish within one minute");
        Assertions.assertEquals(0, errors.get(), "worker threads reported errors; see warnings in the log");
    }
}

