/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.ResourceAllocationException;
import jakarta.jms.Session;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.JMSClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;

public class AmqpFlowControlTest
extends JMSClientTestSupport {
    private static final long MAX_SIZE_BYTES = 0x100000L;
    private static final long MAX_SIZE_BYTES_REJECT_THRESHOLD = 0x200000L;
    private String singleCreditAcceptorURI = new String("tcp://localhost:5680");
    private int messagesSent;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.messagesSent = 0;
    }

    @Override
    protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
        server.getConfiguration().addAcceptorConfiguration("flow", this.singleCreditAcceptorURI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpLowCredits=1");
    }

    @Override
    protected void configureAddressPolicy(ActiveMQServer server) {
        AddressSettings addressSettings = (AddressSettings)server.getAddressSettingsRepository().getMatch("#");
        addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
        addressSettings.setMaxSizeBytes(0x100000L);
        addressSettings.setMaxSizeBytesRejectThreshold(0x200000L);
        server.getAddressSettingsRepository().addMatch("#", (Object)addressSettings);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
        AmqpClient client = this.createAmqpClient(new URI(this.singleCreditAcceptorURI));
        try (AmqpConnection connection = this.addConnection(client.connect());){
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(this.getQueueName());
            AmqpFlowControlTest.assertEquals((String)"Should only be issued one credit", (long)1L, (long)sender.getSender().getCredit());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
        AmqpClient client = this.createAmqpClient(new URI(this.singleCreditAcceptorURI));
        try (AmqpConnection connection = this.addConnection(client.connect());){
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(this.getQueueName());
            sender.setSendTimeout(-1L);
            this.sendUntilFull(sender);
            AmqpFlowControlTest.assertTrue((sender.getSender().getCredit() == -1 ? (byte)1 : 0) != 0);
            long addressSize = this.server.getPagingManager().getPageStore(new SimpleString(this.getQueueName())).getAddressSize();
            AmqpFlowControlTest.assertTrue((addressSize >= 0x100000L && addressSize <= 0x200000L ? (byte)1 : 0) != 0);
        }
    }

    @Test(timeout=60000L)
    public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        Queue d = session.createQueue(this.getQueueName());
        MessageProducer p = session.createProducer((Destination)d);
        this.fillAddress(this.getQueueName());
        ResourceAllocationException e = null;
        try {
            p.send((Message)session.createBytesMessage());
        }
        catch (ResourceAllocationException rae) {
            e = rae;
        }
        AmqpFlowControlTest.assertTrue((boolean)(e instanceof ResourceAllocationException));
        AmqpFlowControlTest.assertTrue((boolean)e.getMessage().contains("resource-limit-exceeded"));
        long addressSize = this.server.getPagingManager().getPageStore(new SimpleString(this.getQueueName())).getAddressSize();
        AmqpFlowControlTest.assertTrue((addressSize >= 0x200000L ? (byte)1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
        this.fillAddress(this.getQueueName());
        AmqpClient client = this.createAmqpClient();
        try (AmqpConnection connection = this.addConnection(client.connect());){
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(this.getQueueName());
            Thread.sleep(500L);
            AmqpFlowControlTest.assertEquals((long)0L, (long)sender.getSender().getCredit());
            AmqpReceiver receiver = session.createReceiver(this.getQueueName());
            receiver.flow(100);
            for (int i = 0; i < this.messagesSent - 1; ++i) {
                AmqpMessage m = receiver.receive(5000L, TimeUnit.MILLISECONDS);
                m.accept();
            }
            Thread.sleep(500L);
            AmqpFlowControlTest.assertTrue((sender.getSender().getCredit() >= 0 ? (byte)1 : 0) != 0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
        this.fillAddress(this.getQueueName());
        AmqpClient client = this.createAmqpClient();
        try (AmqpConnection connection = this.addConnection(client.connect());){
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(this.getQueueName());
            Thread.sleep(1000L);
            AmqpFlowControlTest.assertEquals((long)0L, (long)sender.getSender().getCredit());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
        AmqpClient client = this.createAmqpClient();
        AmqpConnection connection = this.addConnection(client.connect());
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(this.getQueueName());
        sender.setPresettle(true);
        this.fillAddress(this.getQueueName());
        AmqpMessage message = new AmqpMessage();
        byte[] payload = new byte[51200];
        message.setBytes(payload);
        Exception expectedException = null;
        try {
            session.begin();
            sender.send(message);
            session.commit();
        }
        catch (Exception e) {
            expectedException = e;
        }
        finally {
            connection.close();
        }
        AmqpFlowControlTest.assertNotNull((Object)expectedException);
        AmqpFlowControlTest.assertTrue((boolean)expectedException.getMessage().contains("resource-limit-exceeded"));
        AmqpFlowControlTest.assertTrue((boolean)expectedException.getMessage().contains("Address is full: " + this.getQueueName()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fillAddress(String address) throws Exception {
        AmqpClient client = this.createAmqpClient();
        Exception exception = null;
        try (AmqpConnection connection = this.addConnection(client.connect());){
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(address);
            this.sendUntilFull(sender);
        }
        AmqpFlowControlTest.assertNotNull((Object)exception);
        AmqpFlowControlTest.assertTrue((boolean)exception.getMessage().contains("amqp:resource-limit-exceeded"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendUntilFull(final AmqpSender sender) throws Exception {
        final AmqpMessage message = new AmqpMessage();
        byte[] payload = new byte[51200];
        message.setBytes(payload);
        int maxMessages = 50;
        final AtomicInteger sentMessages = new AtomicInteger(0);
        final Exception[] errors = new Exception[1];
        final CountDownLatch timeout = new CountDownLatch(1);
        Runnable sendMessages = new Runnable(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50; ++i) {
                        sender.send(message);
                        sentMessages.getAndIncrement();
                    }
                    timeout.countDown();
                }
                catch (IOException e) {
                    errors[0] = e;
                }
            }
        };
        Thread t = new Thread(sendMessages);
        try {
            t.start();
            timeout.await(1L, TimeUnit.SECONDS);
            this.messagesSent = sentMessages.get();
            if (errors[0] != null) {
                throw errors[0];
            }
        }
        finally {
            t.interrupt();
            t.join(1000L);
            Assert.assertFalse((boolean)t.isAlive());
        }
    }
}

