/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
import io.micronaut.configuration.kafka.processor.ConsumerInfo;
import io.micronaut.configuration.kafka.processor.ConsumerState;
import io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.DefaultExecutableBinder;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import reactor.core.publisher.Flux;

/**
 * {@link ConsumerState} variant that hands the complete result of a poll to the listener
 * method in a single invocation instead of record by record.
 *
 * <p>The listener's return value, if any, is paired element-wise with the polled records
 * and forwarded to {@code handleResultFlux}. Failures are routed through
 * {@link #resolveWithErrorStrategy}, which either rewinds the consumer so the same batch is
 * polled again or reports the error and moves on.</p>
 */
@Internal
final class ConsumerStateBatch extends ConsumerState {

    ConsumerStateBatch(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo info, Consumer<?, ?> consumer, Object consumerBean) {
        super(kafkaConsumerProcessor, info, consumer, consumerBean);
    }

    /**
     * Snapshots the consumer position of every currently assigned partition.
     *
     * @return the position of each assigned partition when the error strategy is a retry
     *         strategy, otherwise {@code null} (the snapshot is only used for rewinding)
     */
    @Override
    @Nullable
    protected Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        if (!this.info.errorStrategy.isRetry()) {
            return null;
        }
        return this.kafkaConsumer.assignment().stream()
            .collect(Collectors.toMap(Function.identity(), this::getCurrentOffset));
    }

    /**
     * Polls the consumer once, using the configured poll timeout.
     *
     * @param currentOffsets the positions captured before polling, or {@code null}
     * @return the polled records, or {@code null} when polling failed with a
     *         {@link RecordDeserializationException}
     */
    @Override
    @Nullable
    protected ConsumerRecords<?, ?> pollRecords(@Nullable Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        try {
            return this.kafkaConsumer.poll(this.info.pollTimeout);
        } catch (RecordDeserializationException ex) {
            LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", this.info.logMethod, ex);
            // Step over the unreadable record first; the error strategy invoked next may still
            // seek the partitions back to the snapshot if it decides to retry.
            this.kafkaConsumer.seek(ex.topicPartition(), ex.offset() + 1L);
            this.resolveWithErrorStrategy(null, currentOffsets, ex);
            return null;
        }
    }

    /**
     * Binds the batch to the listener method, invokes it and dispatches its result.
     *
     * <p>Sets {@code failed} to {@code false} on success; on any exception {@code failed}
     * takes the value returned by {@link #resolveWithErrorStrategy}.</p>
     *
     * @param consumerRecords the batch returned by the last poll
     * @param currentOffsets  the positions captured before polling, or {@code null}
     */
    @Override
    protected void processRecords(ConsumerRecords<?, ?> consumerRecords, @Nullable Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        try {
            if (this.info.ackArg != null) {
                // Expose an acknowledgement callback that synchronously commits this batch.
                final Map<TopicPartition, OffsetAndMetadata> batchOffsets = this.getAckOffsets(consumerRecords);
                this.boundArguments.put(this.info.ackArg, () -> this.kafkaConsumer.commitSync(batchOffsets));
            }
            // Raw binder type and explicit registry cast are kept as found: the generic
            // signatures involved are declared outside this file.
            DefaultExecutableBinder batchBinder = new DefaultExecutableBinder(this.boundArguments);
            Object result = batchBinder.bind(this.info.method, (ArgumentBinderRegistry)this.kafkaConsumerProcessor.getBatchBinderRegistry(), consumerRecords).invoke(this.consumerBean);
            this.handleResult(ConsumerStateBatch.normalizeResult(result), consumerRecords);
            this.failed = false;
        } catch (Exception e) {
            this.failed = this.resolveWithErrorStrategy(consumerRecords, currentOffsets, e);
        }
    }

    /**
     * Computes the offsets to commit when the batch is acknowledged.
     *
     * @param consumerRecords the batch being processed
     * @return for every partition present in the batch, the offset following the last
     *         record iterated for that partition
     */
    private Map<TopicPartition, OffsetAndMetadata> getAckOffsets(ConsumerRecords<?, ?> consumerRecords) {
        final Map<TopicPartition, OffsetAndMetadata> ackOffsets = new HashMap<>();
        for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
            final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            // Later records of the same partition overwrite earlier entries.
            ackOffsets.put(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1L, null));
        }
        return ackOffsets;
    }

    /**
     * Turns an object-array result into a {@link java.util.List} so that it is handled like
     * any other {@link Iterable} result.
     *
     * <p>Only reference arrays are converted. Primitive arrays (for example {@code byte[]}
     * or {@code int[]}) cannot be cast to {@code Object[]} and are returned unchanged, so
     * they are treated as one single result value rather than raising a
     * {@link ClassCastException}.</p>
     *
     * @param result the value returned by the listener method, may be {@code null}
     * @return a list view of {@code result} when it is an {@code Object[]}, otherwise
     *         {@code result} itself
     */
    @Nullable
    private static Object normalizeResult(@Nullable Object result) {
        if (result instanceof Object[]) {
            return Arrays.asList((Object[]) result);
        }
        return result;
    }

    /**
     * Pairs the listener result with the polled records and forwards each pair to
     * {@code handleResultFlux}. Does nothing when {@code result} is {@code null}.
     *
     * @param result          the normalized listener result, may be {@code null}
     * @param consumerRecords the batch that produced {@code result}
     */
    private void handleResult(@Nullable Object result, ConsumerRecords<?, ?> consumerRecords) {
        if (result == null) {
            return;
        }
        final boolean isPublisher = Publishers.isConvertibleToPublisher(result);
        // Anything that is not a publisher is already materialized, so it is handled blocking.
        final boolean isBlocking = this.info.isBlocking || !isPublisher;

        final Flux<?> resultFlux;
        if (result instanceof Iterable) {
            resultFlux = Flux.fromIterable((Iterable<?>) result);
        } else if (isPublisher) {
            resultFlux = this.kafkaConsumerProcessor.convertPublisher(result);
        } else {
            resultFlux = Flux.just(result);
        }

        // Zip stops at the shorter side: the i-th result is matched with the i-th record.
        final Flux<?> resultRecordFlux = resultFlux
            .zipWithIterable(consumerRecords)
            .doOnNext(x -> this.handleResultFlux(consumerRecords, x.getT2(), Flux.just(x.getT1()), isBlocking));

        if (isBlocking) {
            resultRecordFlux.blockLast();
        } else {
            resultRecordFlux.subscribe();
        }
    }

    /**
     * Applies the configured error strategy to a failure of the current batch.
     *
     * <p>With a retry strategy, and while the retry budget is not exhausted, every affected
     * partition is sought back to its offset in {@code currentOffsets} and the retry delay is
     * applied. Otherwise the retry tracking for the affected partitions is dropped (retry
     * strategies only) and the exception is passed to {@code handleException}.</p>
     *
     * <p>NOTE(review): on the retry path {@code currentOffsets} is dereferenced for every
     * affected partition without a null check; this presumes the snapshot contains all of
     * them &mdash; confirm against the caller.</p>
     *
     * @param consumerRecords the failing batch, or {@code null} when the failure happened
     *                        while polling
     * @param currentOffsets  the positions captured before polling; must be non-null when the
     *                        error strategy is a retry strategy
     * @param e               the failure
     * @return {@code true} when the batch is going to be retried or when the error strategy
     *         is {@link ErrorStrategyValue#NONE}; {@code false} otherwise
     */
    private boolean resolveWithErrorStrategy(@Nullable ConsumerRecords<?, ?> consumerRecords, Map<TopicPartition, OffsetAndMetadata> currentOffsets, Throwable e) {
        if (this.info.errorStrategy.isRetry()) {
            final Set<TopicPartition> partitions = consumerRecords != null ? consumerRecords.partitions() : currentOffsets.keySet();
            if (this.shouldRetryException(e, consumerRecords, null) && this.info.retryCount > 0) {
                // Only consulted once a retry is possible at all, as in the original condition.
                final int currentRetryCount = this.getCurrentRetryCount(partitions, currentOffsets);
                if (this.info.retryCount >= currentRetryCount) {
                    if (this.info.shouldHandleAllExceptions) {
                        this.handleException(e, consumerRecords, null);
                    }
                    // Rewind so that the next poll returns the same batch again.
                    partitions.forEach(tp -> this.kafkaConsumer.seek(tp, currentOffsets.get(tp).offset()));
                    this.delayRetry(currentRetryCount, partitions);
                    return true;
                }
            }
            // No further retry for this batch: forget the retry state of its partitions.
            partitions.forEach(this.topicPartitionRetries::remove);
        }
        this.handleException(e, consumerRecords, null);
        return this.info.errorStrategy == ErrorStrategyValue.NONE;
    }

    /**
     * Looks up the retry state of every given partition at its snapshot offset and returns
     * the highest retry count among them.
     *
     * @param partitions     the partitions affected by the failure
     * @param currentOffsets the positions captured before polling; dereferenced for every
     *                       partition in {@code partitions}
     * @return the largest {@code currentRetryCount}, or the configured retry count when
     *         {@code partitions} is empty
     */
    private int getCurrentRetryCount(Set<TopicPartition> partitions, @Nullable Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        return partitions.stream()
            .map(tp -> this.getPartitionRetryState(tp, currentOffsets.get(tp).offset()))
            .mapToInt(x -> x.currentRetryCount)
            .max()
            .orElse(this.info.retryCount);
    }

    /**
     * @param tp an assigned partition
     * @return the consumer's current position on {@code tp}, without metadata
     */
    private OffsetAndMetadata getCurrentOffset(TopicPartition tp) {
        return new OffsetAndMetadata(this.kafkaConsumer.position(tp), null);
    }
}

