/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.paging;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.paging.PagingWithFailoverServer;
import org.apache.activemq.artemis.tests.integration.paging.SpawnedServerSupport;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class PagingWithFailoverAndCountersTest
extends ActiveMQTestBase {
    Process liveProcess;
    Process backupProcess;
    PagingWithFailoverServer inProcessBackup;
    private static final int PORT1 = 5050;
    private static final int PORT2 = 5051;

    private void startLive() throws Exception {
        PagingWithFailoverAndCountersTest.assertNull((Object)this.liveProcess);
        this.liveProcess = PagingWithFailoverServer.spawnVM(this.getTestDir(), 5050, 5051);
    }

    private void startBackupInProcess() throws Exception {
        PagingWithFailoverAndCountersTest.assertNull((Object)this.backupProcess);
        PagingWithFailoverAndCountersTest.assertNull((Object)this.inProcessBackup);
        this.inProcessBackup = new PagingWithFailoverServer();
        this.inProcessBackup.perform(this.getTestDir(), 5051, 5050, true);
    }

    @After
    public void tearDown() throws Exception {
        this.killLive();
        this.killBackup();
        super.tearDown();
    }

    private void killBackup() {
        try {
            if (this.backupProcess != null) {
                this.backupProcess.destroy();
            }
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.backupProcess = null;
        if (this.inProcessBackup != null) {
            try {
                this.inProcessBackup.getServer().fail(false);
            }
            catch (Throwable ignored) {
                ignored.printStackTrace();
            }
            this.inProcessBackup = null;
        }
    }

    private void killLive() {
        try {
            if (this.liveProcess != null) {
                this.liveProcess.destroy();
            }
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.liveProcess = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValidateDeliveryAndCounters() throws Exception {
        this.startLive();
        ServerLocator locator = SpawnedServerSupport.createLocator(5050).setInitialConnectAttempts(300).setReconnectAttempts(300).setRetryInterval(100L);
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.createQueue(new QueueConfiguration("DeadConsumer").setAddress("myAddress"));
        session.createQueue(new QueueConfiguration("cons2").setAddress("myAddress"));
        this.startBackupInProcess();
        PagingWithFailoverAndCountersTest.waitForRemoteBackup((ClientSessionFactory)factory, (int)10);
        ConsumerThread tConsumer = new ConsumerThread(factory, "cons2", 0L, 10);
        tConsumer.start();
        MonitorThread monitor = new MonitorThread();
        ClientProducer prod = session.createProducer("myAddress");
        long i = 0L;
        long timeRun = System.currentTimeMillis() + 5000L;
        long timeKill = System.currentTimeMillis() + 2000L;
        while (System.currentTimeMillis() < timeRun) {
            ++i;
            if (System.currentTimeMillis() > timeKill && this.liveProcess != null) {
                this.killLive();
                monitor.start();
            }
            try {
                ClientMessage msg = session.createMessage(true);
                msg.putLongProperty("count", i);
                prod.send((Message)msg);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        try {
            tConsumer.stopTest();
            monitor.stopTest();
        }
        finally {
            this.killBackup();
            this.killLive();
        }
        factory.close();
        this.verifyServer();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void verifyServer() throws Exception {
        ActiveMQServer server = PagingWithFailoverServer.createServer(this.getTestDir(), 5050, 5051, false);
        server.start();
        this.waitForServerToStart(server);
        Queue queue = server.locateQueue(SimpleString.toSimpleString((String)"cons2"));
        int messageCount = this.getMessageCount(queue);
        PagingWithFailoverAndCountersTest.assertTrue((messageCount >= 0 ? (byte)1 : 0) != 0);
        ServerLocator locator = SpawnedServerSupport.createLocator(5050).setInitialConnectAttempts(100).setReconnectAttempts(300).setRetryInterval(100L);
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();
        try {
            this.drainConsumer(session.createConsumer("cons2"), "cons2", messageCount);
        }
        finally {
            session.close();
            factory.close();
            locator.close();
            server.stop();
        }
    }

    private void drainConsumer(ClientConsumer consumer, String name, int expectedCount) throws Exception {
        for (int i = 0; i < expectedCount; ++i) {
            ClientMessage msg = consumer.receive(5000L);
            PagingWithFailoverAndCountersTest.assertNotNull((Object)msg);
            msg.acknowledge();
        }
        PagingWithFailoverAndCountersTest.assertNull((Object)consumer.receiveImmediate());
        consumer.close();
    }

    class MonitorThread
    extends TestThread {
        MonitorThread() {
            super("Monitor-thread");
        }

        @Override
        public void run() {
            ActiveMQServer server = PagingWithFailoverAndCountersTest.this.inProcessBackup.getServer();
            try {
                PagingWithFailoverAndCountersTest.this.waitForServerToStart(server);
                try {
                    ServerLocator locator = SpawnedServerSupport.createLocator(5051).setInitialConnectAttempts(100).setRetryInterval(100L);
                    ClientSessionFactory factory = locator.createSessionFactory();
                    ClientSession session = factory.createSession();
                    session.createQueue(new QueueConfiguration("new-queue").setRoutingType(RoutingType.ANYCAST));
                    session.start();
                    ClientProducer prod = session.createProducer("new-queue");
                    prod.send((Message)session.createMessage(true));
                    ClientConsumer cons = session.createConsumer("new-queue");
                    cons.receive(500L).acknowledge();
                    cons.close();
                    session.deleteQueue("new-queue");
                    locator.close();
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    Assert.fail((String)e.getMessage());
                }
                Queue queue2 = PagingWithFailoverAndCountersTest.this.inProcessBackup.getServer().locateQueue(SimpleString.toSimpleString((String)"cons2"));
                while (this.isRunning(1L)) {
                    long count = PagingWithFailoverAndCountersTest.this.getMessageCount(queue2);
                    if (count >= 0L) continue;
                    Assert.fail((String)("count < 0 .... being " + count));
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    class ConsumerThread
    extends TestThread {
        ClientSessionFactory factory;
        String queueName;
        final AtomicInteger errors;
        final int txSize;

        ConsumerThread(ClientSessionFactory factory, String queueName, long delayEachMessage, int txSize) {
            this.errors = new AtomicInteger(0);
            this.factory = factory;
            this.queueName = queueName;
            this.txSize = txSize;
        }

        @Override
        public void run() {
            try {
                ClientSession session = this.txSize == 0 ? this.factory.createSession(true, true) : this.factory.createSession(false, false);
                ClientConsumer cons = session.createConsumer(this.queueName);
                session.start();
                long lastCommit = 0L;
                int msgcount = 0;
                long currentMsg = 0L;
                while (this.isRunning(0L)) {
                    try {
                        ClientMessage msg = cons.receive(100L);
                        if (msg != null) {
                            currentMsg = msg.getLongProperty("count");
                            if (currentMsg < lastCommit) {
                                this.failed("Message received in duplicate out of order, LastCommit = " + lastCommit + ", currentMsg = " + currentMsg);
                            }
                            msg.acknowledge();
                            if (this.txSize > 0 && msgcount > 0 && msgcount % this.txSize == 0) {
                                session.commit();
                                if (currentMsg > lastCommit) {
                                    lastCommit = currentMsg;
                                }
                            }
                            ++msgcount;
                        }
                        if (msgcount % 100 != 0) continue;
                        PagingWithFailoverAndCountersTest.this.instanceLog.debug((Object)("received " + msgcount + " on " + this.queueName));
                    }
                    catch (Throwable e) {
                        PagingWithFailoverAndCountersTest.this.instanceLog.warn((Object)("=====> expected Error at " + currentMsg + " with lastCommit=" + lastCommit));
                    }
                }
                session.close();
            }
            catch (Exception e) {
                e.printStackTrace();
                this.errors.incrementAndGet();
            }
        }
    }

    class TestThread
    extends Thread {
        boolean running;
        final Object waitNotify;
        private boolean failed;

        TestThread() {
            this.running = true;
            this.waitNotify = new Object();
            this.failed = false;
        }

        TestThread(String name) {
            super(name);
            this.running = true;
            this.waitNotify = new Object();
            this.failed = false;
        }

        public void failed(String message) {
            System.err.println(message);
            this.failed = true;
        }

        public boolean isFailed() {
            return this.failed;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void stopTest() {
            Object object = this.waitNotify;
            synchronized (object) {
                this.running = false;
                this.waitNotify.notifyAll();
            }
            while (this.isAlive()) {
                try {
                    this.join(5000L);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                if (!this.isAlive()) continue;
                this.interrupt();
            }
            Assert.assertFalse((boolean)this.failed);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean isRunning(long timeWait) {
            Object object = this.waitNotify;
            synchronized (object) {
                try {
                    if (timeWait > 0L) {
                        long timeout = System.currentTimeMillis() + timeWait;
                        while (this.running && timeout > System.currentTimeMillis()) {
                            this.waitNotify.wait(timeWait);
                        }
                    }
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
                return this.running;
            }
        }
    }
}

