/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.cluster.reattach;

import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;

public class OrderReattachTest
extends ActiveMQTestBase {
    final SimpleString ADDRESS = new SimpleString("address");
    private ActiveMQServer server;

    @Test
    public void testOrderOnSendInVM() throws Throwable {
        this.doTestOrderOnSend(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doTestOrderOnSend(boolean isNetty) throws Throwable {
        this.server = this.createServer(false, isNetty);
        this.server.start();
        ServerLocator locator = this.createFactory(isNetty).setReconnectAttempts(15).setConfirmationWindowSize(0x100000).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
        ClientSessionFactory sf = this.createSessionFactory(locator);
        final ClientSession session = sf.createSession(false, true, true);
        final LinkedBlockingDeque<Boolean> failureQueue = new LinkedBlockingDeque<Boolean>();
        final CountDownLatch ready = new CountDownLatch(1);
        Thread failer = new Thread(){

            @Override
            public void run() {
                ready.countDown();
                block4: while (true) {
                    try {
                        while (true) {
                            Boolean poll = false;
                            try {
                                poll = (Boolean)failureQueue.poll(60L, TimeUnit.SECONDS);
                            }
                            catch (InterruptedException e) {
                                e.printStackTrace();
                                break block4;
                            }
                            Thread.sleep(1L);
                            RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
                            if (!poll.booleanValue()) break block4;
                            conn.fail((ActiveMQException)((Object)new ActiveMQNotConnectedException("poop")));
                        }
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        continue;
                    }
                    break;
                }
            }
        };
        failer.start();
        ready.await();
        try {
            this.doSend2(1, sf, failureQueue);
        }
        finally {
            try {
                session.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            try {
                locator.close();
            }
            catch (Exception e) {}
            try {
                sf.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            failureQueue.put(false);
            failer.join();
        }
    }

    /*
     * WARNING - void declaration
     */
    public void doSend2(int order, ClientSessionFactory sf, final LinkedBlockingDeque<Boolean> failureQueue) throws Exception {
        void var12_24;
        ClientSession s = sf.createSession(false, false, false);
        int numMessages = 500;
        int numSessions = 10;
        HashSet<ClientConsumer> consumers = new HashSet<ClientConsumer>();
        HashSet<ClientSession> sessions = new HashSet<ClientSession>();
        for (int i = 0; i < 10; ++i) {
            SimpleString subName = new SimpleString("sub" + i);
            ClientSession sessConsume = sf.createSession(false, true, true);
            sessConsume.createQueue(new QueueConfiguration(subName).setAddress(this.ADDRESS).setDurable(Boolean.valueOf(false)));
            ClientConsumer clientConsumer = sessConsume.createConsumer(subName);
            consumers.add(clientConsumer);
            sessions.add(sessConsume);
        }
        ClientSession sessSend = sf.createSession(false, true, true);
        ClientProducer producer = sessSend.createProducer(this.ADDRESS);
        for (int i = 0; i < 500; ++i) {
            ClientMessage clientMessage = sessSend.createMessage((byte)3, false, 0L, System.currentTimeMillis(), (byte)1);
            if (i % 10 == 0) {
                // empty if block
            }
            clientMessage.putIntProperty(new SimpleString("count"), i);
            producer.send((Message)clientMessage);
        }
        for (ClientSession clientSession : sessions) {
            clientSession.start();
        }
        class MyHandler
        implements MessageHandler {
            final CountDownLatch latch = new CountDownLatch(1);
            int count;
            Exception failure;

            MyHandler() {
            }

            public void onMessage(ClientMessage message) {
                if (this.count >= 500) {
                    this.failure = new Exception("too many messages");
                    this.latch.countDown();
                }
                if (message.getIntProperty("count") != this.count) {
                    this.failure = new Exception("counter " + this.count + " was not as expected (" + message.getIntProperty("count") + ")");
                    OrderReattachTest.this.instanceLog.warn((Object)"Failure on receiving message ", (Throwable)this.failure);
                    this.failure.printStackTrace();
                    this.latch.countDown();
                }
                ++this.count;
                if (this.count % 100 == 0) {
                    failureQueue.push(true);
                }
                if (this.count == 500) {
                    this.latch.countDown();
                }
            }
        }
        HashSet<MyHandler> handlers = new HashSet<MyHandler>();
        for (ClientConsumer consumer : consumers) {
            MyHandler handler = new MyHandler();
            consumer.setMessageHandler((MessageHandler)handler);
            handlers.add(handler);
        }
        for (MyHandler handler : handlers) {
            boolean ok = handler.latch.await(60000L, TimeUnit.MILLISECONDS);
            Assert.assertTrue((boolean)ok);
            if (handler.failure == null) continue;
            throw handler.failure;
        }
        sessSend.close();
        for (ClientSession session : sessions) {
            session.close();
        }
        boolean bl = false;
        while (var12_24 < 10) {
            failureQueue.push(true);
            SimpleString subName = new SimpleString("sub" + (int)var12_24);
            s.deleteQueue(subName);
            ++var12_24;
        }
        s.close();
    }
}

