/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit.provider;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.KeyValueMetadata;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.RecordConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordConsumer} implementation that creates a fresh {@link KafkaConsumer} for every
 * read request, pointing it at the bootstrap servers given at construction time.
 *
 * <p>The {@code observe*} methods repeatedly issue read requests until the expected number of
 * records has been collected or the observation timeout has elapsed, in which case an
 * {@link AssertionError} is thrown.
 */
public class DefaultRecordConsumer
implements RecordConsumer {
    private static final Logger log = LoggerFactory.getLogger(DefaultRecordConsumer.class);

    /** Timeout of a single {@code poll} call; also the amount charged against the poll budget per iteration. */
    private static final int POLL_INTERVAL_MILLIS = 100;

    /** Upper bound for the pause between two consecutive read attempts while observing a topic. */
    private static final int OBSERVATION_BACKOFF_MILLIS = 100;

    private final String bootstrapServers;

    /**
     * @param bootstrapServers value written to the {@code bootstrap.servers} property of every
     *                         consumer created by this instance
     */
    public DefaultRecordConsumer(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /**
     * Reads records as described by {@code readRequest} and returns only their values.
     *
     * @param readRequest the read request, delegated to {@link #read(ReadKeyValues)}
     * @return unmodifiable list of the values of all records that were read
     * @throws InterruptedException declared by the interface; propagated from {@link #read(ReadKeyValues)}
     */
    @Override
    public <V> List<V> readValues(ReadKeyValues<String, V> readRequest) throws InterruptedException {
        final List<KeyValue<String, V>> keyValues = read(readRequest);
        return Collections.unmodifiableList(keyValues.stream().map(KeyValue::getValue).collect(Collectors.toList()));
    }

    /**
     * Subscribes a new consumer to the topic of {@code readRequest} and polls it until either the
     * request's maximum total poll time is used up or the request's limit of records is reached.
     * Records that do not pass all three filters (key, value, headers) are dropped. Offsets are
     * committed synchronously once the poll loop has finished normally; the consumer is closed in
     * all cases, including when an exception escapes.
     *
     * @param readRequest topic, consumer properties, filters, limit, poll budget and optional seek offsets
     * @return unmodifiable list of the accepted records, in the order they were received
     * @throws InterruptedException declared by the interface
     */
    @Override
    public <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readRequest) throws InterruptedException {
        final List<KeyValue<K, V>> consumedRecords = new ArrayList<>();
        final int limit = readRequest.getLimit();
        final Predicate<K> filterOnKeys = readRequest.getFilterOnKeys();
        final Predicate<V> filterOnValues = readRequest.getFilterOnValues();
        final Predicate<Headers> filterOnHeaders = readRequest.getFilterOnHeaders();

        // try-with-resources guarantees the consumer is closed even if polling, filtering,
        // seeking or committing throws.
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(effectiveConsumerProps(readRequest.getConsumerProps()))) {
            consumer.subscribe(Collections.singletonList(readRequest.getTopic()));
            int totalPollTimeMillis = 0;
            boolean assignmentsReady = false;
            while (totalPollTimeMillis < readRequest.getMaxTotalPollTimeMillis() && continueConsuming(consumedRecords.size(), limit)) {
                final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MILLIS));
                if (!assignmentsReady) {
                    assignmentsReady = true;
                    try {
                        seekIfNecessary(readRequest, consumer);
                    } catch (IllegalStateException e) {
                        // Seeking was rejected - presumably because the partition assignment has not
                        // arrived yet. Discard this batch and retry the seek after the next poll.
                        // NOTE(review): this retry path does not count against the poll budget, so a
                        // seek that keeps failing keeps the loop running; kept as-is to preserve timing.
                        assignmentsReady = false;
                        continue;
                    }
                }
                for (ConsumerRecord<K, V> record : records) {
                    final boolean accepted = filterOnKeys.test(record.key())
                            && filterOnValues.test(record.value())
                            && filterOnHeaders.test(record.headers());
                    if (accepted) {
                        consumedRecords.add(toKeyValue(record, readRequest.isIncludeMetadata()));
                    }
                }
                totalPollTimeMillis += POLL_INTERVAL_MILLIS;
            }
            consumer.commitSync();
        }
        return Collections.unmodifiableList(consumedRecords);
    }

    /**
     * Converts a consumer record into a {@link KeyValue}, attaching topic, partition and offset
     * as {@link KeyValueMetadata} only when {@code includeMetadata} is set.
     */
    private static <K, V> KeyValue<K, V> toKeyValue(ConsumerRecord<K, V> record, boolean includeMetadata) {
        if (includeMetadata) {
            final KeyValueMetadata metadata = new KeyValueMetadata(record.topic(), record.partition(), record.offset());
            return new KeyValue<>(record.key(), record.value(), record.headers(), metadata);
        }
        return new KeyValue<>(record.key(), record.value(), record.headers());
    }

    /**
     * Seeks the consumer to every partition offset listed in the request's seek map.
     *
     * @return {@code true} if the request contained at least one seek offset, {@code false} otherwise
     * @throws IllegalStateException propagated from {@link KafkaConsumer#seek}; the caller retries on it
     */
    private <K, V> boolean seekIfNecessary(ReadKeyValues<K, V> readRequest, KafkaConsumer<K, V> consumer) {
        final Map<Integer, Long> seekTo = readRequest.getSeekTo();
        if (seekTo.isEmpty()) {
            return false;
        }
        for (Map.Entry<Integer, Long> partitionAndOffset : seekTo.entrySet()) {
            final TopicPartition topicPartition = new TopicPartition(readRequest.getTopic(), partitionAndOffset.getKey());
            log.info("Seeking to offset {} of topic-partition {}.", partitionAndOffset.getValue(), topicPartition);
            consumer.seek(topicPartition, partitionAndOffset.getValue());
        }
        return true;
    }

    /**
     * @return {@code true} while more records may be consumed; a non-positive {@code maxMessages}
     *         means there is no limit
     */
    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
        return maxMessages <= 0 || messagesConsumed < maxMessages;
    }

    /**
     * Repeatedly reads values until at least the expected number has been collected. Only the
     * first read attempt carries the seek offsets of {@code observeRequest}.
     *
     * @param observeRequest topic, expectation, timeout, filters and consumer properties
     * @return unmodifiable list of all values collected across the read attempts
     * @throws AssertionError       if the observation time elapsed before enough records were seen
     * @throws InterruptedException if interrupted while pausing between read attempts
     */
    @Override
    public <V> List<V> observeValues(ObserveKeyValues<String, V> observeRequest) throws InterruptedException {
        final List<V> totalConsumedRecords = new ArrayList<>(observeRequest.getExpected());
        final long startNanos = System.nanoTime();
        final ReadKeyValues<String, V> subsequentReadRequests = withoutPartitionSeekForReadValues(observeRequest);
        ReadKeyValues<String, V> nextReadRequest = withPartitionSeekForReadValues(observeRequest);
        while (true) {
            totalConsumedRecords.addAll(readValues(nextReadRequest));
            nextReadRequest = subsequentReadRequests;
            if (totalConsumedRecords.size() >= observeRequest.getExpected()) {
                return Collections.unmodifiableList(totalConsumedRecords);
            }
            failOnTimeoutOrBackOff(observeRequest, startNanos, totalConsumedRecords.size());
        }
    }

    private <V> ReadKeyValues<String, V> withPartitionSeekForReadValues(ObserveKeyValues<String, V> observeRequest) {
        return toReadValuesRequest(observeRequest).seekTo(observeRequest.getSeekTo()).build();
    }

    private <V> ReadKeyValues<String, V> withoutPartitionSeekForReadValues(ObserveKeyValues<String, V> observeRequest) {
        return toReadValuesRequest(observeRequest).build();
    }

    /**
     * Translates an observe request into a builder for a values-only read request (metadata
     * disabled) that uses the same topic, consumer properties, filters and group id, with the
     * expected record count as its limit.
     */
    private <V> ReadKeyValues.ReadKeyValuesBuilder<String, V> toReadValuesRequest(ObserveKeyValues<String, V> observeRequest) {
        return ReadKeyValues.from(observeRequest.getTopic(), observeRequest.getClazzOfV())
                .withAll(observeRequest.getConsumerProps())
                .withLimit(observeRequest.getExpected())
                .withMetadata(false)
                .filterOnKeys(observeRequest.getFilterOnKeys())
                .filterOnValues(observeRequest.getFilterOnValues())
                .filterOnHeaders(observeRequest.getFilterOnHeaders())
                .with("group.id", observeRequest.getConsumerProps().getProperty("group.id"));
    }

    /**
     * Repeatedly reads key-value pairs until at least the expected number has been collected.
     * Only the first read attempt carries the seek offsets of {@code observeRequest}.
     *
     * @param observeRequest topic, expectation, timeout, filters and consumer properties
     * @return unmodifiable list of all key-value pairs collected across the read attempts
     * @throws AssertionError       if the observation time elapsed before enough records were seen
     * @throws InterruptedException if interrupted while pausing between read attempts
     */
    @Override
    public <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeRequest) throws InterruptedException {
        final List<KeyValue<K, V>> totalConsumedRecords = new ArrayList<>(observeRequest.getExpected());
        final long startNanos = System.nanoTime();
        final ReadKeyValues<K, V> subsequentReadRequests = withoutPartitionSeek(observeRequest);
        ReadKeyValues<K, V> nextReadRequest = withPartitionSeek(observeRequest);
        while (true) {
            totalConsumedRecords.addAll(read(nextReadRequest));
            nextReadRequest = subsequentReadRequests;
            if (totalConsumedRecords.size() >= observeRequest.getExpected()) {
                return Collections.unmodifiableList(totalConsumedRecords);
            }
            failOnTimeoutOrBackOff(observeRequest, startNanos, totalConsumedRecords.size());
        }
    }

    /**
     * Throws if the observation time of {@code observeRequest} has elapsed since
     * {@code startNanos}; otherwise pauses briefly before the next read attempt.
     *
     * @param observeRequest source of the expected count and the observation timeout
     * @param startNanos     {@link System#nanoTime()} reading taken when the observation started
     * @param consumedSoFar  number of records collected so far, used in the failure message
     * @throws AssertionError       if the timeout has elapsed
     * @throws InterruptedException if interrupted while sleeping
     */
    private static void failOnTimeoutOrBackOff(ObserveKeyValues<?, ?> observeRequest, long startNanos, int consumedSoFar) throws InterruptedException {
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(observeRequest.getObservationTimeMillis());
        // Compare the elapsed difference rather than absolute readings: nanoTime values may
        // wrap around, and only differences between two readings are meaningful.
        if (System.nanoTime() - startNanos > timeoutNanos) {
            final String message = String.format("Expected %s records, but consumed only %s records before ran into timeout (%s ms).", observeRequest.getExpected(), consumedSoFar, observeRequest.getObservationTimeMillis());
            throw new AssertionError(message);
        }
        Thread.sleep(Math.min(observeRequest.getObservationTimeMillis(), OBSERVATION_BACKOFF_MILLIS));
    }

    private <K, V> ReadKeyValues<K, V> withPartitionSeek(ObserveKeyValues<K, V> observeRequest) {
        return toReadKeyValuesRequest(observeRequest).seekTo(observeRequest.getSeekTo()).build();
    }

    private <K, V> ReadKeyValues<K, V> withoutPartitionSeek(ObserveKeyValues<K, V> observeRequest) {
        return toReadKeyValuesRequest(observeRequest).build();
    }

    /**
     * Translates an observe request into a builder for a key-value read request that uses the
     * same topic, consumer properties, metadata flag, filters and group id, with the expected
     * record count as its limit.
     */
    private <K, V> ReadKeyValues.ReadKeyValuesBuilder<K, V> toReadKeyValuesRequest(ObserveKeyValues<K, V> observeRequest) {
        return ReadKeyValues.from(observeRequest.getTopic(), observeRequest.getClazzOfK(), observeRequest.getClazzOfV())
                .withAll(observeRequest.getConsumerProps())
                .withLimit(observeRequest.getExpected())
                .withMetadata(observeRequest.isIncludeMetadata())
                .filterOnKeys(observeRequest.getFilterOnKeys())
                .filterOnValues(observeRequest.getFilterOnValues())
                .filterOnHeaders(observeRequest.getFilterOnHeaders())
                .with("group.id", observeRequest.getConsumerProps().getProperty("group.id"));
    }

    /**
     * Copies the given properties and sets {@code bootstrap.servers} to this instance's
     * bootstrap servers, replacing any value present in the original. The original object is
     * not modified.
     */
    private Properties effectiveConsumerProps(Properties originalConsumerProps) {
        final Properties effectiveConsumerProps = new Properties();
        effectiveConsumerProps.putAll(originalConsumerProps);
        effectiveConsumerProps.put("bootstrap.servers", this.bootstrapServers);
        return effectiveConsumerProps;
    }
}

