/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.cluster.failover;

import java.util.HashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.RetryRule;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Failover tests that send durable messages to {@link #ADDRESS}, call
 * {@code crash(session)} at different points of the send/consume cycle and
 * then verify what a newly created session observes afterwards.
 *
 * <p>Servers are created in-VM through {@link #createServer(boolean, Configuration)}.
 */
public class PagingFailoverTest
extends FailoverTestBase {
    /** Retry rule constructed with the value 2 (NOTE(review): presumably the retry count; confirm against RetryRule). */
    @Rule
    public RetryRule retryRule = new RetryRule(2);

    private static final SimpleString ADDRESS = new SimpleString("SimpleAddress");

    /** Name of the int property carrying each message's sequence number. */
    private static final SimpleString KEY = new SimpleString("key");

    /** Number of messages sent by {@link #internalTestPage(boolean, boolean)}. */
    private static final int TOTAL_MESSAGES = 200;

    /** Index at which consumption is split into two phases in {@link #internalTestPage(boolean, boolean)}. */
    private static final int MIDDLE = TOTAL_MESSAGES / 2;

    /** Number of messages sent by {@link #testExpireMessage()}. */
    private static final int EXPIRE_TOTAL_MESSAGES = 1000;

    /** In transacted mode the session is committed every this many messages. */
    private static final int COMMIT_INTERVAL = 10;

    // NOTE(review): presumably the page size and max address size handed to
    // createInVMFailoverServer; confirm against that method's signature.
    private static final int PAGE_SIZE = 1024;
    private static final int PAGE_MAX = 2048;

    private ServerLocator locator;
    private ClientSession session;
    private ClientSessionFactoryInternal sf;

    /**
     * Runs the base-class set-up and caches the server locator used by every test.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.locator = this.getServerLocator();
    }

    @Test
    public void testPageFailBeforeConsume() throws Exception {
        this.internalTestPage(false, true);
    }

    @Test
    public void testPage() throws Exception {
        this.internalTestPage(false, false);
    }

    @Test
    public void testPageTransactioned() throws Exception {
        this.internalTestPage(true, false);
    }

    @Test
    public void testPageTransactionedFailBeforeConsume() throws Exception {
        this.internalTestPage(true, true);
    }

    /**
     * Sends {@link #TOTAL_MESSAGES} durable messages, consumes the first
     * {@link #MIDDLE} of them, then consumes the remainder from a fresh
     * auto-commit session, asserting that the {@code key} property of each
     * received message matches its expected position.
     *
     * @param transacted        when {@code true} the producing and first consuming
     *                          sessions are created without auto-commit and are
     *                          committed every {@link #COMMIT_INTERVAL} messages
     * @param failBeforeConsume when {@code true} {@code crash(session)} is called
     *                          right after sending (followed by {@code waitForBackup});
     *                          otherwise it is called after the first half has been consumed
     * @throws Exception on any client, server or assertion-related failure
     */
    public void internalTestPage(boolean transacted, boolean failBeforeConsume) throws Exception {
        final boolean autoCommit = !transacted;

        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setReconnectAttempts(15);
        this.sf = this.createSessionFactoryAndWaitForTopology(this.locator, 2);
        this.session = this.addClientSession(this.sf.createSession(autoCommit, autoCommit, 0));
        this.session.createQueue(new QueueConfiguration(ADDRESS));

        ClientProducer prod = this.session.createProducer(ADDRESS);
        for (int i = 0; i < TOTAL_MESSAGES; ++i) {
            if (transacted && i % COMMIT_INTERVAL == 0) {
                this.session.commit();
            }
            ClientMessage msg = this.session.createMessage(true);
            msg.putIntProperty(KEY, i);
            prod.send(msg);
        }
        this.session.commit();

        if (failBeforeConsume) {
            this.crash(this.session);
            this.waitForBackup(null, 5);
        }
        this.session.close();

        // Register every replacement session so that it is still cleaned up
        // if an assertion fails before the explicit close() below is reached.
        this.session = this.addClientSession(this.sf.createSession(autoCommit, autoCommit, 0));
        this.session.start();
        ClientConsumer cons = this.session.createConsumer(ADDRESS);
        for (int i = 0; i < MIDDLE; ++i) {
            ClientMessage msg = cons.receive(20000L);
            Assert.assertNotNull(msg);
            msg.acknowledge();
            if (transacted && i % COMMIT_INTERVAL == 0) {
                this.session.commit();
            }
            Assert.assertEquals(Integer.valueOf(i), msg.getObjectProperty(KEY));
        }
        this.session.commit();
        cons.close();

        if (!failBeforeConsume) {
            this.crash(this.session);
        }
        this.session.close();

        // The second half is always read through an auto-commit session.
        this.session = this.addClientSession(this.sf.createSession(true, true, 0));
        cons = this.session.createConsumer(ADDRESS);
        this.session.start();
        for (int i = MIDDLE; i < TOTAL_MESSAGES; ++i) {
            ClientMessage msg = cons.receive(5000L);
            Assert.assertNotNull(msg);
            msg.acknowledge();
            int result = (Integer) msg.getObjectProperty(KEY);
            Assert.assertEquals((long) i, (long) result);
        }
    }

    /**
     * Sends {@link #EXPIRE_TOTAL_MESSAGES} durable messages that expire 100 ms
     * after creation, calls {@code crash(session)}, and then repeatedly calls
     * {@code expireReferences()} on the backup server's queue for up to six
     * seconds, asserting that the queue's page subscription is no longer
     * paging at the end.
     *
     * @throws Exception on any client, server or assertion-related failure
     */
    @Test
    public void testExpireMessage() throws Exception {
        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setReconnectAttempts(15);
        this.sf = this.createSessionFactoryAndWaitForTopology(this.locator, 2);
        this.session = this.addClientSession(this.sf.createSession(false, false, 0));
        this.session.createQueue(new QueueConfiguration(ADDRESS));

        ClientProducer prod = this.session.createProducer(ADDRESS);
        for (int i = 0; i < EXPIRE_TOTAL_MESSAGES; ++i) {
            ClientMessage msg = this.session.createMessage(true);
            msg.putIntProperty(KEY, i);
            msg.setExpiration(System.currentTimeMillis() + 100L);
            prod.send(msg);
        }
        this.session.commit();
        this.crash(this.session);
        this.session.close();

        Queue queue = this.backupServer.getServer().locateQueue(ADDRESS);
        long deadline = System.currentTimeMillis() + 6000L;
        while (deadline > System.currentTimeMillis() && queue.getPageSubscription().isPaging()) {
            Thread.sleep(100L);
            queue.expireReferences();
        }
        Assert.assertFalse(queue.getPageSubscription().isPaging());
    }

    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean live) {
        return TransportConfigurationUtils.getInVMAcceptor(live);
    }

    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
        return TransportConfigurationUtils.getInVMConnector(live);
    }

    /**
     * Creates and registers an in-VM failover server.
     *
     * @param realFiles     ignored; {@code true} is always passed to
     *                      {@code createInVMFailoverServer}
     * @param configuration the server configuration to use
     * @return the server returned by {@code addServer}
     */
    protected ActiveMQServer createServer(boolean realFiles, Configuration configuration) {
        return this.addServer(this.createInVMFailoverServer(true, configuration, PAGE_SIZE, PAGE_MAX, new HashMap<>(), this.nodeManager, 2));
    }

    @Override
    protected TestableServer createTestableServer(Configuration config) {
        return new SameProcessActiveMQServer(this.createServer(true, config));
    }
}

