/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.store;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerRestartTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;

/**
 * Verifies that persistent messages whose expiration passes while the broker is
 * stopped are not delivered after the broker is restarted.
 *
 * <p>The test sends a batch of persistent messages with a short expiration followed
 * by one persistent message with no expiration set, stops the broker, waits until the
 * expiration has elapsed, restarts the broker and asserts that only the last message
 * is received.
 *
 * <p>The test is registered for every combination of the values added in
 * {@link #initCombosForTestRecovery()}.
 */
public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport {
    /** How long after being sent the expiring messages remain valid, in milliseconds. */
    private static final long EXPIRY_MILLIS = 5000L;

    /** Extra wait on top of {@link #EXPIRY_MILLIS} before the broker is restarted. */
    private static final long EXPIRY_MARGIN_MILLIS = 1000L;

    /** Number of messages sent with an expiration. */
    private static final int EXPIRING_MESSAGE_COUNT = 10;

    /** Ids (as strings) of the messages that must still be delivered after the restart. */
    final ArrayList<String> expected = new ArrayList<String>();

    /** Queue used for both producing and consuming. */
    final ActiveMQDestination destination = new ActiveMQQueue("TEST");

    // The two fields below are named in initCombosForTestRecovery(); presumably the
    // combination test support assigns them by field name, so they are kept public
    // and non-final -- TODO confirm against the support class.
    public PendingQueueMessageStoragePolicy queuePendingPolicy;
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    /**
     * Enables auto-fail before delegating to the parent set-up.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    protected void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    /**
     * Registers the value sets for {@code queuePendingPolicy} (file and VM pending
     * policies) and {@code persistenceAdapterChoice} (JDBC and KahaDB).
     *
     * @throws Exception declared for symmetry with the test method; nothing visible here throws
     */
    public void initCombosForTestRecovery() throws Exception {
        addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[] {
            new FilePendingQueueMessageStoragePolicy(),
            new VMPendingQueueMessageStoragePolicy()
        });
        TestSupport.PersistenceAdapterChoice[] persistenceAdapters = new TestSupport.PersistenceAdapterChoice[] {
            TestSupport.PersistenceAdapterChoice.JDBC,
            TestSupport.PersistenceAdapterChoice.KahaDB
        };
        addCombinationValues("persistenceAdapterChoice", persistenceAdapters);
    }

    /**
     * Sends the messages, stops the broker, waits past the expiration, restarts the
     * broker and checks that only the non-expiring message is delivered.
     *
     * @throws Exception on any broker, connection or assertion failure
     */
    public void testRecovery() throws Exception {
        sendSomeMessagesThatWillExpireIn5AndThenOne();

        broker.stop();
        broker.waitUntilStopped();

        // Wait (with the broker down) until every expiring message is past its expiration.
        TimeUnit.MILLISECONDS.sleep(EXPIRY_MILLIS + EXPIRY_MARGIN_MILLIS);

        broker = createRestartedBroker();
        broker.start();

        consumeExpected();
    }

    /**
     * Opens a consumer on {@link #destination}, asserts that the first message received
     * is the first entry of {@link #expected}, acknowledges it and asserts that no
     * further messages are available.
     *
     * @throws Exception on any connection or assertion failure
     */
    private void consumeExpected() throws Exception {
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);

        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        String expectedId = expected.get(0);
        Message received = receiveMessage(connection);
        assertNotNull("Should have received message " + expectedId + " by now!", received);
        assertEquals(expectedId, received.getMessageId().toString());

        // NOTE(review): ack type 2 is presumably MessageAck.STANDARD_ACK_TYPE -- confirm.
        MessageAck ack = createAck(consumerInfo, received, 1, (byte) 2);
        connection.send(ack);

        assertNoMessagesLeft(connection);
        connection.request(closeConnectionInfo(connectionInfo));
    }

    /**
     * Sends {@link #EXPIRING_MESSAGE_COUNT} persistent messages that expire
     * {@link #EXPIRY_MILLIS} ms after being sent, then one persistent message without
     * an expiration set, and records the id of that last message in {@link #expected}.
     *
     * @throws Exception on any connection failure
     */
    private void sendSomeMessagesThatWillExpireIn5AndThenOne() throws Exception {
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        for (int i = 0; i < EXPIRING_MESSAGE_COUNT; ++i) {
            Message expiring = createMessage(producerInfo, destination);
            expiring.setExpiration(System.currentTimeMillis() + EXPIRY_MILLIS);
            expiring.setPersistent(true);
            connection.send(expiring);
        }

        // The survivor: persistent, and no expiration is set on it here.
        Message survivor = createMessage(producerInfo, destination);
        survivor.setPersistent(true);
        connection.send(survivor);
        expected.add(survivor.getMessageId().toString());

        connection.request(closeConnectionInfo(connectionInfo));
    }

    /**
     * Returns the parent's default policy with the pending queue policy under test
     * applied and the expire-messages period set to zero.
     *
     * @return the adjusted default policy entry
     */
    @Override
    protected PolicyEntry getDefaultPolicy() {
        PolicyEntry policy = super.getDefaultPolicy();
        policy.setPendingQueuePolicy(queuePendingPolicy);
        // NOTE(review): a period of 0 presumably turns off the periodic expiry check,
        // leaving expiry to be detected on recovery/dispatch -- confirm.
        policy.setExpireMessagesPeriod(0L);
        return policy;
    }

    /**
     * Applies the parent configuration and then installs the persistence adapter
     * selected by {@link #persistenceAdapterChoice}.
     *
     * @param broker the broker being configured
     * @throws Exception if configuration fails
     */
    @Override
    protected void configureBroker(BrokerService broker) throws Exception {
        super.configureBroker(broker);
        TestSupport.setPersistenceAdapter(broker, persistenceAdapterChoice);
    }

    /**
     * Builds the suite for this class via the inherited {@code suite(Class)} factory.
     *
     * @return the test suite for this class
     */
    public static Test suite() {
        return suite(RecoverExpiredMessagesTest.class);
    }
}

