/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ5844Test {
    static final Logger LOG = LoggerFactory.getLogger(AMQ5844Test.class);
    protected BrokerService broker;
    protected long checkPeriod = 2000L;
    protected long maxSlowDuration = 4000L;
    private String uri;
    private static final String QUEUE_NAME = "TEST.QUEUE";
    static boolean abortingSlowConsumer = false;
    static boolean successfullyReconnected = false;
    static final AbstractAppender appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

        public void append(LogEvent event) {
            if (event.getMessage().getFormattedMessage().contains("aborting slow consumer")) {
                abortingSlowConsumer = true;
            }
            if (event.getMessage().getFormattedMessage().contains("Successfully reconnected to")) {
                successfullyReconnected = true;
            }
        }
    };

    @BeforeClass
    public static void setUp() throws Exception {
        org.apache.logging.log4j.core.Logger rootLogger = (org.apache.logging.log4j.core.Logger)LogManager.getRootLogger();
        rootLogger.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        rootLogger.addAppender((Appender)appender);
    }

    @Before
    public void createMaster() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        TransportConnector transportConnector = this.broker.addConnector("tcp://0.0.0.0:0");
        DefaultIOExceptionHandler defaultIOExceptionHandler = new DefaultIOExceptionHandler();
        this.broker.setIoExceptionHandler((IOExceptionHandler)defaultIOExceptionHandler);
        this.broker.setBrokerName("Main");
        PolicyEntry policy = new PolicyEntry();
        AbortSlowAckConsumerStrategy abortSlowConsumerStrategy = new AbortSlowAckConsumerStrategy();
        abortSlowConsumerStrategy.setAbortConnection(false);
        abortSlowConsumerStrategy.setCheckPeriod(this.checkPeriod);
        abortSlowConsumerStrategy.setMaxSlowDuration(this.maxSlowDuration);
        abortSlowConsumerStrategy.setMaxTimeSinceLastAck(this.maxSlowDuration);
        policy.setSlowConsumerStrategy((SlowConsumerStrategy)abortSlowConsumerStrategy);
        policy.setQueuePrefetch(0);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        this.broker.setDestinationPolicy(pMap);
        this.broker.start();
        this.uri = transportConnector.getPublishableConnectString();
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        ((org.apache.logging.log4j.core.Logger)LogManager.getRootLogger()).removeAppender((Appender)appender);
    }

    @Test
    public void testRecreateAbortedConsumer() throws Exception {
        String failoverTransportURL = "failover:(" + this.uri + ")";
        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(failoverTransportURL);
        amq.setWatchTopicAdvisories(false);
        Connection jmsConnection = amq.createConnection();
        ActiveMQConnection connection = (ActiveMQConnection)jmsConnection;
        connection.start();
        Session session = connection.createSession(true, 0);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer((Destination)destination);
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("Plain Text Message");
        String text = stringBuilder.toString();
        TextMessage message = session.createTextMessage(text);
        producer.send((Message)message, 1, 1, 0L);
        producer.send((Message)message, 1, 1, 0L);
        session.commit();
        producer.close();
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer((Destination)destination);
        Message mess = consumer.receive();
        Assert.assertNotNull((Object)mess);
        Assert.assertTrue((String)"The browser aborts the slow consumer", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return abortingSlowConsumer;
            }
        }, (long)10000L));
        FailoverTransport failoverTransport = (FailoverTransport)connection.getTransport().narrow(FailoverTransport.class);
        failoverTransport.handleTransportFailure(new IOException());
        Assert.assertTrue((String)"The broker aborts the slow consumer", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return successfullyReconnected;
            }
        }, (long)4000L));
        try {
            mess = consumer.receive(2000L);
            Assert.assertNull((Object)mess);
            session.commit();
            Assert.fail((String)"Expect the commit to fail and a rollback to happen");
        }
        catch (TransactionRolledBackException expected) {
            Assert.assertTrue((boolean)expected.getMessage().contains("rolling back transaction"));
        }
        connection.close();
    }

    static {
        appender.start();
    }
}

