/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProducerTest
extends ActiveMQTestBase {
    private ActiveMQServer server;
    private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = this.createServer(false);
        this.server.start();
    }

    @Test
    public void testProducerWithSmallWindowSizeAndLargeMessage() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        this.server.getRemotingService().addIncomingInterceptor((BaseInterceptor)new Interceptor(){

            public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
                if (packet.getType() == 71) {
                    latch.countDown();
                }
                return true;
            }
        });
        ServerLocator locator = this.createInVMNonHALocator().setConfirmationWindowSize(100);
        ClientSessionFactory cf = locator.createSessionFactory();
        ClientSession session = cf.createSession(false, true, true);
        ClientProducer producer = session.createProducer(this.QUEUE);
        ClientMessage message = session.createMessage(true);
        byte[] body = new byte[1000];
        message.getBodyBuffer().writeBytes(body);
        producer.send((Message)message);
        Assert.assertTrue((boolean)latch.await(5L, TimeUnit.SECONDS));
        session.close();
        locator.close();
    }

    @Test
    public void testProducerMultiThread() throws Exception {
        ServerLocator locator = this.createInVMNonHALocator();
        AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeBytes(10240L);
        this.server.stop();
        this.server.getConfiguration().getAddressesSettings().clear();
        this.server.getConfiguration().getAddressesSettings().put(this.QUEUE.toString(), setting);
        this.server.start();
        this.server.createQueue(new QueueConfiguration(this.QUEUE));
        for (int i = 0; i < 100; ++i) {
            ClientMessage msg;
            final CountDownLatch latch = new CountDownLatch(1);
            this.instanceLog.debug((Object)("Try " + i));
            ClientSessionFactory cf = locator.createSessionFactory();
            final ClientSession session = cf.createSession(false, true, true);
            Thread t = new Thread(){

                @Override
                public void run() {
                    try {
                        ClientProducer producer = session.createProducer();
                        for (int i = 0; i < 62; ++i) {
                            if (i == 30) {
                                latch.countDown();
                            }
                            ClientMessage msg = session.createMessage(false);
                            msg.getBodyBuffer().writeBytes(new byte[2048]);
                            producer.send(ProducerTest.this.QUEUE, (Message)msg);
                        }
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            t.start();
            ProducerTest.assertTrue((boolean)latch.await(10L, TimeUnit.SECONDS));
            session.close();
            t.join(5000L);
            if (!t.isAlive()) {
                t.interrupt();
            }
            ProducerTest.assertFalse((boolean)t.isAlive());
            ClientSession sessionConsumer = cf.createSession();
            sessionConsumer.start();
            ClientConsumer cons = sessionConsumer.createConsumer(this.QUEUE);
            while ((msg = cons.receiveImmediate()) != null) {
                msg.acknowledge();
                sessionConsumer.commit();
            }
            cf.close();
        }
    }
}

