/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test.utils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * Static helpers for tests that talk to Kafka: canned consumer/producer
 * configuration, polling helpers, committed/end offset lookups and reflective
 * field access.
 *
 * <p>The class cannot be instantiated.
 */
public final class KafkaTestUtils {
    /** Timeout, in seconds, applied to the metadata calls in {@link #getEndOffsets}. */
    private static final int TEN = 10;
    private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class));
    /** Lazily created by {@link #defaultPropertyOverrides()}. */
    private static Properties defaults;

    private KafkaTestUtils() {
    }

    /**
     * Build consumer properties that point at the given embedded broker.
     *
     * @param group the {@code group.id} value.
     * @param autoCommit the {@code enable.auto.commit} value.
     * @param embeddedKafka the broker whose {@code getBrokersAsString()} result is used
     * as {@code bootstrap.servers}.
     * @return a new mutable map of properties.
     * @see #consumerProps(String, String, String)
     */
    public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) {
        return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
    }

    /**
     * Build producer properties that point at the given embedded broker.
     *
     * @param embeddedKafka the broker whose {@code getBrokersAsString()} result is used
     * as {@code bootstrap.servers}.
     * @return a new mutable map of properties.
     * @see #producerProps(String)
     */
    public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) {
        return producerProps(embeddedKafka.getBrokersAsString());
    }

    /**
     * Build consumer properties with an {@link IntegerDeserializer} for keys, a
     * {@link StringDeserializer} for values and {@code auto.offset.reset=earliest}.
     *
     * @param brokers the {@code bootstrap.servers} value.
     * @param group the {@code group.id} value.
     * @param autoCommit the {@code enable.auto.commit} value.
     * @return a new mutable map of properties.
     */
    public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", group);
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.commit.interval.ms", "10");
        props.put("session.timeout.ms", "60000");
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    /**
     * Build producer properties with an {@link IntegerSerializer} for keys and a
     * {@link StringSerializer} for values.
     *
     * @param brokers the {@code bootstrap.servers} value.
     * @return a new mutable map of properties.
     */
    public static Map<String, Object> producerProps(String brokers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);
        props.put("batch.size", "16384");
        props.put("linger.ms", 1);
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return props;
    }

    /**
     * Poll the consumer, expecting a single record for the topic, waiting up to
     * 60 seconds.
     *
     * @param consumer the consumer.
     * @param topic the topic.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the record.
     * @throws IllegalStateException if no record, or more than one record, is found.
     * @see #getSingleRecord(Consumer, String, long)
     */
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
        return getSingleRecord(consumer, topic, 60000L);
    }

    /**
     * Poll the consumer, expecting a single record for the topic. Records received
     * for other topics are not consumed: the consumer is sought back to the first
     * such offset seen on each partition so a later poll returns them again.
     *
     * @param consumer the consumer.
     * @param topic the topic.
     * @param timeout the maximum time to keep polling, in milliseconds.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the record.
     * @throws IllegalStateException if no record is found before the timeout, or the
     * final poll returned more than one record for the topic.
     */
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
        long expire = System.currentTimeMillis() + timeout;
        long remaining = timeout;
        Iterator<ConsumerRecord<K, V>> iterator;
        do {
            ConsumerRecords<K, V> received = getRecords(consumer, remaining);
            iterator = received.records(topic).iterator();
            // Remember the first offset seen per foreign partition and rewind to it.
            Map<TopicPartition, Long> reset = new HashMap<>();
            received.forEach(rec -> {
                if (!rec.topic().equals(topic)) {
                    reset.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), tp -> rec.offset());
                }
            });
            reset.forEach((tp, off) -> consumer.seek(tp, off.longValue()));
            if (iterator.hasNext()) {
                // Found what we were waiting for; no reason to pause before returning.
                break;
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            remaining = expire - System.currentTimeMillis();
        } while (remaining > 0L);
        if (!iterator.hasNext()) {
            throw new IllegalStateException("No records found for topic");
        }
        ConsumerRecord<K, V> record = iterator.next();
        if (iterator.hasNext()) {
            throw new IllegalStateException("More than one record for topic found");
        }
        return record;
    }

    /**
     * Create a short-lived consumer (using {@link #consumerProps(String, String, String)}
     * with auto commit disabled and {@code max.poll.records=1}), assign it the given
     * partition and perform one poll.
     *
     * @param brokerAddresses the {@code bootstrap.servers} value.
     * @param group the consumer group id.
     * @param topic the topic.
     * @param partition the partition.
     * @param seekToLast when true, seek to the end and, if the position is greater
     * than zero, step back one offset so the last record is fetched.
     * @param commit when true and a record was received, commit synchronously.
     * @param timeout the poll timeout in milliseconds.
     * @return the record, or {@code null} if the poll did not return exactly one record.
     */
    @Nullable
    public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, long timeout) {
        Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, "false");
        consumerConfig.put("max.poll.records", 1);
        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(consumerConfig)) {
            TopicPartition topicPart = new TopicPartition(topic, partition);
            consumer.assign(Collections.singletonList(topicPart));
            if (seekToLast) {
                consumer.seekToEnd(Collections.singletonList(topicPart));
                long position = consumer.position(topicPart);
                if (position > 0L) {
                    consumer.seek(topicPart, position - 1L);
                }
            }
            ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(timeout));
            ConsumerRecord<?, ?> record = records.count() == 1 ? records.iterator().next() : null;
            if (record != null && commit) {
                consumer.commitSync();
            }
            return record;
        }
    }

    /**
     * Look up the committed offset of a consumer group for a topic/partition using a
     * temporary {@link AdminClient} that is closed before returning.
     *
     * @param brokerAddresses the {@code bootstrap.servers} value.
     * @param group the consumer group id.
     * @param topic the topic.
     * @param partition the partition.
     * @return the map entry for the topic/partition; {@code null} when the result
     * has no entry for it.
     * @throws Exception if the lookup fails.
     */
    public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception {
        try (AdminClient client = AdminClient.create(Collections.singletonMap("bootstrap.servers", brokerAddresses))) {
            return getCurrentOffset(client, group, topic, partition);
        }
    }

    /**
     * Look up the committed offset of a consumer group for a topic/partition using
     * the supplied admin client. The client is not closed.
     *
     * @param adminClient the admin client.
     * @param group the consumer group id.
     * @param topic the topic.
     * @param partition the partition.
     * @return the map entry for the topic/partition; {@code null} when the result
     * has no entry for it.
     * @throws Exception if the lookup fails.
     */
    public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) throws Exception {
        return adminClient.listConsumerGroupOffsets(group)
                .partitionsToOffsetAndMetadata()
                .get()
                .get(new TopicPartition(topic, partition));
    }

    /**
     * Return the end offsets for the requested partitions of a topic.
     *
     * @param consumer the consumer used for the metadata calls.
     * @param topic the topic.
     * @param partitions the partitions; when {@code null} or empty, every partition
     * that {@code consumer.listTopics()} reports for the topic is used. Null elements
     * are rejected via {@code Assert.noNullElements}.
     * @return the end offsets keyed by topic/partition.
     */
    public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer ... partitions) {
        Collection<TopicPartition> tps;
        if (partitions == null || partitions.length == 0) {
            tps = consumer.listTopics(Duration.ofSeconds(TEN))
                    .getOrDefault(topic, Collections.emptyList())
                    .stream()
                    .map(pi -> new TopicPartition(topic, pi.partition()))
                    .collect(Collectors.toList());
        } else {
            Assert.noNullElements(partitions, "'partitions' cannot have null elements");
            tps = Arrays.stream(partitions)
                    .map(part -> new TopicPartition(topic, part))
                    .collect(Collectors.toList());
        }
        return consumer.endOffsets(tps, Duration.ofSeconds(TEN));
    }

    /**
     * Poll the consumer once, waiting up to 60 seconds.
     *
     * @param consumer the consumer.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the records returned by the poll.
     * @see #getRecords(Consumer, long)
     */
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
        return getRecords(consumer, 60000L);
    }

    /**
     * Poll the consumer once with the given timeout.
     *
     * @param consumer the consumer.
     * @param timeout the poll timeout in milliseconds.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the records returned by the poll.
     * @see #getRecords(Consumer, long, int)
     */
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout) {
        return getRecords(consumer, timeout, -1);
    }

    /**
     * Poll the consumer until at least {@code minRecords} records have been received
     * or the timeout budget is used up.
     *
     * @param consumer the consumer.
     * @param timeout the overall time budget in milliseconds.
     * @param minRecords the number of records to wait for; a negative value returns
     * the result of the first poll unchanged.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the accumulated records; may hold fewer than {@code minRecords} when
     * the timeout budget ran out first.
     * @throws IllegalStateException if {@code consumer.poll()} returns {@code null}.
     */
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout, int minRecords) {
        logger.debug("Polling...");
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
        long remaining = timeout;
        int count = 0;
        do {
            long t1 = System.currentTimeMillis();
            ConsumerRecords<K, V> received = consumer.poll(Duration.ofMillis(remaining));
            // Guard first: the debug supplier below dereferences 'received'.
            if (received == null) {
                throw new IllegalStateException("null received from consumer.poll()");
            }
            logger.debug(() -> "Received: " + received.count() + ", "
                    + received.partitions().stream()
                            .flatMap(p -> received.records(p).stream())
                            .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
                            .collect(Collectors.toList()));
            if (minRecords < 0) {
                return received;
            }
            received.partitions().forEach(tp ->
                    records.computeIfAbsent(tp, part -> new ArrayList<>()).addAll(received.records(tp)));
            count += received.count();
            remaining -= System.currentTimeMillis() - t1;
        } while (count < minRecords && remaining > 0L);
        return new ConsumerRecords<>(records);
    }

    /**
     * Read a possibly nested field via {@link DirectFieldAccessor}, walking the
     * dot-separated path one token at a time.
     *
     * @param root the object to start from.
     * @param propertyPath the field path, e.g. {@code "foo.bar.baz"}.
     * @return the value, or {@code null} if the last field in the path is null.
     * @throws IllegalArgumentException if a field before the last one is null.
     */
    public static Object getPropertyValue(Object root, String propertyPath) {
        String[] tokens = propertyPath.split("\\.");
        Object value = null;
        Object current = root;
        for (int i = 0; i < tokens.length; ++i) {
            value = new DirectFieldAccessor(current).getPropertyValue(tokens[i]);
            if (value == null) {
                if (i == tokens.length - 1) {
                    return null;
                }
                throw new IllegalArgumentException("intermediate property '" + tokens[i] + "' is null");
            }
            current = value;
        }
        return value;
    }

    /**
     * Typed variant of {@link #getPropertyValue(Object, String)}: a non-null value is
     * checked with {@code Assert.isAssignable} against the expected type.
     *
     * @param root the object to start from.
     * @param propertyPath the field path.
     * @param type the expected type of the value.
     * @param <T> the expected type.
     * @return the value, or {@code null}.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
        Object value = getPropertyValue(root, propertyPath);
        if (value != null) {
            Assert.isAssignable(type, value.getClass());
        }
        // Deliberately an unchecked cast, not type.cast(value): the original made no
        // Class.cast call, which would be a stricter runtime check than the assertion above.
        return (T) value;
    }

    /**
     * Return the default property overrides, currently
     * {@code enable.auto.commit=false}. The instance is created on first use and the
     * same mutable object is handed out on later calls.
     *
     * @return the shared {@link Properties} instance.
     */
    public static Properties defaultPropertyOverrides() {
        if (defaults == null) {
            Properties props = new Properties();
            props.setProperty("enable.auto.commit", "false");
            defaults = props;
        }
        return defaults;
    }
}

