/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.KafkaAcknowledgement;
import io.micronaut.configuration.kafka.ProducerRegistry;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry;
import io.micronaut.configuration.kafka.bind.batch.BatchConsumerRecordsBinderRegistry;
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Blocking;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.Body;
import io.micronaut.messaging.annotation.SendTo;
import io.micronaut.messaging.exceptions.MessagingSystemException;
import io.micronaut.runtime.ApplicationConfiguration;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import javax.annotation.PreDestroy;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requires(beans={KafkaDefaultConfiguration.class})
public class KafkaConsumerProcessor
implements ExecutableMethodProcessor<KafkaListener>,
AutoCloseable,
ConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    private final ExecutorService executorService;
    private final ApplicationConfiguration applicationConfiguration;
    private final BeanContext beanContext;
    private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
    private final Map<String, Consumer> consumers = new ConcurrentHashMap<String, Consumer>();
    private final Map<String, Consumer> pausedConsumers = new ConcurrentHashMap<String, Consumer>();
    private final Set<String> paused = new ConcurrentSkipListSet<String>();
    private final ConsumerRecordBinderRegistry binderRegistry;
    private final SerdeRegistry serdeRegistry;
    private final Scheduler executorScheduler;
    private final KafkaListenerExceptionHandler exceptionHandler;
    private final ProducerRegistry producerRegistry;
    private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    public KafkaConsumerProcessor(@Named(value="consumer") ExecutorService executorService, ApplicationConfiguration applicationConfiguration, BeanContext beanContext, AbstractKafkaConsumerConfiguration defaultConsumerConfiguration, ConsumerRecordBinderRegistry binderRegistry, BatchConsumerRecordsBinderRegistry batchBinderRegistry, SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler) {
        this.executorService = executorService;
        this.applicationConfiguration = applicationConfiguration;
        this.beanContext = beanContext;
        this.defaultConsumerConfiguration = defaultConsumerConfiguration;
        this.binderRegistry = binderRegistry;
        this.batchBinderRegistry = batchBinderRegistry;
        this.serdeRegistry = serdeRegistry;
        this.executorScheduler = Schedulers.from((Executor)executorService);
        this.producerRegistry = producerRegistry;
        this.exceptionHandler = exceptionHandler;
        this.beanContext.getBeanDefinitions(Qualifiers.byType((Class[])new Class[]{KafkaListener.class})).forEach(definition -> {
            if (definition.isSingleton()) {
                try {
                    beanContext.getBean(definition.getBeanType());
                }
                catch (Exception e) {
                    throw new MessagingSystemException("Error creating bean for @KafkaListener of type [" + definition.getBeanType() + "]: " + e.getMessage(), (Throwable)e);
                }
            }
        });
    }

    @Override
    @Nonnull
    public <K, V> Consumer<K, V> getConsumer(@Nonnull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Consumer consumer = this.consumers.get(id);
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return consumer;
    }

    @Override
    @Nonnull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    @Override
    public boolean isPaused(@Nonnull String id) {
        if (StringUtils.isNotEmpty((CharSequence)id) && this.consumers.containsKey(id)) {
            return this.paused.contains(id) && this.pausedConsumers.containsKey(id);
        }
        return false;
    }

    @Override
    public void pause(@Nonnull String id) {
        if (!StringUtils.isNotEmpty((CharSequence)id) || !this.consumers.containsKey(id)) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        this.paused.add(id);
    }

    @Override
    public void resume(@Nonnull String id) {
        if (!StringUtils.isNotEmpty((CharSequence)id) || !this.consumers.containsKey(id)) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        this.paused.remove(id);
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        List topicAnnotations = method.getDeclaredAnnotationValuesByType(Topic.class);
        AnnotationValue consumerAnnotation = method.getAnnotation(KafkaListener.class);
        if (CollectionUtils.isEmpty((Collection)topicAnnotations)) {
            topicAnnotations = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
        }
        if (consumerAnnotation != null && !CollectionUtils.isEmpty((Collection)topicAnnotations)) {
            String clientId;
            Duration pollTimeout = method.getValue(KafkaListener.class, "pollTimeout", Duration.class).orElse(Duration.ofMillis(100L));
            Duration sessionTimeout = method.getValue(KafkaListener.class, "sessionTimeout", Duration.class).orElse(null);
            Duration heartbeatInterval = method.getValue(KafkaListener.class, "heartbeatInterval", Duration.class).orElse(null);
            boolean isBatch = method.isTrue(KafkaListener.class, "batch");
            Optional<Argument> consumerArg = Arrays.stream(method.getArguments()).filter(arg -> Consumer.class.isAssignableFrom(arg.getType())).findFirst();
            Optional<Argument> ackArg = Arrays.stream(method.getArguments()).filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()) || io.micronaut.messaging.Acknowledgement.class.isAssignableFrom(arg.getType())).findFirst();
            String groupId = consumerAnnotation.stringValue("groupId").orElse(null);
            Class beanType = beanDefinition.getBeanType();
            if (StringUtils.isEmpty((CharSequence)groupId)) {
                groupId = this.applicationConfiguration.getName().orElse(beanType.getName());
            }
            boolean hasUniqueGroupId = consumerAnnotation.isTrue("uniqueGroupId");
            String uniqueGroupId = groupId;
            if (hasUniqueGroupId) {
                uniqueGroupId = uniqueGroupId + "_" + UUID.randomUUID().toString();
            }
            if (StringUtils.isEmpty((CharSequence)(clientId = (String)consumerAnnotation.stringValue("clientId").orElse(null)))) {
                clientId = this.applicationConfiguration.getName().map(s -> s + '-' + NameUtils.hyphenate((String)beanType.getSimpleName())).orElse(null);
            }
            OffsetStrategy offsetStrategy = consumerAnnotation.enumValue("offsetStrategy", OffsetStrategy.class).orElse(OffsetStrategy.AUTO);
            int consumerThreads = consumerAnnotation.intValue("threads").orElse(1);
            AbstractKafkaConsumerConfiguration consumerConfigurationDefaults = this.beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName((String)groupId)).orElse(this.defaultConsumerConfiguration);
            DefaultKafkaConsumerConfiguration consumerConfiguration = new DefaultKafkaConsumerConfiguration(consumerConfigurationDefaults);
            Properties properties = consumerConfiguration.getConfig();
            if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
                properties.putIfAbsent("auto.offset.reset", OffsetReset.EARLIEST.name().toLowerCase());
            }
            properties.putIfAbsent("enable.auto.commit", String.valueOf(offsetStrategy == OffsetStrategy.AUTO));
            if (heartbeatInterval != null) {
                properties.putIfAbsent("heartbeat.interval.ms", String.valueOf(heartbeatInterval.toMillis()));
            }
            if (sessionTimeout != null) {
                long sessionTimeoutMillis = sessionTimeout.toMillis();
                properties.putIfAbsent("session.timeout.ms", String.valueOf(sessionTimeoutMillis));
            }
            properties.put("group.id", hasUniqueGroupId ? uniqueGroupId : groupId);
            if (clientId != null) {
                properties.put("client.id", clientId);
            }
            properties.putAll((Map<?, ?>)consumerAnnotation.getProperties("properties", "name"));
            this.configureDeserializers(method, consumerConfiguration);
            if (LOG.isDebugEnabled()) {
                Optional kd = consumerConfiguration.getKeyDeserializer();
                if (kd.isPresent()) {
                    LOG.debug("Using key deserializer [{}] for Kafka listener: {}", kd.get(), method);
                } else {
                    LOG.debug("Using key deserializer [{}] for Kafka listener: {}", (Object)properties.getProperty("key.deserializer"), method);
                }
                Optional vd = consumerConfiguration.getValueDeserializer();
                if (vd.isPresent()) {
                    LOG.debug("Using value deserializer [{}] for Kafka listener: {}", vd.get(), method);
                } else {
                    LOG.debug("Using value deserializer [{}] for Kafka listener: {}", (Object)properties.getProperty("value.deserializer"), method);
                }
            }
            for (int i = 0; i < consumerThreads; ++i) {
                String finalClientId;
                if (clientId != null) {
                    finalClientId = consumerThreads > 1 ? clientId + '-' + this.clientIdGenerator.incrementAndGet() : clientId;
                    properties.put("client.id", finalClientId);
                } else {
                    finalClientId = "kafka-consumer-" + this.clientIdGenerator.incrementAndGet();
                }
                Consumer kafkaConsumer = (Consumer)this.beanContext.createBean(Consumer.class, new Object[]{consumerConfiguration});
                this.consumers.put(finalClientId, kafkaConsumer);
                Object consumerBean = this.beanContext.getBean(beanType);
                if (consumerBean instanceof ConsumerAware) {
                    ((ConsumerAware)consumerBean).setKafkaConsumer(kafkaConsumer);
                }
                for (AnnotationValue topicAnnotation : topicAnnotations) {
                    Object[] topicNames = topicAnnotation.stringValues();
                    Object[] patterns = topicAnnotation.stringValues("patterns");
                    boolean hasTopics = ArrayUtils.isNotEmpty((Object[])topicNames);
                    boolean hasPatterns = ArrayUtils.isNotEmpty((Object[])patterns);
                    if (!hasTopics && !hasPatterns) {
                        throw new MessagingSystemException("Either a topic or a topic must be specified for method: " + method);
                    }
                    if (hasTopics) {
                        List<Object> topics = Arrays.asList(topicNames);
                        if (consumerBean instanceof ConsumerRebalanceListener) {
                            kafkaConsumer.subscribe(topics, (ConsumerRebalanceListener)consumerBean);
                        } else {
                            kafkaConsumer.subscribe(topics);
                        }
                        LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);
                    }
                    if (!hasPatterns) continue;
                    for (Object pattern : patterns) {
                        Pattern p;
                        try {
                            p = Pattern.compile((String)pattern);
                        }
                        catch (Exception e) {
                            throw new MessagingSystemException("Invalid topic pattern [" + (String)pattern + "] for method [" + method + "]: " + e.getMessage(), (Throwable)e);
                        }
                        if (consumerBean instanceof ConsumerRebalanceListener) {
                            kafkaConsumer.subscribe(p, (ConsumerRebalanceListener)consumerBean);
                        } else {
                            kafkaConsumer.subscribe(p);
                        }
                        LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
                    }
                }
                this.executorService.submit(() -> {
                    try {
                        try {
                            boolean trackPartitions = ackArg.isPresent() || offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD;
                            HashMap boundArguments = new HashMap(2);
                            consumerArg.ifPresent(argument -> boundArguments.put(argument, (KafkaAcknowledgement)kafkaConsumer));
                            boolean consumerPaused = false;
                            block23: while (true) {
                                try {
                                    while (true) {
                                        boolean isBlocking;
                                        HashMap<TopicPartition, OffsetAndMetadata> currentOffsets;
                                        if (!consumerPaused && this.paused.contains(finalClientId)) {
                                            consumerPaused = true;
                                            LOG.debug("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", (Object)finalClientId, (Object)kafkaConsumer.paused());
                                            kafkaConsumer.pause((Collection)kafkaConsumer.assignment());
                                            this.pausedConsumers.put(finalClientId, kafkaConsumer);
                                        }
                                        ConsumerRecords consumerRecords = kafkaConsumer.poll(pollTimeout);
                                        boolean failed = false;
                                        if (consumerPaused && !this.paused.contains(finalClientId)) {
                                            LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", (Object)finalClientId, (Object)kafkaConsumer.paused());
                                            kafkaConsumer.resume((Collection)kafkaConsumer.paused());
                                            this.pausedConsumers.remove(finalClientId);
                                            consumerPaused = false;
                                        }
                                        HashMap<TopicPartition, OffsetAndMetadata> hashMap = currentOffsets = trackPartitions ? new HashMap<TopicPartition, OffsetAndMetadata>() : null;
                                        if (consumerRecords == null || consumerRecords.count() <= 0) continue;
                                        if (isBatch) {
                                            DefaultExecutableBinder batchBinder = new DefaultExecutableBinder(boundArguments);
                                            BoundExecutable boundExecutable = batchBinder.bind((Executable)method, (ArgumentBinderRegistry)this.batchBinderRegistry, (Object)consumerRecords);
                                            List<Object> result = boundExecutable.invoke(consumerBean);
                                            if (result != null) {
                                                if (result.getClass().isArray()) {
                                                    result = Arrays.asList((Object[])result);
                                                }
                                                boolean isPublisher = Publishers.isConvertibleToPublisher((Object)result);
                                                Flowable resultFlowable = result instanceof Iterable ? Flowable.fromIterable((Iterable)result) : (isPublisher ? (Flowable)Publishers.convertPublisher(result, Flowable.class) : Flowable.just(result));
                                                Iterator iterator = consumerRecords.iterator();
                                                boolean bl = isBlocking = !isPublisher || method.hasAnnotation(Blocking.class);
                                                if (isBlocking) {
                                                    resultFlowable.blockingSubscribe(o -> {
                                                        if (iterator.hasNext()) {
                                                            ConsumerRecord consumerRecord = (ConsumerRecord)iterator.next();
                                                            this.handleResultFlowable((AnnotationValue<KafkaListener>)consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, Flowable.just((Object)o), isBlocking);
                                                        }
                                                    });
                                                } else {
                                                    resultFlowable.forEach(o -> {
                                                        if (iterator.hasNext()) {
                                                            ConsumerRecord consumerRecord = (ConsumerRecord)iterator.next();
                                                            this.handleResultFlowable((AnnotationValue<KafkaListener>)consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, Flowable.just((Object)o), isBlocking);
                                                        }
                                                    });
                                                }
                                            }
                                        } else {
                                            DefaultExecutableBinder executableBinder = new DefaultExecutableBinder(boundArguments);
                                            for (ConsumerRecord consumerRecord : consumerRecords) {
                                                block42: {
                                                    LOG.trace("Kafka consumer [{}] received record: {}", (Object)method, (Object)consumerRecord);
                                                    if (trackPartitions) {
                                                        currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1L, null));
                                                    }
                                                    if (ackArg.isPresent()) {
                                                        boundArguments.put(ackArg.get(), () -> kafkaConsumer.commitSync(currentOffsets));
                                                    }
                                                    try {
                                                        Flowable resultFlowable;
                                                        BoundExecutable boundExecutable = executableBinder.bind((Executable)method, (ArgumentBinderRegistry)this.binderRegistry, (Object)consumerRecord);
                                                        Object result = boundExecutable.invoke(consumerBean);
                                                        if (result == null) break block42;
                                                        if (Publishers.isConvertibleToPublisher((Object)result)) {
                                                            resultFlowable = (Flowable)Publishers.convertPublisher((Object)result, Flowable.class);
                                                            isBlocking = method.hasAnnotation(Blocking.class);
                                                        } else {
                                                            resultFlowable = Flowable.just((Object)result);
                                                            isBlocking = true;
                                                        }
                                                        this.handleResultFlowable((AnnotationValue<KafkaListener>)consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, resultFlowable, isBlocking);
                                                    }
                                                    catch (Throwable e) {
                                                        this.handleException(kafkaConsumer, consumerBean, consumerRecord, e);
                                                        failed = true;
                                                        break;
                                                    }
                                                }
                                                if (offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
                                                    try {
                                                        kafkaConsumer.commitSync(currentOffsets);
                                                    }
                                                    catch (CommitFailedException e) {
                                                        this.handleException(kafkaConsumer, consumerBean, consumerRecord, e);
                                                    }
                                                    continue;
                                                }
                                                if (offsetStrategy != OffsetStrategy.ASYNC_PER_RECORD) continue;
                                                kafkaConsumer.commitAsync(currentOffsets, this.resolveCommitCallback(consumerBean));
                                            }
                                        }
                                        if (failed) continue;
                                        if (offsetStrategy == OffsetStrategy.SYNC) {
                                            try {
                                                kafkaConsumer.commitSync();
                                                continue block23;
                                            }
                                            catch (CommitFailedException e) {
                                                this.handleException(kafkaConsumer, consumerBean, null, e);
                                                continue;
                                            }
                                        }
                                        if (offsetStrategy != OffsetStrategy.ASYNC) continue;
                                        kafkaConsumer.commitAsync(this.resolveCommitCallback(consumerBean));
                                    }
                                }
                                catch (WakeupException e) {
                                    throw e;
                                }
                                catch (Throwable e) {
                                    this.handleException(kafkaConsumer, consumerBean, null, e);
                                    continue;
                                }
                                break;
                            }
                        }
                        catch (WakeupException trackPartitions) {
                            try {
                                if (offsetStrategy != OffsetStrategy.DISABLED) {
                                    kafkaConsumer.commitSync();
                                }
                            }
                            catch (Throwable e) {
                                LOG.warn("Error committing Kafka offsets on shutdown: {}", (Object)e.getMessage(), (Object)e);
                            }
                            finally {
                                kafkaConsumer.close();
                            }
                        }
                    }
                    catch (Throwable throwable) {
                        try {
                            if (offsetStrategy != OffsetStrategy.DISABLED) {
                                kafkaConsumer.commitSync();
                            }
                        }
                        catch (Throwable e) {
                            LOG.warn("Error committing Kafka offsets on shutdown: {}", (Object)e.getMessage(), (Object)e);
                        }
                        finally {
                            kafkaConsumer.close();
                        }
                        throw throwable;
                    }
                });
            }
        }
    }

    @Override
    @PreDestroy
    public void close() {
        for (Consumer consumer : this.consumers.values()) {
            consumer.wakeup();
        }
        this.consumers.clear();
    }

    private void handleException(Consumer kafkaConsumer, Object consumerBean, ConsumerRecord<?, ?> consumerRecord, Throwable e) {
        KafkaListenerException kafkaListenerException = new KafkaListenerException(e, consumerBean, kafkaConsumer, consumerRecord);
        this.handleException(consumerBean, kafkaListenerException);
    }

    private void handleException(Object consumerBean, KafkaListenerException kafkaListenerException) {
        if (consumerBean instanceof KafkaListenerExceptionHandler) {
            ((KafkaListenerExceptionHandler)consumerBean).handle((Throwable)((Object)kafkaListenerException));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)kafkaListenerException));
        }
    }

    private void handleResultFlowable(AnnotationValue<KafkaListener> kafkaListener, Object consumerBean, ExecutableMethod<?, ?> method, Consumer kafkaConsumer, ConsumerRecord<?, ?> consumerRecord, Flowable<?> resultFlowable, boolean isBlocking) {
        Flowable recordMetadataProducer = resultFlowable.subscribeOn(this.executorScheduler).flatMap(o -> {
            Object[] destinationTopics = method.stringValues(SendTo.class);
            if (ArrayUtils.isNotEmpty((Object[])destinationTopics)) {
                Object key = consumerRecord.key();
                Object value = o;
                if (value != null) {
                    String groupId = kafkaListener.stringValue("groupId").orElse(null);
                    Producer kafkaProducer = this.producerRegistry.getProducer(StringUtils.isNotEmpty((CharSequence)groupId) ? groupId : null, Argument.of(key != null ? key.getClass() : byte[].class), Argument.of(value.getClass()));
                    return Flowable.create(arg_0 -> KafkaConsumerProcessor.lambda$null$10((String[])destinationTopics, key, value, consumerRecord, kafkaProducer, arg_0), (BackpressureStrategy)BackpressureStrategy.ERROR);
                }
                return Flowable.empty();
            }
            return Flowable.empty();
        }).onErrorResumeNext(throwable -> {
            this.handleException(consumerBean, new KafkaListenerException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), (Throwable)throwable, consumerBean, (Consumer<?, ?>)kafkaConsumer, consumerRecord));
            if (kafkaListener.isTrue("redelivery")) {
                LOG.debug("Attempting redelivery of record [{}] following error", (Object)consumerRecord);
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                if (key != null && value != null) {
                    String groupId = kafkaListener.stringValue("groupId").orElse(null);
                    Producer kafkaProducer = this.producerRegistry.getProducer(StringUtils.isNotEmpty((CharSequence)groupId) ? groupId : null, Argument.of(key.getClass()), Argument.of(value.getClass()));
                    ProducerRecord record = new ProducerRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), key, value, (Iterable)consumerRecord.headers());
                    return Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            this.handleException(consumerBean, new KafkaListenerException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), (Throwable)throwable, consumerBean, (Consumer<?, ?>)kafkaConsumer, consumerRecord));
                            emitter.onComplete();
                        } else {
                            emitter.onNext((Object)metadata);
                            emitter.onComplete();
                        }
                    }), (BackpressureStrategy)BackpressureStrategy.ERROR);
                }
            }
            return Flowable.empty();
        });
        if (isBlocking) {
            recordMetadataProducer.blockingSubscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", (Object)method, recordMetadata));
        } else {
            recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", (Object)method, recordMetadata));
        }
    }

    private Argument findBodyArgument(ExecutableMethod<?, ?> method) {
        return Arrays.stream(method.getArguments()).filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(Body.class)).findFirst().orElseGet(() -> Arrays.stream(method.getArguments()).filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)).findFirst().orElse(null));
    }

    private void configureDeserializers(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration consumerConfiguration) {
        Properties properties = consumerConfiguration.getConfig();
        Argument<?> bodyArgument = this.findBodyArgument(method);
        if (!properties.containsKey("key.deserializer") && !consumerConfiguration.getKeyDeserializer().isPresent()) {
            Optional<Argument> keyArgument = Arrays.stream(method.getArguments()).filter(arg -> arg.isAnnotationPresent(KafkaKey.class)).findFirst();
            if (keyArgument.isPresent()) {
                consumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer(keyArgument.get()));
            } else if (bodyArgument != null && ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
                Optional keyType = bodyArgument.getTypeVariable("K");
                if (keyType.isPresent()) {
                    consumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer((Argument)keyType.get()));
                } else {
                    consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
                }
            } else {
                consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
            }
        }
        if (!properties.containsKey("value.deserializer") && !consumerConfiguration.getValueDeserializer().isPresent()) {
            if (bodyArgument != null) {
                if (ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
                    Optional valueType = bodyArgument.getTypeVariable("V");
                    if (valueType.isPresent()) {
                        consumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer((Argument)valueType.get()));
                    } else {
                        consumerConfiguration.setValueDeserializer(new StringDeserializer());
                    }
                } else {
                    boolean batch = method.isTrue(KafkaListener.class, "batch");
                    consumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer(batch ? this.getComponentType(bodyArgument) : bodyArgument));
                }
            } else {
                consumerConfiguration.setValueDeserializer(new StringDeserializer());
            }
        }
    }

    private Argument<?> getComponentType(Argument<?> argument) {
        Class argumentType = argument.getType();
        return argumentType.isArray() ? Argument.of(argumentType.getComponentType()) : argument.getFirstTypeVariable().orElse(argument);
    }

    private OffsetCommitCallback resolveCommitCallback(Object consumerBean) {
        return (offsets, exception) -> {
            if (consumerBean instanceof OffsetCommitCallback) {
                ((OffsetCommitCallback)consumerBean).onComplete(offsets, exception);
            } else if (exception != null) {
                LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", new Object[]{offsets, exception.getMessage(), exception});
            }
        };
    }

    private static /* synthetic */ void lambda$null$10(String[] destinationTopics, Object key, Object value, ConsumerRecord consumerRecord, Producer kafkaProducer, FlowableEmitter emitter) throws Exception {
        for (String destinationTopic : destinationTopics) {
            ProducerRecord record = new ProducerRecord(destinationTopic, null, key, value, (Iterable)consumerRecord.headers());
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    emitter.onError((Throwable)exception);
                } else {
                    emitter.onNext((Object)metadata);
                }
            });
        }
        emitter.onComplete();
    }
}

