/*
 * Decompiled with CFR 0.152.
 */
package kafka.clients.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.junit.jupiter.api.Assertions;

public class ConsumerIntegrationTest {
    @ClusterTests(value={@ClusterTest(serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="group.coordinator.new.enable", value="false")}), @ClusterTest(serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="group.coordinator.rebalance.protocols", value="classic")})})
    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInstance) throws Exception {
        String topic = "test-topic";
        clusterInstance.createTopic(topic, 1, (short)1);
        try (KafkaConsumer consumer = new KafkaConsumer(Map.of("bootstrap.servers", clusterInstance.bootstrapServers(), "group.id", "test-group", "key.deserializer", StringDeserializer.class.getName(), "value.deserializer", StringDeserializer.class.getName(), "group.protocol", GroupProtocol.CONSUMER.name()));){
            consumer.subscribe(Collections.singletonList(topic));
            TestUtils.waitForCondition(() -> {
                try {
                    consumer.poll(Duration.ofMillis(1000L));
                    return false;
                }
                catch (UnsupportedVersionException e) {
                    return e.getMessage().equals("The cluster does not support the new CONSUMER group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol until the cluster is upgraded.");
                }
            }, (String)"Should get UnsupportedVersionException and how to revert to classic protocol");
        }
    }

    @ClusterTest(serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1")})
    public void testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(ClusterInstance clusterInstance) throws InterruptedException {
        ConsumerIntegrationTest.testFetchPartitionsAfterFailedListener(clusterInstance, GroupProtocol.CLASSIC);
    }

    @ClusterTest(serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1")})
    public void testFetchPartitionsAfterFailedListenerWithGroupProtocolConsumer(ClusterInstance clusterInstance) throws InterruptedException {
        ConsumerIntegrationTest.testFetchPartitionsAfterFailedListener(clusterInstance, GroupProtocol.CONSUMER);
    }

    private static void testFetchPartitionsAfterFailedListener(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
        String topic = "topic";
        try (Producer producer = clusterInstance.producer(Map.of("key.serializer", ByteArraySerializer.class, "value.serializer", ByteArraySerializer.class));){
            producer.send(new ProducerRecord(topic, (Object)"key".getBytes(), (Object)"value".getBytes()));
        }
        try (Consumer consumer = clusterInstance.consumer(Map.of("group.protocol", groupProtocol.name()));){
            consumer.subscribe(List.of(topic), new ConsumerRebalanceListener(){
                private int count = 0;

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    ++this.count;
                    if (this.count == 1) {
                        throw new IllegalArgumentException("temporary error");
                    }
                }
            });
            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1L)).count() == 1, (long)5000L, (String)"failed to poll data");
        }
    }

    @ClusterTest(serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1")})
    public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolClassic(ClusterInstance clusterInstance) throws InterruptedException {
        ConsumerIntegrationTest.testFetchPartitionsWithAlwaysFailedListener(clusterInstance, GroupProtocol.CLASSIC);
    }

    @ClusterTest(serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1")})
    public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolConsumer(ClusterInstance clusterInstance) throws InterruptedException {
        ConsumerIntegrationTest.testFetchPartitionsWithAlwaysFailedListener(clusterInstance, GroupProtocol.CONSUMER);
    }

    private static void testFetchPartitionsWithAlwaysFailedListener(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
        String topic = "topic";
        try (Producer producer = clusterInstance.producer(Map.of("key.serializer", ByteArraySerializer.class, "value.serializer", ByteArraySerializer.class));){
            producer.send(new ProducerRecord(topic, (Object)"key".getBytes(), (Object)"value".getBytes()));
        }
        try (Consumer consumer = clusterInstance.consumer(Map.of("group.protocol", groupProtocol.name()));){
            consumer.subscribe(List.of(topic), new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    throw new IllegalArgumentException("always failed");
                }
            });
            long startTimeMillis = System.currentTimeMillis();
            long currentTimeMillis = System.currentTimeMillis();
            while (currentTimeMillis < startTimeMillis + 3000L) {
                currentTimeMillis = System.currentTimeMillis();
                try {
                    Assertions.assertEquals((int)0, (int)consumer.poll(Duration.ofSeconds(1L)).count());
                }
                catch (KafkaException ex) {
                    Assertions.assertEquals((Object)"User rebalance callback throws an error", (Object)ex.getMessage());
                }
                Thread.sleep(300L);
            }
        }
    }
}

