/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ1925Test
extends TestCase
implements ExceptionListener {
    private static final Logger log = LoggerFactory.getLogger(AMQ1925Test.class);
    private static final String QUEUE_NAME = "test.amq1925";
    private static final String PROPERTY_MSG_NUMBER = "NUMBER";
    private static final int MESSAGE_COUNT = 10000;
    private BrokerService bs;
    private URI tcpUri;
    private ActiveMQConnectionFactory cf;
    private JMSException exception;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void XtestAMQ1925_TXInProgress() throws Exception {
        Connection connection = this.cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(QUEUE_NAME));
        final Object starter = new Object();
        final AtomicBoolean restarted = new AtomicBoolean();
        new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    Object object = starter;
                    synchronized (object) {
                        starter.wait();
                    }
                    AMQ1925Test.this.bs.stop();
                    AMQ1925Test.this.bs = new BrokerService();
                    AMQ1925Test.this.bs.setPersistent(true);
                    AMQ1925Test.this.bs.setUseJmx(true);
                    AMQ1925Test.this.bs.addConnector(AMQ1925Test.this.tcpUri);
                    AMQ1925Test.this.bs.start();
                    restarted.set(true);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Object object = starter;
        synchronized (object) {
            starter.notifyAll();
        }
        for (int i = 0; i < 10000; ++i) {
            Message message = consumer.receive(500L);
            AMQ1925Test.assertNotNull((String)("No Message " + i + " found"), (Object)message);
            if (i < 10) {
                AMQ1925Test.assertFalse((String)"Timing problem, restarted too soon", (boolean)restarted.get());
            }
            if (i == 10) {
                Object object2 = starter;
                synchronized (object2) {
                    starter.notifyAll();
                }
            }
            if (i > 9900) {
                AMQ1925Test.assertTrue((String)"Timing problem, restarted too late", (boolean)restarted.get());
            }
            AMQ1925Test.assertEquals((int)i, (int)message.getIntProperty(PROPERTY_MSG_NUMBER));
            session.commit();
        }
        AMQ1925Test.assertNull((Object)consumer.receive(500L));
        consumer.close();
        session.close();
        connection.close();
        this.assertQueueEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception {
        Connection connection = this.cf.createConnection();
        connection.start();
        Session session1 = connection.createSession(true, 0);
        MessageConsumer consumer1 = session1.createConsumer((Destination)session1.createQueue(QUEUE_NAME));
        Session session2 = connection.createSession(true, 0);
        MessageConsumer consumer2 = session2.createConsumer((Destination)session2.createQueue(QUEUE_NAME));
        final Object starter = new Object();
        final AtomicBoolean restarted = new AtomicBoolean();
        new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    Object object = starter;
                    synchronized (object) {
                        starter.wait();
                    }
                    AMQ1925Test.this.bs.stop();
                    AMQ1925Test.this.bs = new BrokerService();
                    AMQ1925Test.this.bs.setPersistent(true);
                    AMQ1925Test.this.bs.setUseJmx(true);
                    AMQ1925Test.this.bs.addConnector(AMQ1925Test.this.tcpUri);
                    AMQ1925Test.this.bs.start();
                    restarted.set(true);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Object object = starter;
        synchronized (object) {
            starter.notifyAll();
        }
        ArrayList<Integer> results = new ArrayList<Integer>(10000);
        for (int i = 0; i < 10000; ++i) {
            Message message1 = consumer1.receive(20L);
            Message message2 = consumer2.receive(20L);
            if (message1 == null && message2 == null) {
                if (results.size() >= 10000) break;
                message1 = consumer1.receive(500L);
                message2 = consumer2.receive(500L);
                if (message1 != null || message2 != null) break;
                break;
            }
            if (i < 10) {
                AMQ1925Test.assertFalse((String)"Timing problem, restarted too soon", (boolean)restarted.get());
            }
            if (i == 10) {
                Object object2 = starter;
                synchronized (object2) {
                    starter.notifyAll();
                }
            }
            if (i > 9950) {
                AMQ1925Test.assertTrue((String)"Timing problem, restarted too late", (boolean)restarted.get());
            }
            if (message1 != null) {
                results.add(message1.getIntProperty(PROPERTY_MSG_NUMBER));
                session1.commit();
            }
            if (message2 == null) continue;
            results.add(message2.getIntProperty(PROPERTY_MSG_NUMBER));
            session2.commit();
        }
        AMQ1925Test.assertNull((Object)consumer1.receive(500L));
        AMQ1925Test.assertNull((Object)consumer2.receive(500L));
        consumer1.close();
        session1.close();
        consumer2.close();
        session2.close();
        connection.close();
        int foundMissingMessages = 0;
        if (results.size() < 10000) {
            foundMissingMessages = this.tryToFetchMissingMessages();
        }
        for (int i = 0; i < 10000; ++i) {
            AMQ1925Test.assertTrue((String)("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)"), (boolean)results.contains(i));
        }
        this.assertQueueEmpty();
    }

    private int tryToFetchMissingMessages() throws JMSException {
        Message message;
        Connection connection = this.cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(QUEUE_NAME));
        int count = 0;
        while ((message = consumer.receive(500L)) != null) {
            log.info("Found \"missing\" message: " + message);
            ++count;
        }
        consumer.close();
        session.close();
        connection.close();
        return count;
    }

    public void testAMQ1925_TXBegin() throws Exception {
        Connection connection = this.cf.createConnection();
        connection.start();
        connection.setExceptionListener((ExceptionListener)this);
        Session session = connection.createSession(true, 0);
        MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(QUEUE_NAME));
        boolean restartDone = false;
        for (int i = 0; i < 10000; ++i) {
            Message message = consumer.receive(5000L);
            AMQ1925Test.assertNotNull((Object)message);
            if (i == 222 && !restartDone) {
                this.bs.stop();
                this.bs = new BrokerService();
                this.bs.setPersistent(true);
                this.bs.setUseJmx(true);
                this.bs.addConnector(this.tcpUri);
                this.bs.start();
                restartDone = true;
            }
            AMQ1925Test.assertEquals((int)i, (int)message.getIntProperty(PROPERTY_MSG_NUMBER));
            try {
                session.commit();
                continue;
            }
            catch (TransactionRolledBackException expectedOnOccasion) {
                log.info("got rollback: " + expectedOnOccasion);
                --i;
            }
        }
        AMQ1925Test.assertNull((Object)consumer.receive(500L));
        consumer.close();
        session.close();
        connection.close();
        this.assertQueueEmpty();
        AMQ1925Test.assertNull((String)("no exception on connection listener: " + this.exception), (Object)((Object)this.exception));
    }

    public void testAMQ1925_TXCommited() throws Exception {
        Connection connection = this.cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(QUEUE_NAME));
        for (int i = 0; i < 10000; ++i) {
            Message message = consumer.receive(5000L);
            AMQ1925Test.assertNotNull((Object)message);
            AMQ1925Test.assertEquals((int)i, (int)message.getIntProperty(PROPERTY_MSG_NUMBER));
            session.commit();
            if (i != 222) continue;
            this.bs.stop();
            this.bs = new BrokerService();
            this.bs.setPersistent(true);
            this.bs.setUseJmx(true);
            this.bs.addConnector(this.tcpUri);
            this.bs.start();
        }
        AMQ1925Test.assertNull((Object)consumer.receive(500L));
        consumer.close();
        session.close();
        connection.close();
        this.assertQueueEmpty();
    }

    private void assertQueueEmpty() throws Exception {
        Connection connection = this.cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(QUEUE_NAME));
        Message msg = consumer.receive(500L);
        if (msg != null) {
            AMQ1925Test.fail((String)msg.toString());
        }
        consumer.close();
        session.close();
        connection.close();
        this.assertQueueLength(0);
    }

    private void assertQueueLength(int len) throws Exception, IOException {
        Set destinations = this.bs.getBroker().getDestinations((ActiveMQDestination)new ActiveMQQueue(QUEUE_NAME));
        Queue queue = (Queue)destinations.iterator().next();
        AMQ1925Test.assertEquals((int)len, (int)queue.getMessageStore().getMessageCount());
    }

    private void sendMessagesToQueue() throws Exception {
        Connection connection = this.cf.createConnection();
        Session session = connection.createSession(true, 0);
        MessageProducer producer = session.createProducer((Destination)session.createQueue(QUEUE_NAME));
        producer.setDeliveryMode(2);
        for (int i = 0; i < 10000; ++i) {
            TextMessage message = session.createTextMessage("Test message " + i);
            message.setIntProperty(PROPERTY_MSG_NUMBER, i);
            producer.send((Message)message);
        }
        session.commit();
        producer.close();
        session.close();
        connection.close();
        this.assertQueueLength(10000);
    }

    protected void setUp() throws Exception {
        this.exception = null;
        this.bs = new BrokerService();
        this.bs.setDeleteAllMessagesOnStartup(true);
        this.bs.setPersistent(true);
        this.bs.setUseJmx(true);
        TransportConnector connector = this.bs.addConnector("tcp://localhost:0");
        this.bs.start();
        this.tcpUri = connector.getConnectUri();
        this.cf = new ActiveMQConnectionFactory("failover://(" + this.tcpUri + ")");
        this.sendMessagesToQueue();
    }

    protected void tearDown() throws Exception {
        new ServiceStopper().stop((Service)this.bs);
    }

    public void onException(JMSException exception) {
        this.exception = exception;
    }
}

