/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Tests for consumer negative acknowledgements.
 *
 * <p>Each run of {@link #testNegativeAcks} publishes a fixed number of string messages,
 * negatively acknowledges every one of them on first receipt, and then expects to receive
 * the same set of values a second time before the subscription goes quiet.
 */
@Test(groups={"broker-impl"})
public class NegativeAcksTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTest.class);

    /**
     * Delegates to the base class set-up hooks once before the tests in this class run.
     *
     * @throws Exception if either base class set-up call fails
     */
    @Override
    @BeforeClass
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Delegates to the base class clean-up hook; runs even when earlier configuration or
     * tests failed ({@code alwaysRun=true}).
     *
     * @throws Exception if the base class clean-up call fails
     */
    @Override
    @AfterClass(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Builds the parameter matrix for {@link #testNegativeAcks}.
     *
     * <p>Every row has the shape
     * {@code {batching, usePartitions, subscriptionType, negAcksDelayMillis, ackTimeout}}.
     * The matrix is the full cross product of the value lists below, with the ack timeout
     * varying slowest and the subscription type varying fastest.
     *
     * @return one {@code Object[]} per parameter combination (32 rows)
     */
    @DataProvider(name="variations")
    public static Object[][] variations() {
        final int[] ackTimeoutsMillis = {0, 1000};
        final int[] negAckDelaysMillis = {100, 0};
        final boolean[] flags = {false, true};
        final SubscriptionType[] subscriptionTypes = {SubscriptionType.Shared, SubscriptionType.Failover};

        // "flags" is used twice: once for batching and once for usePartitions.
        final int rowCount = ackTimeoutsMillis.length * negAckDelaysMillis.length
                * flags.length * flags.length * subscriptionTypes.length;
        Object[][] rows = new Object[rowCount][];
        int next = 0;
        for (int ackTimeoutMillis : ackTimeoutsMillis) {
            for (int negAckDelayMillis : negAckDelaysMillis) {
                for (boolean batching : flags) {
                    for (boolean usePartitions : flags) {
                        for (SubscriptionType subscriptionType : subscriptionTypes) {
                            rows[next++] = new Object[]{
                                batching, usePartitions, subscriptionType, negAckDelayMillis, ackTimeoutMillis};
                        }
                    }
                }
            }
        }
        return rows;
    }

    /**
     * Publishes ten messages, negatively acknowledges each one as it is first received, then
     * receives ten more messages, acknowledges them, and asserts that the set of values seen
     * in that second pass equals the set of values that was sent. Finally asserts that no
     * further message arrives within 100 ms.
     *
     * <p>NOTE(review): {@code usePartitions} is only written to the log in this method; the
     * topic is not created as partitioned here. Confirm whether that is intended.
     *
     * @param batching           passed to {@code enableBatching} on the producer builder
     * @param usePartitions      data-provider dimension; currently only logged
     * @param subscriptionType   subscription type used by the consumer
     * @param negAcksDelayMillis negative-ack redelivery delay configured on the consumer, in milliseconds
     * @param ackTimeout         ack timeout configured on the consumer, in milliseconds
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="variations")
    public void testNegativeAcks(boolean batching, boolean usePartitions, SubscriptionType subscriptionType, int negAcksDelayMillis, int ackTimeout) throws Exception {
        log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}",
                batching, usePartitions, subscriptionType, negAcksDelayMillis);
        String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
        final int messageCount = 10;

        // try-with-resources closes the producer first, then the consumer, exactly once each,
        // whether the body completes normally or an assertion/client call throws.
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("sub1")
                     .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                     .subscriptionType(subscriptionType)
                     .negativeAckRedeliveryDelay(negAcksDelayMillis, TimeUnit.MILLISECONDS)
                     .ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
                     .subscribe();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .enableBatching(batching)
                     .create()) {

            // Publish asynchronously and flush once so all sends are pushed out before consuming.
            HashSet<String> sentMessages = new HashSet<>();
            for (int i = 0; i < messageCount; ++i) {
                String value = "test-" + i;
                producer.sendAsync(value);
                sentMessages.add(value);
            }
            producer.flush();

            // First pass: negatively acknowledge every message.
            for (int i = 0; i < messageCount; ++i) {
                Message<String> msg = consumer.receive();
                consumer.negativeAcknowledge(msg);
            }

            // Second pass: the same values are expected again; acknowledge them this time.
            HashSet<String> receivedMessages = new HashSet<>();
            for (int i = 0; i < messageCount; ++i) {
                Message<String> msg = consumer.receive();
                receivedMessages.add(msg.getValue());
                consumer.acknowledge(msg);
            }
            Assert.assertEquals(receivedMessages, sentMessages);

            // Nothing else should show up once everything has been acknowledged.
            Assert.assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
        }
    }
}

