/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.ProducerRegistry;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry;
import io.micronaut.configuration.kafka.bind.batch.BatchConsumerRecordsBinderRegistry;
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Blocking;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.SendTo;
import io.micronaut.messaging.exceptions.MessagingSystemException;
import io.micronaut.runtime.ApplicationConfiguration;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.invoke.LambdaMetafactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Singleton
@Requires(beans={KafkaDefaultConfiguration.class})
public class KafkaConsumerProcessor
implements ExecutableMethodProcessor<Topic>,
AutoCloseable,
ConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    private final ExecutorService executorService;
    private final ApplicationConfiguration applicationConfiguration;
    private final BeanContext beanContext;
    private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
    private final Map<String, Consumer> consumers = new ConcurrentHashMap<String, Consumer>();
    private final Map<String, Set<String>> consumerSubscriptions = new ConcurrentHashMap<String, Set<String>>();
    private final Map<String, Set<TopicPartition>> consumerAssignments = new ConcurrentHashMap<String, Set<TopicPartition>>();
    private final Map<String, Consumer> pausedConsumers = new ConcurrentHashMap<String, Consumer>();
    private final Set<String> paused = new ConcurrentSkipListSet<String>();
    private final ConsumerRecordBinderRegistry binderRegistry;
    private final SerdeRegistry serdeRegistry;
    private final Scheduler executorScheduler;
    private final KafkaListenerExceptionHandler exceptionHandler;
    private final ProducerRegistry producerRegistry;
    private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    public KafkaConsumerProcessor(@Named(value="consumer") ExecutorService executorService, ApplicationConfiguration applicationConfiguration, BeanContext beanContext, AbstractKafkaConsumerConfiguration defaultConsumerConfiguration, ConsumerRecordBinderRegistry binderRegistry, BatchConsumerRecordsBinderRegistry batchBinderRegistry, SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler) {
        this.executorService = executorService;
        this.applicationConfiguration = applicationConfiguration;
        this.beanContext = beanContext;
        this.defaultConsumerConfiguration = defaultConsumerConfiguration;
        this.binderRegistry = binderRegistry;
        this.batchBinderRegistry = batchBinderRegistry;
        this.serdeRegistry = serdeRegistry;
        this.executorScheduler = Schedulers.fromExecutor((Executor)executorService);
        this.producerRegistry = producerRegistry;
        this.exceptionHandler = exceptionHandler;
        this.beanContext.getBeanDefinitions(Qualifiers.byType((Class[])new Class[]{KafkaListener.class})).forEach(definition -> {
            if (definition.isSingleton()) {
                try {
                    beanContext.getBean(definition.getBeanType());
                }
                catch (Exception e) {
                    throw new MessagingSystemException("Error creating bean for @KafkaListener of type [" + definition.getBeanType() + "]: " + e.getMessage(), (Throwable)e);
                }
            }
        });
    }

    @Override
    @NonNull
    public <K, V> Consumer<K, V> getConsumer(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Consumer consumer = this.consumers.get(id);
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return consumer;
    }

    @Override
    @NonNull
    public Set<String> getConsumerSubscription(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Set<String> subscriptions = this.consumerSubscriptions.get(id);
        if (subscriptions == null || subscriptions.isEmpty()) {
            throw new IllegalArgumentException("No consumer subscription found for ID: " + id);
        }
        return subscriptions;
    }

    @Override
    @NonNull
    public Set<TopicPartition> getConsumerAssignment(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Set<TopicPartition> assignment = this.consumerAssignments.get(id);
        if (assignment == null || assignment.isEmpty()) {
            throw new IllegalArgumentException("No consumer assignment found for ID: " + id);
        }
        return assignment;
    }

    @Override
    @NonNull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    @Override
    public boolean isPaused(@NonNull String id) {
        if (StringUtils.isNotEmpty((CharSequence)id) && this.consumers.containsKey(id)) {
            return this.paused.contains(id) && this.pausedConsumers.containsKey(id);
        }
        return false;
    }

    @Override
    public void pause(@NonNull String id) {
        if (!StringUtils.isNotEmpty((CharSequence)id) || !this.consumers.containsKey(id)) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        this.paused.add(id);
    }

    @Override
    public void resume(@NonNull String id) {
        if (!StringUtils.isNotEmpty((CharSequence)id) || !this.consumers.containsKey(id)) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        this.paused.remove(id);
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        List topicAnnotations = method.getDeclaredAnnotationValuesByType(Topic.class);
        AnnotationValue consumerAnnotation = method.getAnnotation(KafkaListener.class);
        if (CollectionUtils.isEmpty((Collection)topicAnnotations)) {
            topicAnnotations = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
        }
        if (consumerAnnotation == null || CollectionUtils.isEmpty((Collection)topicAnnotations)) {
            return;
        }
        Class beanType = beanDefinition.getBeanType();
        String groupId = consumerAnnotation.stringValue("groupId").filter(StringUtils::isNotEmpty).orElseGet(() -> this.applicationConfiguration.getName().orElse(beanType.getName()));
        String clientId = consumerAnnotation.stringValue("clientId").filter(StringUtils::isNotEmpty).orElseGet(() -> this.applicationConfiguration.getName().map(s -> s + '-' + NameUtils.hyphenate((String)beanType.getSimpleName())).orElse(null));
        OffsetStrategy offsetStrategy = consumerAnnotation.enumValue("offsetStrategy", OffsetStrategy.class).orElse(OffsetStrategy.AUTO);
        AbstractKafkaConsumerConfiguration consumerConfigurationDefaults = this.beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName((String)groupId)).orElse(this.defaultConsumerConfiguration);
        DefaultKafkaConsumerConfiguration consumerConfiguration = new DefaultKafkaConsumerConfiguration(consumerConfigurationDefaults);
        Properties properties = this.createConsumerProperties(method, (AnnotationValue<KafkaListener>)consumerAnnotation, consumerConfiguration, clientId, groupId, offsetStrategy);
        this.configureDeserializers(method, consumerConfiguration);
        this.submitConsumerThreads(method, clientId, offsetStrategy, topicAnnotations, (AnnotationValue<KafkaListener>)consumerAnnotation, consumerConfiguration, properties, beanType);
    }

    @Override
    @PreDestroy
    public void close() {
        for (Consumer consumer : this.consumers.values()) {
            consumer.wakeup();
        }
        this.consumers.clear();
    }

    private Properties createConsumerProperties(ExecutableMethod<?, ?> method, AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration consumerConfiguration, String clientId, String groupId, OffsetStrategy offsetStrategy) {
        Properties properties = consumerConfiguration.getConfig();
        if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
            properties.putIfAbsent("auto.offset.reset", OffsetReset.EARLIEST.name().toLowerCase());
        }
        properties.putIfAbsent("enable.auto.commit", String.valueOf(offsetStrategy == OffsetStrategy.AUTO));
        method.getValue(KafkaListener.class, "heartbeatInterval", Duration.class).map(Duration::toMillis).map(String::valueOf).ifPresent(heartbeatInterval -> properties.putIfAbsent("heartbeat.interval.ms", heartbeatInterval));
        method.getValue(KafkaListener.class, "sessionTimeout", Duration.class).map(Duration::toMillis).map(String::valueOf).ifPresent(sessionTimeout -> properties.putIfAbsent("session.timeout.ms", sessionTimeout));
        if (consumerAnnotation.isTrue("uniqueGroupId")) {
            String uniqueGroupId = groupId + "_" + UUID.randomUUID().toString();
            properties.put("group.id", uniqueGroupId);
        } else {
            properties.put("group.id", groupId);
        }
        if (clientId != null) {
            properties.put("client.id", clientId);
        }
        properties.putAll((Map<?, ?>)consumerAnnotation.getProperties("properties", "name"));
        return properties;
    }

    private void debugDeserializationConfiguration(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Properties properties) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        Optional keyDeserializer = consumerConfiguration.getKeyDeserializer();
        if (consumerConfiguration.getKeyDeserializer().isPresent()) {
            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), method);
        } else {
            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", (Object)properties.getProperty("key.deserializer"), method);
        }
        Optional valueDeserializer = consumerConfiguration.getValueDeserializer();
        if (valueDeserializer.isPresent()) {
            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), method);
        } else {
            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", (Object)properties.getProperty("value.deserializer"), method);
        }
    }

    private void submitConsumerThreads(ExecutableMethod<?, ?> method, String clientId, OffsetStrategy offsetStrategy, List<AnnotationValue<Topic>> topicAnnotations, AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Properties properties, Class<?> beanType) {
        int consumerThreads = consumerAnnotation.intValue("threads").orElse(1);
        for (int i = 0; i < consumerThreads; ++i) {
            String finalClientId;
            if (clientId != null) {
                finalClientId = consumerThreads > 1 ? clientId + '-' + this.clientIdGenerator.incrementAndGet() : clientId;
                properties.put("client.id", finalClientId);
            } else {
                finalClientId = "kafka-consumer-" + this.clientIdGenerator.incrementAndGet();
            }
            this.submitConsumerThread(method, finalClientId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, beanType);
        }
    }

    private void submitConsumerThread(ExecutableMethod<?, ?> method, String finalClientId, OffsetStrategy offsetStrategy, List<AnnotationValue<Topic>> topicAnnotations, AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Class<?> beanType) {
        Consumer kafkaConsumer = (Consumer)this.beanContext.createBean(Consumer.class, new Object[]{consumerConfiguration});
        this.consumers.put(finalClientId, kafkaConsumer);
        Object consumerBean = this.beanContext.getBean(beanType);
        if (consumerBean instanceof ConsumerAware) {
            ((ConsumerAware)consumerBean).setKafkaConsumer(kafkaConsumer);
        }
        KafkaConsumerProcessor.setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer);
        this.consumerSubscriptions.put(finalClientId, Collections.unmodifiableSet(kafkaConsumer.subscription()));
        this.executorService.submit(() -> this.createConsumerThreadPollLoop(method, finalClientId, offsetStrategy, consumerAnnotation, consumerBean, kafkaConsumer));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private void createConsumerThreadPollLoop(ExecutableMethod<?, ?> method, String finalClientId, OffsetStrategy offsetStrategy, AnnotationValue<KafkaListener> consumerAnnotation, Object consumerBean, Consumer<?, ?> kafkaConsumer) {
        block27: {
            isBatch = method.isTrue(KafkaListener.class, "batch");
            pollTimeout = method.getValue(KafkaListener.class, "pollTimeout", Duration.class).orElseGet((Supplier<Duration>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, lambda$createConsumerThreadPollLoop$7(), ()Ljava/time/Duration;)());
            consumerArg = Arrays.stream(method.getArguments()).filter((Predicate<Argument>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$createConsumerThreadPollLoop$8(io.micronaut.core.type.Argument ), (Lio/micronaut/core/type/Argument;)Z)()).findFirst();
            ackArg = Arrays.stream(method.getArguments()).filter((Predicate<Argument>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$createConsumerThreadPollLoop$9(io.micronaut.core.type.Argument ), (Lio/micronaut/core/type/Argument;)Z)()).findFirst();
            try {
                try {
                    trackPartitions = ackArg.isPresent() != false || offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD;
                    boundArguments = new HashMap<Argument<?>, Object>(2);
                    consumerArg.ifPresent((java.util.function.Consumer<Argument>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$createConsumerThreadPollLoop$10(java.util.Map org.apache.kafka.clients.consumer.Consumer io.micronaut.core.type.Argument ), (Lio/micronaut/core/type/Argument;)V)(boundArguments, kafkaConsumer));
                    consumerPaused = false;
                    while (true) lbl-1000:
                    // 8 sources

                    {
                        this.consumerAssignments.put(finalClientId, Collections.unmodifiableSet(kafkaConsumer.assignment()));
                        try {
                            if (!consumerPaused && this.paused.contains(finalClientId)) {
                                consumerPaused = true;
                                KafkaConsumerProcessor.LOG.debug("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", (Object)finalClientId, (Object)kafkaConsumer.paused());
                                kafkaConsumer.pause((Collection)kafkaConsumer.assignment());
                                this.pausedConsumers.put(finalClientId, kafkaConsumer);
                            }
                            consumerRecords = kafkaConsumer.poll(pollTimeout);
                            if (consumerPaused && !this.paused.contains(finalClientId)) {
                                KafkaConsumerProcessor.LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", (Object)finalClientId, (Object)kafkaConsumer.paused());
                                kafkaConsumer.resume((Collection)kafkaConsumer.paused());
                                this.pausedConsumers.remove(finalClientId);
                                consumerPaused = false;
                            }
                            if (consumerRecords == null || consumerRecords.count() <= 0) ** GOTO lbl-1000
                            if (isBatch) {
                                failed = this.processConsumerRecordsAsBatch(method, consumerBean, kafkaConsumer, consumerAnnotation, boundArguments, consumerRecords) == false;
                            } else {
                                v0 = failed = this.processConsumerRecords(method, offsetStrategy, consumerBean, kafkaConsumer, consumerAnnotation, boundArguments, trackPartitions, ackArg, consumerRecords) == false;
                            }
                            if (failed) ** GOTO lbl-1000
                            if (offsetStrategy == OffsetStrategy.SYNC) {
                                try {
                                    kafkaConsumer.commitSync();
                                }
                                catch (CommitFailedException e) {
                                    this.handleException(kafkaConsumer, consumerBean, null, e);
                                }
                            }
                            if (offsetStrategy != OffsetStrategy.ASYNC) ** GOTO lbl-1000
                            kafkaConsumer.commitAsync(KafkaConsumerProcessor.resolveCommitCallback(consumerBean));
                        }
                        catch (WakeupException e) {
                            throw e;
                        }
                        catch (Throwable e) {
                            this.handleException(kafkaConsumer, consumerBean, null, e);
                            continue;
                        }
                        break;
                    }
                }
                catch (WakeupException trackPartitions) {
                    try {
                        if (offsetStrategy != OffsetStrategy.DISABLED) {
                            kafkaConsumer.commitSync();
                        }
                        break block27;
                    }
                    catch (Throwable e) {
                        KafkaConsumerProcessor.LOG.warn("Error committing Kafka offsets on shutdown: {}", (Object)e.getMessage(), (Object)e);
                        break block27;
                    }
                    finally {
                        kafkaConsumer.close();
                    }
                }
                ** GOTO lbl-1000
            }
            catch (Throwable var18_22) {
                try {
                    if (offsetStrategy != OffsetStrategy.DISABLED) {
                        kafkaConsumer.commitSync();
                    }
                }
                catch (Throwable e) {
                    KafkaConsumerProcessor.LOG.warn("Error committing Kafka offsets on shutdown: {}", (Object)e.getMessage(), (Object)e);
                }
                finally {
                    kafkaConsumer.close();
                }
                throw var18_22;
            }
        }
    }

    private boolean processConsumerRecords(ExecutableMethod<?, ?> method, OffsetStrategy offsetStrategy, Object consumerBean, Consumer<?, ?> kafkaConsumer, AnnotationValue<KafkaListener> consumerAnnotation, Map<Argument<?>, Object> boundArguments, boolean trackPartitions, Optional<Argument<?>> ackArg, ConsumerRecords<?, ?> consumerRecords) {
        DefaultExecutableBinder executableBinder = new DefaultExecutableBinder(boundArguments);
        HashMap<TopicPartition, OffsetAndMetadata> currentOffsets = trackPartitions ? new HashMap<TopicPartition, OffsetAndMetadata>() : null;
        for (ConsumerRecord consumerRecord : consumerRecords) {
            LOG.trace("Kafka consumer [{}] received record: {}", method, (Object)consumerRecord);
            if (trackPartitions) {
                TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1L, null);
                currentOffsets.put(topicPartition, offsetAndMetadata);
            }
            ackArg.ifPresent(argument -> boundArguments.put((Argument<?>)argument, () -> kafkaConsumer.commitSync(currentOffsets)));
            try {
                BoundExecutable boundExecutable = executableBinder.bind(method, (ArgumentBinderRegistry)this.binderRegistry, (Object)consumerRecord);
                Object result = boundExecutable.invoke(consumerBean);
                if (result != null) {
                    boolean isBlocking;
                    Flux resultFlowable;
                    if (Publishers.isConvertibleToPublisher((Object)result)) {
                        resultFlowable = Flux.from((Publisher)((Publisher)Publishers.convertPublisher((Object)result, Publisher.class)));
                        isBlocking = method.hasAnnotation(Blocking.class);
                    } else {
                        resultFlowable = Flux.just((Object)result);
                        isBlocking = true;
                    }
                    this.handleResultFlux(consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, resultFlowable, isBlocking);
                }
            }
            catch (Throwable e) {
                this.handleException(kafkaConsumer, consumerBean, consumerRecord, e);
                return false;
            }
            if (offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
                try {
                    kafkaConsumer.commitSync(currentOffsets);
                }
                catch (CommitFailedException e) {
                    this.handleException(kafkaConsumer, consumerBean, consumerRecord, e);
                }
                continue;
            }
            if (offsetStrategy != OffsetStrategy.ASYNC_PER_RECORD) continue;
            kafkaConsumer.commitAsync(currentOffsets, KafkaConsumerProcessor.resolveCommitCallback(consumerBean));
        }
        return true;
    }

    private boolean processConsumerRecordsAsBatch(ExecutableMethod<?, ?> method, Object consumerBean, Consumer<?, ?> kafkaConsumer, AnnotationValue<KafkaListener> consumerAnnotation, Map<Argument<?>, Object> boundArguments, ConsumerRecords<?, ?> consumerRecords) {
        DefaultExecutableBinder batchBinder = new DefaultExecutableBinder(boundArguments);
        BoundExecutable boundExecutable = batchBinder.bind(method, (ArgumentBinderRegistry)this.batchBinderRegistry, consumerRecords);
        List<Object> result = boundExecutable.invoke(consumerBean);
        if (result != null) {
            boolean isBlocking;
            if (result.getClass().isArray()) {
                result = Arrays.asList((Object[])result);
            }
            boolean isPublisher = Publishers.isConvertibleToPublisher((Object)result);
            Flux resultFlux = result instanceof Iterable ? Flux.fromIterable((Iterable)result) : (isPublisher ? Flux.from((Publisher)((Publisher)Publishers.convertPublisher(result, Publisher.class))) : Flux.just(result));
            Iterator iterator = consumerRecords.iterator();
            boolean bl = isBlocking = !isPublisher || method.hasAnnotation(Blocking.class);
            if (isBlocking) {
                List objects = (List)resultFlux.collectList().block();
                for (Object object : objects) {
                    if (!iterator.hasNext()) continue;
                    ConsumerRecord consumerRecord = (ConsumerRecord)iterator.next();
                    this.handleResultFlux(consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, Flux.just(object), isBlocking);
                }
            } else {
                resultFlux.subscribe(o -> {
                    if (iterator.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord)iterator.next();
                        this.handleResultFlux(consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, Flux.just((Object)o), isBlocking);
                    }
                });
            }
        }
        return true;
    }

    private static void setupConsumerSubscription(ExecutableMethod<?, ?> method, List<AnnotationValue<Topic>> topicAnnotations, Object consumerBean, Consumer<?, ?> kafkaConsumer) {
        for (AnnotationValue<Topic> topicAnnotation : topicAnnotations) {
            Object[] topicNames = topicAnnotation.stringValues();
            Object[] patterns = topicAnnotation.stringValues("patterns");
            boolean hasTopics = ArrayUtils.isNotEmpty((Object[])topicNames);
            boolean hasPatterns = ArrayUtils.isNotEmpty((Object[])patterns);
            if (!hasTopics && !hasPatterns) {
                throw new MessagingSystemException("Either a topic or a topic must be specified for method: " + method);
            }
            if (hasTopics) {
                List<Object> topics = Arrays.asList(topicNames);
                if (consumerBean instanceof ConsumerRebalanceListener) {
                    kafkaConsumer.subscribe(topics, (ConsumerRebalanceListener)consumerBean);
                } else {
                    kafkaConsumer.subscribe(topics);
                }
                LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);
            }
            if (!hasPatterns) continue;
            for (Object pattern : patterns) {
                Pattern compiledPattern;
                try {
                    compiledPattern = Pattern.compile((String)pattern);
                }
                catch (Exception e) {
                    throw new MessagingSystemException("Invalid topic pattern [" + (String)pattern + "] for method [" + method + "]: " + e.getMessage(), (Throwable)e);
                }
                if (consumerBean instanceof ConsumerRebalanceListener) {
                    kafkaConsumer.subscribe(compiledPattern, (ConsumerRebalanceListener)consumerBean);
                } else {
                    kafkaConsumer.subscribe(compiledPattern);
                }
                LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
            }
        }
    }

    private void handleException(Consumer<?, ?> kafkaConsumer, Object consumerBean, ConsumerRecord<?, ?> consumerRecord, Throwable e) {
        KafkaListenerException kafkaListenerException = new KafkaListenerException(e, consumerBean, kafkaConsumer, consumerRecord);
        this.handleException(consumerBean, kafkaListenerException);
    }

    private void handleException(Object consumerBean, KafkaListenerException kafkaListenerException) {
        if (consumerBean instanceof KafkaListenerExceptionHandler) {
            ((KafkaListenerExceptionHandler)consumerBean).handle((Throwable)((Object)kafkaListenerException));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)kafkaListenerException));
        }
    }

    private void handleResultFlux(AnnotationValue<KafkaListener> kafkaListener, Object consumerBean, ExecutableMethod<?, ?> method, Consumer kafkaConsumer, ConsumerRecord<?, ?> consumerRecord, Flux<?> resultFlowable, boolean isBlocking) {
        Flux recordMetadataProducer = resultFlowable.subscribeOn(this.executorScheduler).flatMap(o -> {
            Object[] destinationTopics = method.stringValues(SendTo.class);
            if (ArrayUtils.isNotEmpty((Object[])destinationTopics)) {
                Object key = consumerRecord.key();
                Object value = o;
                if (value != null) {
                    String groupId = kafkaListener.stringValue("groupId").orElse(null);
                    Producer kafkaProducer = this.producerRegistry.getProducer(StringUtils.isNotEmpty((CharSequence)groupId) ? groupId : null, Argument.of(key != null ? key.getClass() : byte[].class), Argument.of(value.getClass()));
                    return Flux.create(arg_0 -> KafkaConsumerProcessor.lambda$null$15((String[])destinationTopics, key, value, consumerRecord, kafkaProducer, arg_0), (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.ERROR);
                }
                return Flux.empty();
            }
            return Flux.empty();
        }).onErrorResume(throwable -> {
            this.handleException(consumerBean, new KafkaListenerException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), (Throwable)throwable, consumerBean, (Consumer<?, ?>)kafkaConsumer, consumerRecord));
            if (kafkaListener.isTrue("redelivery")) {
                LOG.debug("Attempting redelivery of record [{}] following error", (Object)consumerRecord);
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                if (key != null && value != null) {
                    String groupId = kafkaListener.stringValue("groupId").orElse(null);
                    Producer kafkaProducer = this.producerRegistry.getProducer(StringUtils.isNotEmpty((CharSequence)groupId) ? groupId : null, Argument.of(key.getClass()), Argument.of(value.getClass()));
                    ProducerRecord record = new ProducerRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), key, value, (Iterable)consumerRecord.headers());
                    return Flux.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            this.handleException(consumerBean, new KafkaListenerException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), (Throwable)throwable, consumerBean, (Consumer<?, ?>)kafkaConsumer, consumerRecord));
                            emitter.complete();
                        } else {
                            emitter.next((Object)metadata);
                            emitter.complete();
                        }
                    }), (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.ERROR);
                }
            }
            return Flux.empty();
        });
        if (isBlocking) {
            RecordMetadata recordMetadata2 = (RecordMetadata)recordMetadataProducer.blockFirst();
            LOG.trace("Method [{}] produced record metadata: {}", method, (Object)recordMetadata2);
        } else {
            recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", (Object)method, recordMetadata));
        }
    }

    private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
        return Arrays.stream(method.getArguments()).filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)).findFirst().orElseGet(() -> Arrays.stream(method.getArguments()).filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)).findFirst().orElse(null));
    }

    private void configureDeserializers(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration consumerConfiguration) {
        Properties properties = consumerConfiguration.getConfig();
        Argument bodyArgument = KafkaConsumerProcessor.findBodyArgument(method);
        if (!properties.containsKey("key.deserializer") && !consumerConfiguration.getKeyDeserializer().isPresent()) {
            Optional<Argument> keyArgument = Arrays.stream(method.getArguments()).filter(arg -> arg.isAnnotationPresent(KafkaKey.class)).findFirst();
            if (keyArgument.isPresent()) {
                consumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer(keyArgument.get()));
            } else if (bodyArgument != null && ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
                Optional keyType = bodyArgument.getTypeVariable("K");
                if (keyType.isPresent()) {
                    consumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer((Argument)keyType.get()));
                } else {
                    consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
                }
            } else {
                consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
            }
        }
        if (!properties.containsKey("value.deserializer") && !consumerConfiguration.getValueDeserializer().isPresent()) {
            if (bodyArgument == null) {
                consumerConfiguration.setValueDeserializer(new StringDeserializer());
            } else if (ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
                Optional valueType = bodyArgument.getTypeVariable("V");
                if (valueType.isPresent()) {
                    consumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer((Argument)valueType.get()));
                } else {
                    consumerConfiguration.setValueDeserializer(new StringDeserializer());
                }
            } else {
                boolean batch = method.isTrue(KafkaListener.class, "batch");
                consumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer(batch ? KafkaConsumerProcessor.getComponentType(bodyArgument) : bodyArgument));
            }
        }
        this.debugDeserializationConfiguration(method, consumerConfiguration, properties);
    }

    private static Argument getComponentType(Argument<?> argument) {
        Class argumentType = argument.getType();
        return argumentType.isArray() ? Argument.of(argumentType.getComponentType()) : argument.getFirstTypeVariable().orElse(argument);
    }

    private static OffsetCommitCallback resolveCommitCallback(Object consumerBean) {
        return (offsets, exception) -> {
            if (consumerBean instanceof OffsetCommitCallback) {
                ((OffsetCommitCallback)consumerBean).onComplete(offsets, exception);
            } else if (exception != null) {
                LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", new Object[]{offsets, exception.getMessage(), exception});
            }
        };
    }

    private static /* synthetic */ void lambda$null$15(String[] destinationTopics, Object key, Object value, ConsumerRecord consumerRecord, Producer kafkaProducer, FluxSink emitter) {
        for (String destinationTopic : destinationTopics) {
            ProducerRecord record = new ProducerRecord(destinationTopic, null, key, value, (Iterable)consumerRecord.headers());
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    emitter.error((Throwable)exception);
                } else {
                    emitter.next((Object)metadata);
                }
            });
        }
        emitter.complete();
    }

    private static /* synthetic */ void lambda$createConsumerThreadPollLoop$10(Map boundArguments, Consumer kafkaConsumer, Argument argument) {
        boundArguments.put(argument, kafkaConsumer);
    }

    private static /* synthetic */ boolean lambda$createConsumerThreadPollLoop$9(Argument arg) {
        return Acknowledgement.class.isAssignableFrom(arg.getType());
    }

    private static /* synthetic */ boolean lambda$createConsumerThreadPollLoop$8(Argument arg) {
        return Consumer.class.isAssignableFrom(arg.getType());
    }

    private static /* synthetic */ Duration lambda$createConsumerThreadPollLoop$7() {
        return Duration.ofMillis(100L);
    }
}

