/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.ProducerRegistry;
import io.micronaut.configuration.kafka.TransactionalProducerRegistry;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry;
import io.micronaut.configuration.kafka.bind.batch.BatchConsumerRecordsBinderRegistry;
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.event.KafkaConsumerStartedPollingEvent;
import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.processor.ConsumerInfo;
import io.micronaut.configuration.kafka.processor.ConsumerSeekAwareAdapter;
import io.micronaut.configuration.kafka.processor.ConsumerState;
import io.micronaut.configuration.kafka.processor.ConsumerStateBatch;
import io.micronaut.configuration.kafka.processor.ConsumerStateSingle;
import io.micronaut.configuration.kafka.retry.ConditionalRetryBehaviourHandler;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.exceptions.MessagingSystemException;
import io.micronaut.runtime.ApplicationConfiguration;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.scheduling.TaskScheduler;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/**
 * Method processor that creates and manages Kafka consumers for listener methods.
 *
 * <p>For every processed method carrying {@link KafkaListener} and {@link Topic} metadata it
 * builds the consumer configuration, creates one Kafka {@link Consumer} per configured thread,
 * subscribes it and submits its poll loop to the consumer executor. The created consumers are
 * tracked by client ID, which is what backs the {@link ConsumerRegistry} operations.</p>
 */
@Singleton
@Requires(beans={KafkaDefaultConfiguration.class})
@Internal
class KafkaConsumerProcessor
implements ExecutableMethodProcessor<Topic>,
AutoCloseable,
ConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    // Fallback deserializers used when neither configuration nor the method signature selects one.
    private static final ByteArrayDeserializer DEFAULT_KEY_DESERIALIZER = new ByteArrayDeserializer();
    private static final StringDeserializer DEFAULT_VALUE_DESERIALIZER = new StringDeserializer();
    // Executor on which each consumer's poll loop is submitted.
    private final ExecutorService executorService;
    // Supplies the application name used for default group and client IDs.
    private final ApplicationConfiguration applicationConfiguration;
    private final BeanContext beanContext;
    // Configuration used when no bean is registered under the listener's group ID.
    private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
    // Active consumers keyed by their final client ID.
    private final Map<String, ConsumerState> consumers = new ConcurrentHashMap<String, ConsumerState>();
    private final ConsumerRecordBinderRegistry binderRegistry;
    private final SerdeRegistry serdeRegistry;
    // Handler used when the listener bean does not handle its own exceptions.
    private final KafkaListenerExceptionHandler exceptionHandler;
    private final TaskScheduler taskScheduler;
    private final ProducerRegistry producerRegistry;
    private final TransactionalProducerRegistry transactionalProducerRegistry;
    private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
    // Source of numeric suffixes for generated client IDs; incremented before use, so the first suffix is 11.
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);
    private final ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> kafkaConsumerStartedPollingEventPublisher;
    private final ApplicationEventPublisher<KafkaConsumerSubscribedEvent> kafkaConsumerSubscribedEventPublisher;
    // Retry decision handler used when the listener bean does not implement one itself.
    private final ConditionalRetryBehaviourHandler conditionalRetryBehaviourHandler;

    KafkaConsumerProcessor(@Named(value="consumer") ExecutorService executorService, ApplicationConfiguration applicationConfiguration, BeanContext beanContext, AbstractKafkaConsumerConfiguration defaultConsumerConfiguration, ConsumerRecordBinderRegistry binderRegistry, BatchConsumerRecordsBinderRegistry batchBinderRegistry, SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler, @Named(value="scheduled") ExecutorService schedulerService, TransactionalProducerRegistry transactionalProducerRegistry, ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> startedEventPublisher, ApplicationEventPublisher<KafkaConsumerSubscribedEvent> subscribedEventPublisher, ConditionalRetryBehaviourHandler conditionalRetryBehaviourHandler) {
        this.executorService = executorService;
        this.applicationConfiguration = applicationConfiguration;
        this.beanContext = beanContext;
        this.defaultConsumerConfiguration = defaultConsumerConfiguration;
        this.binderRegistry = binderRegistry;
        this.batchBinderRegistry = batchBinderRegistry;
        this.serdeRegistry = serdeRegistry;
        this.producerRegistry = producerRegistry;
        this.exceptionHandler = exceptionHandler;
        this.taskScheduler = new ScheduledExecutorTaskScheduler(schedulerService);
        this.transactionalProducerRegistry = transactionalProducerRegistry;
        this.kafkaConsumerStartedPollingEventPublisher = startedEventPublisher;
        this.kafkaConsumerSubscribedEventPublisher = subscribedEventPublisher;
        this.conditionalRetryBehaviourHandler = conditionalRetryBehaviourHandler;
        this.beanContext.getBeanDefinitions(Qualifiers.byType((Class[])new Class[]{KafkaListener.class})).forEach(definition -> {
            if (definition.isSingleton()) {
                try {
                    beanContext.getBean(definition.getBeanType());
                }
                catch (Exception e) {
                    throw new MessagingSystemException("Error creating bean for @KafkaListener of type [" + definition.getBeanType() + "]: " + e.getMessage(), (Throwable)e);
                }
            }
        });
    }

    /**
     * Looks up the tracked state of the consumer registered under the given ID.
     *
     * @param id the consumer (client) ID
     * @return the consumer state, never {@code null}
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @NonNull
    private ConsumerState getConsumerState(@NonNull String id) {
        return Optional.ofNullable(consumers.get(id))
            .orElseThrow(() -> new IllegalArgumentException("No consumer found for ID: " + id));
    }

    /**
     * Returns the Kafka consumer registered under the given ID.
     *
     * @param id  the consumer (client) ID
     * @param <K> the key type the caller expects
     * @param <V> the value type the caller expects
     * @return the Kafka consumer, never {@code null}
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    @NonNull
    public <K, V> Consumer<K, V> getConsumer(@NonNull String id) {
        ArgumentUtils.requireNonNull("id", id);
        final Consumer<?, ?> consumer = getConsumerState(id).kafkaConsumer;
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        // The registry stores consumers without their key/value types, so the caller-chosen
        // type arguments cannot be verified here; the cast is unchecked by necessity.
        @SuppressWarnings("unchecked")
        final Consumer<K, V> typedConsumer = (Consumer<K, V>) consumer;
        return typedConsumer;
    }

    /**
     * Returns the topic subscription recorded for the consumer with the given ID.
     *
     * @param id the consumer (client) ID
     * @return the non-empty set of subscribed topics
     * @throws IllegalArgumentException if no consumer is registered under {@code id}, or if its
     *                                  recorded subscription is {@code null} or empty
     */
    @Override
    @NonNull
    public Set<String> getConsumerSubscription(@NonNull String id) {
        ArgumentUtils.requireNonNull("id", id);
        final ConsumerState state = getConsumerState(id);
        final Set<String> subscribedTopics = state.subscriptions;
        if (CollectionUtils.isEmpty(subscribedTopics)) {
            throw new IllegalArgumentException("No consumer subscription found for ID: " + id);
        }
        return subscribedTopics;
    }

    /**
     * Returns the partition assignment recorded for the consumer with the given ID.
     *
     * @param id the consumer (client) ID
     * @return the non-empty set of assigned partitions
     * @throws IllegalArgumentException if no consumer is registered under {@code id}, or if its
     *                                  recorded assignment is {@code null} or empty
     */
    @Override
    @NonNull
    public Set<TopicPartition> getConsumerAssignment(@NonNull String id) {
        ArgumentUtils.requireNonNull("id", id);
        final ConsumerState state = getConsumerState(id);
        final Set<TopicPartition> assignedPartitions = state.assignments;
        if (CollectionUtils.isEmpty(assignedPartitions)) {
            throw new IllegalArgumentException("No consumer assignment found for ID: " + id);
        }
        return assignedPartitions;
    }

    /**
     * Returns the IDs of all currently tracked consumers.
     *
     * @return an unmodifiable view over the registered consumer IDs
     */
    @Override
    @NonNull
    public Set<String> getConsumerIds() {
        final Set<String> registeredIds = consumers.keySet();
        return Collections.unmodifiableSet(registeredIds);
    }

    /**
     * Checks the paused status of the consumer against its currently recorded assignment.
     *
     * @param id the consumer (client) ID
     * @return the result of {@link #isPaused(String, Collection)} for the recorded assignment
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    public boolean isPaused(@NonNull String id) {
        final ConsumerState state = getConsumerState(id);
        return isPaused(id, state.assignments);
    }

    /**
     * Checks the paused status of the given partitions for a consumer.
     *
     * @param id              the consumer (client) ID
     * @param topicPartitions the partitions to check
     * @return whatever the consumer state reports for these partitions
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    public boolean isPaused(@NonNull String id, @NonNull Collection<TopicPartition> topicPartitions) {
        final ConsumerState state = getConsumerState(id);
        return state.isPaused(topicPartitions);
    }

    /**
     * Requests a pause of the consumer registered under the given ID.
     *
     * @param id the consumer (client) ID
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    public void pause(@NonNull String id) {
        final ConsumerState state = getConsumerState(id);
        state.pause();
    }

    /**
     * Requests a pause of the given partitions on the consumer registered under the given ID.
     *
     * @param id              the consumer (client) ID
     * @param topicPartitions the partitions to pause
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    public void pause(@NonNull String id, @NonNull Collection<TopicPartition> topicPartitions) {
        final ConsumerState state = getConsumerState(id);
        state.pause(topicPartitions);
    }

    /**
     * Requests that the consumer registered under the given ID be resumed.
     *
     * @param id the consumer (client) ID
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    public void resume(@NonNull String id) {
        final ConsumerState state = getConsumerState(id);
        state.resume();
    }

    /**
     * Requests that the given partitions be resumed on the consumer registered under the given ID.
     *
     * @param id              the consumer (client) ID
     * @param topicPartitions the partitions to resume
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    public void resume(@NonNull String id, @NonNull Collection<TopicPartition> topicPartitions) {
        final ConsumerState state = getConsumerState(id);
        state.resume(topicPartitions);
    }

    /**
     * Sets up Kafka consumers for one listener method.
     *
     * <p>Topic annotations are read from the method and, if none are declared there, from the
     * bean definition. Nothing happens unless the method has a {@link KafkaListener} annotation
     * and at least one topic annotation was found.</p>
     *
     * @param beanDefinition the bean definition declaring the method
     * @param method         the listener method
     */
    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        final AnnotationValue<KafkaListener> listenerAnnotation = method.getAnnotation(KafkaListener.class);
        List<AnnotationValue<Topic>> topics = method.getDeclaredAnnotationValuesByType(Topic.class);
        if (CollectionUtils.isEmpty(topics)) {
            // No method-level topics: fall back to the ones declared on the bean itself.
            topics = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
        }
        if (listenerAnnotation == null) {
            return;
        }
        if (CollectionUtils.isEmpty(topics)) {
            return;
        }

        final Class<?> beanType = beanDefinition.getBeanType();
        String groupId = listenerAnnotation.stringValue("groupId")
            .filter(StringUtils::isNotEmpty)
            .orElseGet(() -> applicationConfiguration.getName().orElse(beanType.getName()));
        final String clientId = listenerAnnotation.stringValue("clientId")
            .filter(StringUtils::isNotEmpty)
            .orElseGet(() -> applicationConfiguration.getName()
                .map(appName -> appName + "-" + NameUtils.hyphenate(beanType.getSimpleName()))
                .orElse(null));
        final OffsetStrategy offsetStrategy = listenerAnnotation.enumValue("offsetStrategy", OffsetStrategy.class)
            .orElse(OffsetStrategy.AUTO);

        // Defaults are resolved with the declared group ID, before any unique suffix is appended.
        final AbstractKafkaConsumerConfiguration configurationDefaults = getConsumerConfigurationDefaults(groupId);
        if (listenerAnnotation.isTrue("uniqueGroupId")) {
            groupId = groupId + "_" + UUID.randomUUID();
        }

        final DefaultKafkaConsumerConfiguration consumerConfiguration = new DefaultKafkaConsumerConfiguration(configurationDefaults);
        final Properties consumerProperties = createConsumerProperties(listenerAnnotation, consumerConfiguration, clientId, groupId, offsetStrategy);
        configureDeserializers(method, consumerConfiguration);
        submitConsumerThreads(method, clientId, groupId, offsetStrategy, topics, listenerAnnotation, consumerConfiguration, consumerProperties, beanType);
    }

    /**
     * Shuts the processor down: wakes up every tracked consumer, then closes every one of them,
     * and finally forgets all tracked consumers.
     */
    @Override
    @PreDestroy
    public void close() {
        final Collection<ConsumerState> trackedStates = consumers.values();
        // Two separate passes: every consumer is woken up before any of them is closed.
        for (ConsumerState state : trackedStates) {
            state.wakeUp();
        }
        for (ConsumerState state : trackedStates) {
            state.close();
        }
        consumers.clear();
    }

    /**
     * Publishes a {@link KafkaConsumerStartedPollingEvent} for the given consumer.
     *
     * @param consumer the consumer the event is about
     */
    void publishStartedPollingEvent(Consumer<?, ?> consumer) {
        final KafkaConsumerStartedPollingEvent startedPolling = new KafkaConsumerStartedPollingEvent(consumer);
        kafkaConsumerStartedPollingEventPublisher.publishEvent(startedPolling);
    }

    /**
     * Dispatches a listener exception to the responsible handler.
     *
     * <p>If the listener bean is itself a {@link KafkaListenerExceptionHandler} it handles the
     * exception; otherwise the processor's default handler does. Any exception thrown by the
     * handler is logged (with the original exception attached as suppressed) and not rethrown.</p>
     *
     * @param consumerBean           the listener bean
     * @param kafkaListenerException the exception to handle
     */
    void handleException(Object consumerBean, KafkaListenerException kafkaListenerException) {
        try {
            final KafkaListenerExceptionHandler handler = consumerBean instanceof KafkaListenerExceptionHandler
                ? (KafkaListenerExceptionHandler) consumerBean
                : this.exceptionHandler;
            handler.handle(kafkaListenerException);
        } catch (Exception e) {
            e.addSuppressed(kafkaListenerException);
            LOG.error("Unexpected error while handling the kafka listener exception", e);
        }
    }

    /**
     * Asks the responsible {@link ConditionalRetryBehaviourHandler} whether the failed message
     * should be retried.
     *
     * <p>The listener bean is consulted if it implements the handler interface, otherwise the
     * processor's default handler is used. If the handler itself throws, the error is logged and
     * the message is not retried.</p>
     *
     * @param consumerBean           the listener bean
     * @param kafkaListenerException the exception raised while processing the message
     * @return {@code true} only if the handler answered {@code RETRY}
     */
    boolean shouldRetryMessage(Object consumerBean, KafkaListenerException kafkaListenerException) {
        final ConditionalRetryBehaviourHandler retryHandler;
        if (consumerBean instanceof ConditionalRetryBehaviourHandler) {
            retryHandler = (ConditionalRetryBehaviourHandler) consumerBean;
        } else {
            retryHandler = this.conditionalRetryBehaviourHandler;
        }
        try {
            final ConditionalRetryBehaviourHandler.ConditionalRetryBehaviour decision =
                retryHandler.conditionalRetryBehaviour(kafkaListenerException);
            return decision == ConditionalRetryBehaviourHandler.ConditionalRetryBehaviour.RETRY;
        } catch (Exception e) {
            e.addSuppressed(kafkaListenerException);
            LOG.error("Unexpected error while determining how to handle the kafka listener exception", e);
            return false;
        }
    }

    /**
     * Hands a command to the task scheduler to run after the given delay.
     *
     * @param delay   how long to wait before running the command
     * @param command the command to run
     */
    void scheduleTask(Duration delay, Runnable command) {
        taskScheduler.schedule(delay, command);
    }

    /**
     * Obtains a producer from the producer registry.
     *
     * @param id        the producer ID
     * @param keyType   the key class
     * @param valueType the value class
     * @param <K>       the key type
     * @param <V>       the value type
     * @return the producer returned by the registry
     */
    <K, V> Producer<K, V> getProducer(String id, Class<K> keyType, Class<V> valueType) {
        final Argument<K> keyArgument = Argument.of(keyType);
        final Argument<V> valueArgument = Argument.of(valueType);
        return producerRegistry.getProducer(id, keyArgument, valueArgument);
    }

    /**
     * Obtains a transactional producer from the transactional producer registry.
     *
     * @param clientId        the client ID, may be {@code null}
     * @param transactionalId the transactional ID, may be {@code null}
     * @param keyClass        the key class
     * @param valueClass      the value class
     * @param <K>             the key type
     * @param <V>             the value type
     * @return the producer returned by the registry
     */
    <K, V> Producer<K, V> getTransactionalProducer(@Nullable String clientId, @Nullable String transactionalId, Class<K> keyClass, Class<V> valueClass) {
        final Argument<K> keyArgument = Argument.of(keyClass);
        final Argument<V> valueArgument = Argument.of(valueClass);
        return transactionalProducerRegistry.getTransactionalProducer(clientId, transactionalId, keyArgument, valueArgument);
    }

    /**
     * Reacts to a fenced producer: logs the failure and asks the transactional producer registry
     * to close the producer.
     *
     * @param producer the fenced producer
     * @param e        the exception that signalled the fencing
     */
    void handleProducerFencedException(Producer<?, ?> producer, ProducerFencedException e) {
        LOG.error("Failed accessing the producer: {}", producer, e);
        transactionalProducerRegistry.close(producer);
    }

    /**
     * Converts a listener result into a {@link Flux}.
     *
     * <p>The result is first converted to a {@link Publisher} using the bean context's conversion
     * service, then wrapped with {@link Flux#from(Publisher)}.</p>
     *
     * @param result the value returned by the listener method
     * @param <T>    the result type
     * @return a flux over the converted publisher
     */
    <T> Flux<T> convertPublisher(T result) {
        final ConversionService conversionService = beanContext.getConversionService();
        // Raw on purpose: the element type of the converted publisher is not known statically.
        final Publisher convertedPublisher = Publishers.convertPublisher(conversionService, result, Publisher.class);
        return Flux.from(convertedPublisher);
    }

    /**
     * @return the binder registry used for single-record listener arguments
     */
    ConsumerRecordBinderRegistry getBinderRegistry() {
        return binderRegistry;
    }

    /**
     * @return the binder registry used for batch listener arguments
     */
    BatchConsumerRecordsBinderRegistry getBatchBinderRegistry() {
        return batchBinderRegistry;
    }

    /**
     * Resolves the configuration a listener's consumer should inherit from.
     *
     * <p>Lookup order: a configuration bean named exactly like the group ID, then one named like
     * the hyphenated group ID, then the processor-wide default configuration.</p>
     *
     * @param groupId the listener's group ID
     * @return the configuration to use as defaults
     */
    private AbstractKafkaConsumerConfiguration getConsumerConfigurationDefaults(String groupId) {
        final Optional<AbstractKafkaConsumerConfiguration> exactMatch = findConfigurationBean(groupId);
        if (exactMatch.isPresent()) {
            return exactMatch.get();
        }
        // The hyphenated lookup only runs when the exact name produced nothing.
        return findHyphenatedConsumerConfigurationBean(groupId).orElse(defaultConsumerConfiguration);
    }

    /**
     * Looks for a consumer configuration bean qualified by the given name.
     *
     * @param groupId the name to look up
     * @return the matching configuration bean, if the bean context has one
     */
    private Optional<AbstractKafkaConsumerConfiguration> findConfigurationBean(String groupId) {
        return beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName(groupId));
    }

    /**
     * Looks for a consumer configuration bean named after the hyphenated form of the group ID.
     *
     * <p>When the group ID already is a valid hyphenated property name no lookup is performed and
     * an empty result is returned.</p>
     *
     * @param groupId the listener's group ID
     * @return the configuration bean registered under the hyphenated name, if any
     */
    private Optional<AbstractKafkaConsumerConfiguration> findHyphenatedConsumerConfigurationBean(String groupId) {
        final boolean alreadyHyphenated = NameUtils.isValidHyphenatedPropertyName(groupId);
        if (!alreadyHyphenated) {
            return findConfigurationBean(NameUtils.hyphenate(groupId));
        }
        return Optional.empty();
    }

    /**
     * Populates the Kafka client properties of a consumer configuration from the listener
     * annotation.
     *
     * <p>Precedence, as implemented below:</p>
     * <ul>
     *   <li>offset reset, auto commit, heartbeat interval, session timeout and isolation level are
     *       only written when the configuration does not already define the key;</li>
     *   <li>{@code group.id} and (when non-null) {@code client.id} are always overwritten;</li>
     *   <li>the annotation's explicit {@code properties} are applied last and override everything
     *       else.</li>
     * </ul>
     *
     * @param consumerAnnotation    the {@link KafkaListener} annotation values
     * @param consumerConfiguration the configuration whose properties are modified in place
     * @param clientId              the client ID, may be {@code null}
     * @param groupId               the consumer group ID
     * @param offsetStrategy        the offset strategy; auto commit is enabled only for {@code AUTO}
     * @return the configuration's properties object (the same instance that was modified)
     */
    private Properties createConsumerProperties(AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration consumerConfiguration, String clientId, String groupId, OffsetStrategy offsetStrategy) {
        Properties properties = consumerConfiguration.getConfig();
        if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
            // Locale.ROOT keeps the value stable: with a Turkish default locale the plain
            // toLowerCase() would turn 'I' into a dotless 'ı' and produce an invalid setting.
            properties.putIfAbsent("auto.offset.reset", OffsetReset.EARLIEST.name().toLowerCase(Locale.ROOT));
        }
        properties.putIfAbsent("enable.auto.commit", String.valueOf(offsetStrategy == OffsetStrategy.AUTO));
        consumerAnnotation.get("heartbeatInterval", Duration.class)
            .map(Duration::toMillis)
            .map(String::valueOf)
            .ifPresent(heartbeatInterval -> properties.putIfAbsent("heartbeat.interval.ms", heartbeatInterval));
        consumerAnnotation.get("sessionTimeout", Duration.class)
            .map(Duration::toMillis)
            .map(String::valueOf)
            .ifPresent(sessionTimeout -> properties.putIfAbsent("session.timeout.ms", sessionTimeout));
        consumerAnnotation.enumValue("isolation", IsolationLevel.class)
            .ifPresent(isolation -> properties.putIfAbsent("isolation.level", isolation.toString().toLowerCase(Locale.ROOT)));
        properties.put("group.id", groupId);
        if (clientId != null) {
            properties.put("client.id", clientId);
        }
        // Explicit annotation properties win over everything set above.
        properties.putAll(consumerAnnotation.getProperties("properties", "name"));
        return properties;
    }

    /**
     * Logs, at debug level, which key and value deserializers a listener will use.
     *
     * <p>For each side the explicitly configured deserializer instance is reported if present;
     * otherwise the corresponding {@code key.deserializer} / {@code value.deserializer} property
     * value is reported. Does nothing when debug logging is disabled.</p>
     *
     * @param method                the listener method
     * @param consumerConfiguration the consumer configuration to describe
     */
    private void debugDeserializationConfiguration(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration) {
        if (LOG.isDebugEnabled()) {
            final Properties config = consumerConfiguration.getConfig();
            final String listenerName = KafkaConsumerProcessor.logMethod(method);
            final String keyDeserializer = consumerConfiguration.getKeyDeserializer()
                .map(Object::toString)
                .orElseGet(() -> config.getProperty("key.deserializer"));
            final String valueDeserializer = consumerConfiguration.getValueDeserializer()
                .map(Object::toString)
                .orElseGet(() -> config.getProperty("value.deserializer"));
            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer, listenerName);
            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer, listenerName);
        }
    }

    /**
     * Creates, subscribes, registers and starts one Kafka consumer per configured listener thread.
     *
     * <p>The number of consumers comes from the annotation's {@code threads} value (1 when it is
     * absent). Each consumer gets a final client ID:</p>
     * <ul>
     *   <li>no client ID given: {@code "kafka-consumer-" + counter};</li>
     *   <li>client ID given and either more than one thread is requested or the ID is already
     *       registered: {@code clientId + "-" + counter};</li>
     *   <li>otherwise the client ID as is.</li>
     * </ul>
     * When a client ID was given, the final ID is also written to the {@code client.id} property
     * before the consumer is created.
     *
     * @param method                the listener method
     * @param clientId              the base client ID, may be {@code null}
     * @param groupId               the consumer group ID
     * @param offsetStrategy        the offset strategy
     * @param topicAnnotations      the topic annotations to subscribe with
     * @param consumerAnnotation    the {@link KafkaListener} annotation values
     * @param consumerConfiguration the configuration used to create each consumer
     * @param properties            the properties receiving the per-thread {@code client.id}
     * @param beanType              the listener bean type
     */
    private void submitConsumerThreads(ExecutableMethod<?, ?> method, String clientId, String groupId, OffsetStrategy offsetStrategy, List<AnnotationValue<Topic>> topicAnnotations, AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Properties properties, Class<?> beanType) {
        final int threadCount = consumerAnnotation.intValue("threads").orElse(1);
        for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
            final String consumerId;
            if (clientId == null) {
                consumerId = "kafka-consumer-" + clientIdGenerator.incrementAndGet();
            } else {
                final boolean needsSuffix = threadCount > 1 || consumers.containsKey(clientId);
                consumerId = needsSuffix ? clientId + "-" + clientIdGenerator.incrementAndGet() : clientId;
                properties.put("client.id", consumerId);
            }

            final Consumer<?, ?> kafkaConsumer = beanContext.createBean(Consumer.class, consumerConfiguration);
            final Object consumerBean = beanContext.getBean(beanType);
            if (consumerBean instanceof ConsumerAware) {
                ((ConsumerAware) consumerBean).setKafkaConsumer(kafkaConsumer);
            }

            for (AnnotationValue<Topic> topicAnnotation : topicAnnotations) {
                KafkaConsumerProcessor.setupConsumerSubscription(method, topicAnnotation, consumerBean, kafkaConsumer);
            }
            kafkaConsumerSubscribedEventPublisher.publishEvent(new KafkaConsumerSubscribedEvent(kafkaConsumer));

            final ConsumerInfo consumerInfo = new ConsumerInfo(consumerId, groupId, offsetStrategy, consumerAnnotation, method);
            final ConsumerState consumerState;
            if (consumerInfo.isBatch) {
                consumerState = new ConsumerStateBatch(this, consumerInfo, kafkaConsumer, consumerBean);
            } else {
                consumerState = new ConsumerStateSingle(this, consumerInfo, kafkaConsumer, consumerBean);
            }
            // Register before starting the poll loop so the consumer is visible by ID first.
            consumers.put(consumerId, consumerState);
            executorService.submit(consumerState::threadPollLoop);
        }
    }

    /**
     * Subscribes a Kafka consumer according to a single {@link Topic} annotation.
     *
     * <p>The annotation may name explicit topics (its default value) and/or regular-expression
     * {@code patterns}. If the listener bean provides a rebalance listener (see
     * {@code getConsumerRebalanceListener}), it is passed along with every subscription.</p>
     *
     * @param method          the listener method (used for log and error messages)
     * @param topicAnnotation the topic annotation describing what to subscribe to
     * @param consumerBean    the listener bean
     * @param kafkaConsumer   the consumer to subscribe
     * @throws MessagingSystemException if the annotation names neither topics nor patterns, or if
     *                                  one of the patterns is not a valid regular expression
     */
    private static void setupConsumerSubscription(ExecutableMethod<?, ?> method, AnnotationValue<Topic> topicAnnotation, Object consumerBean, Consumer<?, ?> kafkaConsumer) {
        final String[] topicNames = topicAnnotation.stringValues();
        final String[] patterns = topicAnnotation.stringValues("patterns");
        final boolean hasTopics = ArrayUtils.isNotEmpty(topicNames);
        final boolean hasPatterns = ArrayUtils.isNotEmpty(patterns);
        // Only build the method description when it will actually be logged.
        final String logMethod = LOG.isInfoEnabled() ? KafkaConsumerProcessor.logMethod(method) : null;
        if (!hasTopics && !hasPatterns) {
            throw new MessagingSystemException("Either a topic or a topic pattern must be specified for method: " + method);
        }
        final Optional<ConsumerRebalanceListener> listener = KafkaConsumerProcessor.getConsumerRebalanceListener(consumerBean, kafkaConsumer);
        if (hasTopics) {
            final List<String> topics = Arrays.asList(topicNames);
            listener.ifPresentOrElse(l -> kafkaConsumer.subscribe(topics, l), () -> kafkaConsumer.subscribe(topics));
            LOG.info("Kafka listener [{}] subscribed to topics: {}", logMethod, topics);
        }
        if (hasPatterns) {
            // NOTE(review): each pattern triggers its own subscribe call; whether later calls
            // add to or replace earlier subscriptions is up to the Kafka client — confirm
            // against the Consumer#subscribe contract before relying on multiple patterns.
            try {
                for (String pattern : patterns) {
                    final Pattern compiledPattern = Pattern.compile(pattern);
                    listener.ifPresentOrElse(l -> kafkaConsumer.subscribe(compiledPattern, l), () -> kafkaConsumer.subscribe(compiledPattern));
                    LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", logMethod, pattern);
                }
            } catch (PatternSyntaxException e) {
                throw new MessagingSystemException("Invalid topic pattern [" + e.getPattern() + "] for method [" + method + "]: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Determines the rebalance listener a listener bean contributes, if any.
     *
     * <p>A {@link ConsumerSeekAware} bean is wrapped in a {@link ConsumerSeekAwareAdapter} (this
     * check takes priority); a bean that is itself a {@link ConsumerRebalanceListener} is used
     * directly; any other bean contributes nothing.</p>
     *
     * @param consumerBean  the listener bean
     * @param kafkaConsumer the consumer the seeker of the adapter operates on
     * @return the rebalance listener to subscribe with, or empty if the bean provides none
     */
    private static Optional<ConsumerRebalanceListener> getConsumerRebalanceListener(Object consumerBean, Consumer<?, ?> kafkaConsumer) {
        final ConsumerRebalanceListener rebalanceListener;
        if (consumerBean instanceof ConsumerSeekAware) {
            rebalanceListener = new ConsumerSeekAwareAdapter(KafkaSeeker.newInstance(kafkaConsumer), (ConsumerSeekAware) consumerBean);
        } else if (consumerBean instanceof ConsumerRebalanceListener) {
            rebalanceListener = (ConsumerRebalanceListener) consumerBean;
        } else {
            rebalanceListener = null;
        }
        return Optional.ofNullable(rebalanceListener);
    }

    /**
     * Finds the method argument that represents the message body.
     *
     * <p>First choice is the first argument that is a consumer record type or is annotated with
     * {@link MessageBody}. Failing that, the first argument that has no {@link Bindable}
     * stereotype and is not the trailing argument of a suspend method is used.</p>
     *
     * @param method the listener method
     * @return the body argument, or {@code null} if no argument qualifies
     */
    private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
        for (Argument<?> candidate : method.getArguments()) {
            if (KafkaConsumerProcessor.isConsumerRecord(candidate) || candidate.getAnnotationMetadata().hasAnnotation(MessageBody.class)) {
                return candidate;
            }
        }
        for (Argument<?> candidate : method.getArguments()) {
            final boolean bindable = candidate.getAnnotationMetadata().hasStereotype(Bindable.class);
            if (!bindable && !KafkaConsumerProcessor.isLastArgumentOfSuspendedMethod(candidate, method)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds the body argument of a listener method, unwrapping the container type for batch
     * listeners.
     *
     * <p>For a batch listener whose body argument is not a consumer record type, the argument's
     * component type (see {@code getComponentType}) is returned instead of the argument itself.</p>
     *
     * @param batch  whether the listener is a batch listener
     * @param method the listener method
     * @return the (possibly unwrapped) body argument, or {@code null} if there is none
     */
    private static Argument<?> findBodyArgument(boolean batch, ExecutableMethod<?, ?> method) {
        final Argument<?> body = KafkaConsumerProcessor.findBodyArgument(method);
        if (!batch || body == null) {
            return body;
        }
        if (KafkaConsumerProcessor.isConsumerRecord(body)) {
            return body;
        }
        return KafkaConsumerProcessor.getComponentType(body);
    }

    /**
     * Tells whether an argument equals the last declared argument of a suspend method.
     *
     * @param argument the argument to test
     * @param method   the method declaring the argument
     * @return {@code true} if the method is a suspend method and {@code argument} equals its last
     *         argument; {@code false} for any non-suspend method
     */
    private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, ExecutableMethod<?, ?> method) {
        if (method.isSuspend()) {
            final Argument<?>[] declaredArguments = method.getArguments();
            final Argument<?> trailingArgument = declaredArguments[declaredArguments.length - 1];
            return argument.equals(trailingArgument);
        }
        return false;
    }

    /**
     * Chooses key and value deserializers for a listener method and logs the outcome.
     *
     * @param method the listener method
     * @param config the consumer configuration receiving the deserializers
     */
    private void configureDeserializers(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration<?, ?> config) {
        final boolean isBatchListener = method.isTrue(KafkaListener.class, "batch");
        final Argument<?> body = KafkaConsumerProcessor.findBodyArgument(isBatchListener, method);
        configureKeyDeserializer(body, method, config);
        configureValueDeserializer(body, config);
        debugDeserializationConfiguration(method, config);
    }

    /**
     * Selects the key deserializer, unless one is already configured.
     *
     * <p>Nothing is changed when the configuration already has a {@code key.deserializer}
     * property or an explicit key deserializer. Otherwise the key type is taken from the first
     * method argument annotated with {@link KafkaKey}, or else from the {@code K} type variable
     * of the body argument when that is a consumer record type. The serde registry picks the
     * deserializer for that type; if no key type can be determined,
     * {@code DEFAULT_KEY_DESERIALIZER} is used.</p>
     *
     * @param bodyArgument the body argument of the method, may be {@code null}
     * @param method       the listener method
     * @param config       the consumer configuration to update
     */
    private void configureKeyDeserializer(Argument<?> bodyArgument, ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration config) {
        if (!config.getConfig().containsKey("key.deserializer") && config.getKeyDeserializer().isEmpty()) {
            Arrays.stream(method.getArguments()).filter(arg -> arg.isAnnotationPresent(KafkaKey.class)).findFirst().or(() -> Optional.ofNullable(bodyArgument).filter(KafkaConsumerProcessor::isConsumerRecord).flatMap(b -> b.getTypeVariable("K"))).map(this.serdeRegistry::pickDeserializer).ifPresentOrElse(config::setKeyDeserializer, () -> config.setKeyDeserializer(DEFAULT_KEY_DESERIALIZER));
        }
    }

    /**
     * Selects the value deserializer, unless one is already configured.
     *
     * <p>Nothing is changed when the configuration already has a {@code value.deserializer}
     * property or an explicit value deserializer. Otherwise the value type is the {@code V} type
     * variable of the body argument when that is a consumer record type and declares one, and the
     * body argument itself in every other case where a body argument exists. The serde registry
     * picks the deserializer for that type; without a body argument
     * {@code DEFAULT_VALUE_DESERIALIZER} is used.</p>
     *
     * @param bodyArgument the body argument of the method, may be {@code null}
     * @param config       the consumer configuration to update
     */
    private void configureValueDeserializer(Argument<?> bodyArgument, DefaultKafkaConsumerConfiguration config) {
        if (!config.getConfig().containsKey("value.deserializer") && config.getValueDeserializer().isEmpty()) {
            Optional<Argument<?>> body = Optional.ofNullable(bodyArgument);
            // `.or(() -> body)` falls back to the body argument itself when no V type variable was found.
            body.filter(KafkaConsumerProcessor::isConsumerRecord).flatMap(b -> b.getTypeVariable("V")).or(() -> body).map(this.serdeRegistry::pickDeserializer).ifPresentOrElse(config::setValueDeserializer, () -> config.setValueDeserializer(DEFAULT_VALUE_DESERIALIZER));
        }
    }

    /**
     * Tells whether an argument's type is a Kafka record type.
     *
     * @param body the argument to inspect
     * @return {@code true} if the argument type is assignable to {@link ConsumerRecord} or to
     *         {@link ConsumerRecords}
     */
    private static boolean isConsumerRecord(@NonNull Argument<?> body) {
        final Class<?> bodyType = body.getType();
        if (ConsumerRecord.class.isAssignableFrom(bodyType)) {
            return true;
        }
        return ConsumerRecords.class.isAssignableFrom(bodyType);
    }

    /**
     * Resolves the element type of a container argument.
     *
     * @param argument the container argument
     * @return for an array type, an argument of the array's component type; otherwise the
     *         argument's first type variable, or the argument itself when it declares none
     */
    private static Argument<?> getComponentType(Argument<?> argument) {
        final Class<?> containerType = argument.getType();
        if (containerType.isArray()) {
            return Argument.of(containerType.getComponentType());
        }
        return argument.getFirstTypeVariable().orElse(argument);
    }

    /**
     * Builds the short {@code DeclaringType#methodName} description used in log messages.
     *
     * @param method the listener method
     * @return the declaring type's simple name and the method name, joined by {@code #}
     */
    private static String logMethod(ExecutableMethod<?, ?> method) {
        final String declaringTypeName = method.getDeclaringType().getSimpleName();
        final String methodName = method.getName();
        return declaringTypeName + "#" + methodName;
    }

    /**
     * Compiler-synthesised lambda body surfaced by the decompiler: subscribes the consumer to the
     * given topics without a rebalance listener.
     *
     * @param kafkaConsumer the consumer to subscribe
     * @param topics        the topic names
     */
    private static /* synthetic */ void lambda$setupConsumerSubscription$12(Consumer kafkaConsumer, List topics) {
        kafkaConsumer.subscribe((Collection)topics);
    }

    /**
     * Compiler-synthesised lambda body surfaced by the decompiler: subscribes the consumer to the
     * given topics with a rebalance listener.
     *
     * @param kafkaConsumer the consumer to subscribe
     * @param topics        the topic names
     * @param l             the rebalance listener passed along with the subscription
     */
    private static /* synthetic */ void lambda$setupConsumerSubscription$11(Consumer kafkaConsumer, List topics, ConsumerRebalanceListener l) {
        kafkaConsumer.subscribe((Collection)topics, l);
    }
}

