/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.QueueStorePrefetch;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.memory.MemoryMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryMessageStoreQueueCursorTest {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryMessageStoreQueueCursorTest.class);
    ActiveMQQueue destination = new ActiveMQQueue("queue-" + MemoryMessageStoreQueueCursorTest.class.getSimpleName());
    BrokerService brokerService;
    static final String mesageIdRoot = "11111:22222:0:";
    final int messageBytesSize = 1024;
    final String text = new String(new byte[1024]);

    @Before
    public void setUp() throws Exception {
        this.brokerService = this.createBroker();
        this.brokerService.setUseJmx(false);
        this.brokerService.setPersistent(false);
        this.brokerService.start();
    }

    protected BrokerService createBroker() throws Exception {
        return new BrokerService();
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    @Test(timeout=10000L)
    public void testRecoverNextMessages2() throws Exception {
        MemoryMessageStore queueMessageStore = new MemoryMessageStore((ActiveMQDestination)this.destination);
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        Queue queue = new Queue(this.brokerService, (ActiveMQDestination)this.destination, (MessageStore)queueMessageStore, destinationStatistics, null);
        queueMessageStore.start();
        queueMessageStore.registerIndexListener(null);
        QueueStorePrefetch myCursor = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(5120L);
        myCursor.setSystemUsage(systemUsage);
        myCursor.setEnableAudit(false);
        myCursor.start();
        Assert.assertTrue((String)"cache enabled", (myCursor.isUseCache() && myCursor.isCacheEnabled() ? 1 : 0) != 0);
        ActiveMQTextMessage msg0 = this.getMessage(0);
        msg0.setMemoryUsage(systemUsage.getMemoryUsage());
        queueMessageStore.addMessage(null, (Message)msg0);
        myCursor.addMessageLast((MessageReference)msg0);
        msg0.decrementReferenceCount();
        if (myCursor.hasNext()) {
            MessageReference ref = myCursor.next();
            LOG.info("Received message: {} with body: ({})", (Object)ref.getMessageId(), (Object)((ActiveMQTextMessage)ref.getMessage()).getText());
            myCursor.remove();
            try {
                queueMessageStore.removeMessage(ref.getMessageId());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
        systemUsage.getMemoryUsage().increaseUsage(10240L);
        ActiveMQTextMessage msg1 = this.getMessage(1);
        msg1.setMemoryUsage(systemUsage.getMemoryUsage());
        queueMessageStore.addMessage(null, (Message)msg1);
        myCursor.addMessageLast((MessageReference)msg1);
        msg1.decrementReferenceCount();
        boolean b = true;
        while (b) {
            if (!myCursor.hasNext()) continue;
            MessageReference ref = myCursor.next();
            LOG.info("Received message: {} with body: ({})", (Object)ref.getMessageId(), (Object)((ActiveMQTextMessage)ref.getMessage()).getText());
            myCursor.remove();
            try {
                queueMessageStore.removeMessage(ref.getMessageId());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            b = false;
        }
    }

    private ActiveMQTextMessage getMessage(int i) throws Exception {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        MessageId id = new MessageId(mesageIdRoot + i);
        id.setBrokerSequenceId((long)i);
        id.setProducerSequenceId((long)i);
        message.setMessageId(id);
        message.setDestination((ActiveMQDestination)this.destination);
        message.setPersistent(true);
        message.setResponseRequired(true);
        message.setText("Msg:" + i + " " + this.text);
        Assert.assertEquals((long)message.getMessageId().getProducerSequenceId(), (long)i);
        return message;
    }
}

