/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests how the broker's in-flight message count evolves while a consumer on a
 * connection with optimized acknowledgement enabled receives a full prefetch
 * of messages.
 *
 * <p>Every test sends {@link #MESSAGE_COUNT} messages to the queue "test",
 * waits until the broker reports all of them as in flight, and then receives
 * them one by one while checking the broker-side in-flight counter.
 */
public class OptimizedAckTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class);

    /** Prefetch limit applied to every destination type of the test connection. */
    private static final int PREFETCH_SIZE = 10;

    /** Number of messages produced (and later consumed) by each test. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Number of initial receives during which the whole prefetch is expected to
     * remain in flight (presumably because the optimized-ack threshold has not
     * been reached yet -- confirm against the consumer implementation).
     */
    private static final int RECEIVES_BEFORE_ACK = 6;

    /** In-flight count the tests expect once the batched ack has been sent. */
    private static final long INFLIGHT_AFTER_ACK = 3L;

    /** Maximum time a single {@code receive} call may block. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 4000L;

    /** Pause inserted before each receive by the "very slow" consumer test. */
    private static final long SLOW_CONSUMER_PAUSE_MILLIS = 400L;

    private ActiveMQConnection connection;

    /**
     * Creates the connection under test with optimized acknowledgement enabled,
     * the optimize-acknowledge timeout set to 0 and a prefetch of
     * {@link #PREFETCH_SIZE}. The connection is not started here; each test
     * starts it after applying any extra configuration.
     */
    protected void setUp() throws Exception {
        super.setUp();
        this.connection = (ActiveMQConnection) this.createConnection();
        this.connection.setOptimizeAcknowledge(true);
        this.connection.setOptimizeAcknowledgeTimeOut(0L);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(PREFETCH_SIZE);
        this.connection.setPrefetchPolicy(prefetchPolicy);
    }

    /**
     * Closes the connection and always delegates to the superclass cleanup,
     * even when closing the connection throws.
     */
    protected void tearDown() throws Exception {
        try {
            this.connection.close();
        } finally {
            // Must run regardless of close() failing, otherwise the superclass
            // never gets the chance to release what it set up.
            super.tearDown();
        }
    }

    /**
     * Receives all messages back to back and checks the in-flight counts.
     */
    public void testReceivedMessageStillInflight() throws Exception {
        receiveAllAndVerifyInflight(0L);
    }

    /**
     * Same scenario as {@link #testReceivedMessageStillInflight()} but with a
     * pause of {@link #SLOW_CONSUMER_PAUSE_MILLIS} before every receive.
     */
    public void testVerySlowReceivedMessageStillInflight() throws Exception {
        receiveAllAndVerifyInflight(SLOW_CONSUMER_PAUSE_MILLIS);
    }

    /**
     * Configures a 10 second scheduled-ack interval, runs the common scenario
     * and then waits for the broker's in-flight count to drop to zero.
     */
    public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
        // Has to be configured before the connection is started by the scenario.
        this.connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10L));
        receiveAllAndVerifyInflight(0L);

        RegionBroker regionBroker = findRegionBroker();
        assertTrue("After delay the scheduled ack should ack all inflight.",
                waitForInflight(regionBroker, 0L, true));
    }

    /**
     * Runs the scenario shared by all tests: start the connection, send
     * {@link #MESSAGE_COUNT} messages, wait for the prefetch to fill, then
     * receive every message while asserting on the broker's in-flight count.
     *
     * @param pauseBeforeReceiveMillis time to sleep before each receive; values
     *        {@code <= 0} disable the pause
     * @throws Exception if any JMS or broker operation fails
     */
    private void receiveAllAndVerifyInflight(long pauseBeforeReceiveMillis) throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("test");
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            producer.send(session.createTextMessage("Hello" + i));
        }

        RegionBroker regionBroker = findRegionBroker();
        MessageConsumer consumer = session.createConsumer(queue);
        assertTrue("prefetch full", waitForInflight(regionBroker, PREFETCH_SIZE, true));

        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            if (pauseBeforeReceiveMillis > 0L) {
                Thread.sleep(pauseBeforeReceiveMillis);
            }
            Message msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            assertNotNull(msg);
            if (i < RECEIVES_BEFORE_ACK) {
                // Checked immediately (no waiting): nothing may have been acked yet.
                assertEquals("all prefetch is still in flight: " + i,
                        (long) PREFETCH_SIZE, inflightCount(regionBroker));
            } else {
                // The counter is updated asynchronously on the broker, so poll for it.
                assertTrue("most are acked but 3 remain",
                        waitForInflight(regionBroker, INFLIGHT_AFTER_ACK, false));
            }
        }
    }

    /**
     * Looks up the region broker of the first broker known to the registry.
     */
    private static RegionBroker findRegionBroker() throws Exception {
        return (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
    }

    /**
     * Reads the current in-flight count from the broker's destination statistics.
     */
    private static long inflightCount(RegionBroker regionBroker) {
        return regionBroker.getDestinationStatistics().getInflight().getCount();
    }

    /**
     * Polls via {@link Wait#waitFor} until the broker's in-flight count equals
     * {@code expected}.
     *
     * @param regionBroker broker whose statistics are polled
     * @param expected in-flight count to wait for
     * @param logEachPoll whether to log the observed count on every poll
     * @return the result of {@code Wait.waitFor}; the callers assert it is {@code true}
     */
    private static boolean waitForInflight(final RegionBroker regionBroker, final long expected,
            final boolean logEachPoll) throws Exception {
        return Wait.waitFor(new Wait.Condition() {

            // Spelling follows the method name declared by Wait.Condition.
            public boolean isSatisified() throws Exception {
                if (logEachPoll) {
                    LOG.info("inflight count: {}", inflightCount(regionBroker));
                }
                return expected == inflightCount(regionBroker);
            }
        });
    }
}

