/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.tests.integration.amqp;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.JMSClientTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Enclosed.class)
public class AmqpFlowControlFailTest {
    private static void configureAddressPolicy(ActiveMQServer server) {
        AddressSettings addressSettings = (AddressSettings)server.getAddressSettingsRepository().getMatch("#");
        addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
        addressSettings.setMaxSizeBytes(1000L);
        server.getAddressSettingsRepository().addMatch("#", (Object)addressSettings);
    }

    public static class AmqpFlowControlFailOrdinaryTests
    extends JMSClientTestSupport {
        @Override
        protected void configureAddressPolicy(ActiveMQServer server) {
            AmqpFlowControlFailTest.configureAddressPolicy(server);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Test(timeout=60000L)
        public void testMesagesNotSent() throws Exception {
            AmqpClient client = this.createAmqpClient(this.getBrokerAmqpConnectionURI());
            int messagesSent = 0;
            try (AmqpConnection connection = this.addConnection(client.connect());){
                int i;
                AmqpSession session = connection.createSession();
                AmqpSender sender = session.createSender(this.getQueueName());
                boolean rejected = false;
                for (int i2 = 0; i2 < 1000; ++i2) {
                    AmqpMessage message = new AmqpMessage();
                    byte[] payload = new byte[10];
                    message.setBytes(payload);
                    try {
                        sender.send(message);
                        ++messagesSent;
                        continue;
                    }
                    catch (IOException e) {
                        rejected = true;
                    }
                }
                AmqpFlowControlFailOrdinaryTests.assertTrue((boolean)rejected);
                rejected = false;
                AmqpFlowControlFailOrdinaryTests.assertEquals((long)0L, (long)sender.getSender().getCredit());
                AmqpSession session2 = connection.createSession();
                AmqpReceiver receiver = session2.createReceiver(this.getQueueName());
                receiver.flow(messagesSent);
                for (i = 0; i < messagesSent; ++i) {
                    AmqpMessage receive = receiver.receive();
                    receive.accept();
                }
                receiver.close();
                session2.close();
                Wait.assertEquals((int)1000, () -> ((Sender)sender.getSender()).getCredit());
                for (i = 0; i < 1000; ++i) {
                    AmqpMessage message = new AmqpMessage();
                    byte[] payload = new byte[100];
                    message.setBytes(payload);
                    try {
                        sender.send(message);
                        continue;
                    }
                    catch (IOException e) {
                        rejected = true;
                    }
                }
                AmqpFlowControlFailOrdinaryTests.assertTrue((boolean)rejected);
                AmqpFlowControlFailOrdinaryTests.assertEquals((long)0L, (long)sender.getSender().getCredit());
            }
        }
    }

    @RunWith(value=Parameterized.class)
    public static class AmqpFlowControlFailDispositionTests
    extends JMSClientTestSupport {
        @Parameterized.Parameter
        public boolean useModified;
        @Parameterized.Parameter(value=1)
        public Symbol[] outcomes;
        @Parameterized.Parameter(value=2)
        public String expectedMessage;

        @Parameterized.Parameters(name="useModified={0}")
        public static Collection<Object[]> parameters() {
            return Arrays.asList({true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "failure at remote"}, {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, {false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, {false, new Symbol[0], "[condition = amqp:resource-limit-exceeded]"});
        }

        @Override
        protected void configureAddressPolicy(ActiveMQServer server) {
            AmqpFlowControlFailTest.configureAddressPolicy(server);
        }

        @Override
        protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
            params.put("amqpUseModifiedForTransientDeliveryErrors", this.useModified);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Test(timeout=60000L)
        public void testAddressFullDisposition() throws Exception {
            AmqpClient client = this.createAmqpClient(this.getBrokerAmqpConnectionURI());
            try (AmqpConnection connection = this.addConnection(client.connect());){
                AmqpSession session = connection.createSession();
                AmqpSender sender = session.createSender(this.getQueueName(), null, null, this.outcomes);
                boolean rejected = false;
                for (int i = 0; i < 1000; ++i) {
                    AmqpMessage message = new AmqpMessage();
                    byte[] payload = new byte[10];
                    message.setBytes(payload);
                    try {
                        sender.send(message);
                        continue;
                    }
                    catch (IOException e) {
                        rejected = true;
                        AmqpFlowControlFailDispositionTests.assertTrue((String)String.format("Unexpected message expected %s to contain %s", e.getMessage(), this.expectedMessage), (boolean)e.getMessage().contains(this.expectedMessage));
                    }
                }
                AmqpFlowControlFailDispositionTests.assertTrue((String)"Expected messages to be refused by broker", (boolean)rejected);
            }
        }
    }
}

