/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.processor.ConsumerInfo;
import io.micronaut.configuration.kafka.processor.ConsumerState;
import io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor;
import io.micronaut.configuration.kafka.seek.KafkaSeekOperations;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.DefaultExecutableBinder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import reactor.core.publisher.Flux;

/**
 * {@link ConsumerState} implementation that hands the polled records to the
 * consumer method one record at a time.
 *
 * <p>For every record it optionally tracks the next offset to commit, invokes
 * the consumer method, applies the configured error strategy when the
 * invocation throws, commits according to the per-record offset strategies and
 * finally performs any seek operations collected during the invocation.</p>
 */
@Internal
final class ConsumerStateSingle
extends ConsumerState {

    /**
     * Creates the state; all arguments are passed unchanged to the superclass.
     *
     * @param kafkaConsumerProcessor the owning processor
     * @param info                   the consumer configuration
     * @param consumer               the Kafka consumer to poll
     * @param consumerBean           the bean whose method is invoked for each record
     */
    ConsumerStateSingle(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo info, Consumer<?, ?> consumer, Object consumerBean) {
        super(kafkaConsumerProcessor, info, consumer, consumerBean);
    }

    /**
     * Supplies the map used to track offsets.
     *
     * @return a new, empty, mutable map when {@code info.trackPartitions} is set,
     *         otherwise {@code null}
     */
    @Override
    @Nullable
    protected Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return this.info.trackPartitions ? new HashMap<>() : null;
    }

    /**
     * Polls the Kafka consumer using {@code info.pollTimeout}.
     *
     * <p>If polling fails with a {@link RecordDeserializationException}, the consumer
     * is first moved just past the offending offset, and the error strategy is then
     * applied to a placeholder record (same topic, partition and offset, {@code null}
     * key and value). The error strategy may seek back to the failing offset when it
     * decides to retry.</p>
     *
     * @param currentOffsets not used by this implementation
     * @return the polled records, or {@code null} when a deserialization error was
     *         handled instead
     */
    @Override
    protected ConsumerRecords<?, ?> pollRecords(@Nullable Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        try {
            return this.kafkaConsumer.poll(this.info.pollTimeout);
        } catch (RecordDeserializationException ex) {
            LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", this.info.logMethod, ex);
            // Default recovery: skip the record that could not be deserialized.
            this.kafkaConsumer.seek(ex.topicPartition(), ex.offset() + 1L);
            // The return value is intentionally ignored; nothing was polled, so there is nothing to stop.
            this.resolveWithErrorStrategy(null, ConsumerStateSingle.makeConsumerRecord(ex), ex);
            return null;
        }
    }

    /**
     * Processes every record of the given poll result in iteration order.
     *
     * <p>When the error strategy asks to stop, the partitions of the not yet visited
     * records are rewound, {@code failed} is set to {@code true} and the method returns
     * early. When all records were visited, {@code failed} is set to {@code false}.</p>
     *
     * @param consumerRecords the records returned by the last poll
     * @param currentOffsets  the offset-tracking map; written to only when
     *                        {@code info.trackPartitions} is set
     */
    @Override
    protected void processRecords(ConsumerRecords<?, ?> consumerRecords, Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        final Iterator<? extends ConsumerRecord<?, ?>> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
            final ConsumerRecord<?, ?> consumerRecord = iterator.next();
            LOG.trace("Kafka consumer [{}] received record: {}", this.info.logMethod, consumerRecord);

            if (this.info.trackPartitions) {
                // The offset to commit is the one following the current record.
                final TopicPartition topicPartition = ConsumerStateSingle.getTopicPartition(consumerRecord);
                currentOffsets.put(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1L, null));
            }

            // A fresh seek-operations holder per record, only when the method declares a seek argument.
            final KafkaSeekOperations seek = this.info.seekArg != null ? KafkaSeekOperations.newInstance() : null;
            if (this.info.seekArg != null) {
                this.boundArguments.put(this.info.seekArg, seek);
            }
            if (this.info.ackArg != null) {
                // NOTE(review): the lambda's target type is the value type of boundArguments, which is
                // declared outside this file - confirm it is a functional interface (an explicit cast
                // may be required otherwise).
                this.boundArguments.put(this.info.ackArg, () -> this.kafkaConsumer.commitSync(currentOffsets));
            }

            try {
                this.process(consumerRecord, consumerRecords);
            } catch (Exception e) {
                if (this.resolveWithErrorStrategy(consumerRecords, consumerRecord, e)) {
                    this.resetTheFollowingPartitions(consumerRecord, iterator);
                    this.failed = true;
                    return;
                }
                // Otherwise the failing record is skipped and treated like a processed one below.
            }

            if (this.info.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
                this.commitSync(consumerRecords, consumerRecord, currentOffsets);
            } else if (this.info.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
                this.kafkaConsumer.commitAsync(currentOffsets, this::resolveCommitCallback);
            }

            if (seek != null) {
                // Apply the seek operations collected while the consumer method ran.
                final KafkaSeeker seeker = KafkaSeeker.newInstance(this.kafkaConsumer);
                seek.forEach(seeker::perform);
            }
        }
        this.failed = false;
    }

    /**
     * Binds the record to the consumer method's arguments, invokes the method on the
     * consumer bean and, when it returns a non-null result, forwards that result as a
     * {@link Flux} to {@code handleResultFlux}.
     *
     * @param consumerRecord  the record to process
     * @param consumerRecords the poll result the record belongs to
     */
    private void process(ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
        // Raw types are kept here on purpose: the generic signatures of the binder API are not visible in this file.
        DefaultExecutableBinder executableBinder = new DefaultExecutableBinder(this.boundArguments);
        Object result = executableBinder.bind(this.info.method, (ArgumentBinderRegistry)this.kafkaConsumerProcessor.getBinderRegistry(), consumerRecord).invoke(this.consumerBean);
        if (result != null) {
            boolean isPublisher = Publishers.isConvertibleToPublisher(result);
            // Non-publisher results are wrapped so both cases go through the same handler.
            Flux<Object> publisher = isPublisher ? this.kafkaConsumerProcessor.convertPublisher(result) : Flux.just(result);
            this.handleResultFlux(consumerRecords, consumerRecord, publisher, isPublisher || this.info.isBlocking);
        }
    }

    /**
     * Applies the configured error strategy to a failed record.
     *
     * <p>With a retrying strategy, a retryable exception and remaining retries, the
     * consumer is moved back to the failing offset, the retry is delayed and
     * {@code true} is returned. In every other case the exception is passed to
     * {@code handleException} and the result depends solely on whether the strategy is
     * {@link ErrorStrategyValue#NONE}.</p>
     *
     * @param consumerRecords the poll result, or {@code null} when the failure happened while polling
     * @param consumerRecord  the failing record
     * @param e               the failure
     * @return {@code true} when the caller must stop processing the current poll result
     */
    private boolean resolveWithErrorStrategy(@Nullable ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord, Throwable e) {
        if (this.info.errorStrategy.isRetry()) {
            final TopicPartition topicPartition = ConsumerStateSingle.getTopicPartition(consumerRecord);
            if (this.shouldRetryException(e, consumerRecords, consumerRecord) && this.info.retryCount > 0) {
                // Looked up only after the two cheaper checks passed, exactly as before.
                final int currentRetryCount = this.getCurrentRetryCount(consumerRecord);
                if (this.info.retryCount >= currentRetryCount) {
                    if (this.info.shouldHandleAllExceptions) {
                        this.handleException(e, consumerRecords, consumerRecord);
                    }
                    // Rewind so the failing record is delivered again by the next poll.
                    this.kafkaConsumer.seek(topicPartition, consumerRecord.offset());
                    this.delayRetry(currentRetryCount, Collections.singleton(topicPartition));
                    return true;
                }
            }
            // No (further) retry for this record: drop the retry bookkeeping of its partition.
            this.topicPartitionRetries.remove(topicPartition);
        }
        this.handleException(e, consumerRecords, consumerRecord);
        return this.info.errorStrategy == ErrorStrategyValue.NONE;
    }

    /**
     * Synchronously commits the given offsets; a {@link CommitFailedException} is routed
     * to {@code handleException} instead of being propagated.
     *
     * @param consumerRecords the poll result, passed to the exception handler
     * @param consumerRecord  the record just processed, passed to the exception handler
     * @param currentOffsets  the offsets to commit
     */
    private void commitSync(ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord, Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        try {
            this.kafkaConsumer.commitSync(currentOffsets);
        } catch (CommitFailedException e) {
            this.handleException(e, consumerRecords, consumerRecord);
        }
    }

    /**
     * Callback for asynchronous commits. Delegates to the consumer bean when it
     * implements {@link OffsetCommitCallback}; otherwise logs a failed commit.
     *
     * @param offsets   the offsets that were committed
     * @param exception the commit failure, or {@code null}
     */
    private void resolveCommitCallback(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        final Object bean = this.consumerBean;
        if (bean instanceof OffsetCommitCallback) {
            ((OffsetCommitCallback) bean).onComplete(offsets, exception);
        } else if (exception != null) {
            LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", offsets, exception.getMessage(), exception);
        }
    }

    /**
     * Rewinds every topic-partition that still has unvisited records in the iterator to
     * the offset of its first unvisited record, so those records are fetched again.
     *
     * <p>Partitions are identified by topic <em>and</em> partition number. Tracking the
     * bare partition number would treat e.g. {@code topicB-0} as already handled after a
     * failure on {@code topicA-0} and leave its remaining records unprocessed.</p>
     *
     * @param errorConsumerRecord the failing record; its topic-partition is not touched here
     * @param iterator            the iterator positioned just after the failing record
     */
    private void resetTheFollowingPartitions(ConsumerRecord<?, ?> errorConsumerRecord, Iterator<? extends ConsumerRecord<?, ?>> iterator) {
        final HashSet<TopicPartition> handledPartitions = new HashSet<>();
        handledPartitions.add(ConsumerStateSingle.getTopicPartition(errorConsumerRecord));
        while (iterator.hasNext()) {
            final ConsumerRecord<?, ?> consumerRecord = iterator.next();
            final TopicPartition topicPartition = ConsumerStateSingle.getTopicPartition(consumerRecord);
            // add() returns true only for the first record seen of each topic-partition.
            if (handledPartitions.add(topicPartition)) {
                this.kafkaConsumer.seek(topicPartition, consumerRecord.offset());
            }
        }
    }

    /**
     * @param consumerRecord the record being retried
     * @return the {@code currentRetryCount} of the retry state obtained for the record's
     *         topic-partition and offset
     */
    private int getCurrentRetryCount(ConsumerRecord<?, ?> consumerRecord) {
        return this.getPartitionRetryState(ConsumerStateSingle.getTopicPartition(consumerRecord), consumerRecord.offset()).currentRetryCount;
    }

    /**
     * @param consumerRecord any record
     * @return the topic-partition the record belongs to
     */
    private static TopicPartition getTopicPartition(ConsumerRecord<?, ?> consumerRecord) {
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }

    /**
     * Builds a placeholder record for a record that could not be deserialized.
     *
     * @param ex the deserialization failure
     * @return a record with the failure's topic, partition and offset and a {@code null} key and value
     */
    private static ConsumerRecord<?, ?> makeConsumerRecord(RecordDeserializationException ex) {
        final TopicPartition tp = ex.topicPartition();
        return new ConsumerRecord<>(tp.topic(), tp.partition(), ex.offset(), null, null);
    }
}

