/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.KafkaMessage;
import io.micronaut.configuration.kafka.ProducerRegistry;
import io.micronaut.configuration.kafka.TransactionalProducerRegistry;
import io.micronaut.configuration.kafka.annotation.ErrorStrategy;
import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry;
import io.micronaut.configuration.kafka.bind.batch.BatchConsumerRecordsBinderRegistry;
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Blocking;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.SendTo;
import io.micronaut.messaging.exceptions.MessagingSystemException;
import io.micronaut.runtime.ApplicationConfiguration;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.scheduling.TaskScheduler;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.invoke.LambdaMetafactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Singleton
@Requires(beans={KafkaDefaultConfiguration.class})
public class KafkaConsumerProcessor
implements ExecutableMethodProcessor<Topic>,
AutoCloseable,
ConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    private final ExecutorService executorService;
    private final ApplicationConfiguration applicationConfiguration;
    private final BeanContext beanContext;
    private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
    private final Map<String, ConsumerState> consumers = new ConcurrentHashMap<String, ConsumerState>();
    private final ConsumerRecordBinderRegistry binderRegistry;
    private final SerdeRegistry serdeRegistry;
    private final Scheduler executorScheduler;
    private final KafkaListenerExceptionHandler exceptionHandler;
    private final TaskScheduler taskScheduler;
    private final ProducerRegistry producerRegistry;
    private final TransactionalProducerRegistry transactionalProducerRegistry;
    private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    public KafkaConsumerProcessor(@Named(value="consumer") ExecutorService executorService, ApplicationConfiguration applicationConfiguration, BeanContext beanContext, AbstractKafkaConsumerConfiguration defaultConsumerConfiguration, ConsumerRecordBinderRegistry binderRegistry, BatchConsumerRecordsBinderRegistry batchBinderRegistry, SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler, @Named(value="scheduled") ExecutorService schedulerService, TransactionalProducerRegistry transactionalProducerRegistry) {
        this.executorService = executorService;
        this.applicationConfiguration = applicationConfiguration;
        this.beanContext = beanContext;
        this.defaultConsumerConfiguration = defaultConsumerConfiguration;
        this.binderRegistry = binderRegistry;
        this.batchBinderRegistry = batchBinderRegistry;
        this.serdeRegistry = serdeRegistry;
        this.executorScheduler = Schedulers.fromExecutor((Executor)executorService);
        this.producerRegistry = producerRegistry;
        this.exceptionHandler = exceptionHandler;
        this.taskScheduler = new ScheduledExecutorTaskScheduler(schedulerService);
        this.transactionalProducerRegistry = transactionalProducerRegistry;
        this.beanContext.getBeanDefinitions(Qualifiers.byType((Class[])new Class[]{KafkaListener.class})).forEach(definition -> {
            if (definition.isSingleton()) {
                try {
                    beanContext.getBean(definition.getBeanType());
                }
                catch (Exception e) {
                    throw new MessagingSystemException("Error creating bean for @KafkaListener of type [" + definition.getBeanType() + "]: " + e.getMessage(), (Throwable)e);
                }
            }
        });
    }

    @NonNull
    private ConsumerState getConsumerState(@NonNull String id) {
        ConsumerState consumerState = this.consumers.get(id);
        if (consumerState == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return consumerState;
    }

    @Override
    @NonNull
    public <K, V> Consumer<K, V> getConsumer(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Consumer<?, ?> consumer = this.getConsumerState((String)id).kafkaConsumer;
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return consumer;
    }

    @Override
    @NonNull
    public Set<String> getConsumerSubscription(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Set<String> subscriptions = this.getConsumerState((String)id).subscriptions;
        if (subscriptions == null || subscriptions.isEmpty()) {
            throw new IllegalArgumentException("No consumer subscription found for ID: " + id);
        }
        return subscriptions;
    }

    @Override
    @NonNull
    public Set<TopicPartition> getConsumerAssignment(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Set<TopicPartition> assignment = this.getConsumerState((String)id).assignments;
        if (assignment == null || assignment.isEmpty()) {
            throw new IllegalArgumentException("No consumer assignment found for ID: " + id);
        }
        return assignment;
    }

    @Override
    @NonNull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    @Override
    public boolean isPaused(@NonNull String id) {
        return this.isPaused(id, this.getConsumerState((String)id).assignments);
    }

    @Override
    public boolean isPaused(@NonNull String id, @NonNull Collection<TopicPartition> topicPartitions) {
        return this.getConsumerState(id).isPaused(topicPartitions);
    }

    @Override
    public void pause(@NonNull String id) {
        this.getConsumerState(id).pause();
    }

    @Override
    public void pause(@NonNull String id, @NonNull Collection<TopicPartition> topicPartitions) {
        this.getConsumerState(id).pause(topicPartitions);
    }

    @Override
    public void resume(@NonNull String id) {
        this.getConsumerState(id).resume();
    }

    @Override
    public void resume(@NonNull String id, @NonNull Collection<TopicPartition> topicPartitions) {
        this.getConsumerState(id).resume(topicPartitions);
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        List topicAnnotations = method.getDeclaredAnnotationValuesByType(Topic.class);
        AnnotationValue consumerAnnotation = method.getAnnotation(KafkaListener.class);
        if (CollectionUtils.isEmpty((Collection)topicAnnotations)) {
            topicAnnotations = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
        }
        if (consumerAnnotation == null || CollectionUtils.isEmpty((Collection)topicAnnotations)) {
            return;
        }
        Class beanType = beanDefinition.getBeanType();
        String groupId = consumerAnnotation.stringValue("groupId").filter(StringUtils::isNotEmpty).orElseGet(() -> this.applicationConfiguration.getName().orElse(beanType.getName()));
        if (consumerAnnotation.isTrue("uniqueGroupId")) {
            groupId = groupId + "_" + UUID.randomUUID();
        }
        String clientId = consumerAnnotation.stringValue("clientId").filter(StringUtils::isNotEmpty).orElseGet(() -> this.applicationConfiguration.getName().map(s -> s + '-' + NameUtils.hyphenate((String)beanType.getSimpleName())).orElse(null));
        OffsetStrategy offsetStrategy = consumerAnnotation.enumValue("offsetStrategy", OffsetStrategy.class).orElse(OffsetStrategy.AUTO);
        AbstractKafkaConsumerConfiguration consumerConfigurationDefaults = this.beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName((String)groupId)).orElse(this.defaultConsumerConfiguration);
        DefaultKafkaConsumerConfiguration consumerConfiguration = new DefaultKafkaConsumerConfiguration(consumerConfigurationDefaults);
        Properties properties = this.createConsumerProperties((AnnotationValue<KafkaListener>)consumerAnnotation, consumerConfiguration, clientId, groupId, offsetStrategy);
        this.configureDeserializers(method, consumerConfiguration);
        this.submitConsumerThreads(method, clientId, groupId, offsetStrategy, topicAnnotations, (AnnotationValue<KafkaListener>)consumerAnnotation, consumerConfiguration, properties, beanType);
    }

    @Override
    @PreDestroy
    public void close() {
        for (ConsumerState consumerState : this.consumers.values()) {
            consumerState.kafkaConsumer.wakeup();
        }
        this.consumers.clear();
    }

    private Properties createConsumerProperties(AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration consumerConfiguration, String clientId, String groupId, OffsetStrategy offsetStrategy) {
        Properties properties = consumerConfiguration.getConfig();
        if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
            properties.putIfAbsent("auto.offset.reset", OffsetReset.EARLIEST.name().toLowerCase());
        }
        properties.putIfAbsent("enable.auto.commit", String.valueOf(offsetStrategy == OffsetStrategy.AUTO));
        consumerAnnotation.get((CharSequence)"heartbeatInterval", Duration.class).map(Duration::toMillis).map(String::valueOf).ifPresent(heartbeatInterval -> properties.putIfAbsent("heartbeat.interval.ms", heartbeatInterval));
        consumerAnnotation.get((CharSequence)"sessionTimeout", Duration.class).map(Duration::toMillis).map(String::valueOf).ifPresent(sessionTimeout -> properties.putIfAbsent("session.timeout.ms", sessionTimeout));
        consumerAnnotation.enumValue("isolation", IsolationLevel.class).ifPresent(isolation -> properties.putIfAbsent("isolation.level", isolation.toString().toLowerCase(Locale.ROOT)));
        properties.put("group.id", groupId);
        if (clientId != null) {
            properties.put("client.id", clientId);
        }
        properties.putAll((Map<?, ?>)consumerAnnotation.getProperties("properties", "name"));
        return properties;
    }

    private void debugDeserializationConfiguration(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Properties properties) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        Optional keyDeserializer = consumerConfiguration.getKeyDeserializer();
        if (consumerConfiguration.getKeyDeserializer().isPresent()) {
            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), (Object)KafkaConsumerProcessor.logMethod(method));
        } else {
            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", (Object)properties.getProperty("key.deserializer"), (Object)KafkaConsumerProcessor.logMethod(method));
        }
        Optional valueDeserializer = consumerConfiguration.getValueDeserializer();
        if (valueDeserializer.isPresent()) {
            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), (Object)KafkaConsumerProcessor.logMethod(method));
        } else {
            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", (Object)properties.getProperty("value.deserializer"), (Object)KafkaConsumerProcessor.logMethod(method));
        }
    }

    private void submitConsumerThreads(ExecutableMethod<?, ?> method, String clientId, String groupId, OffsetStrategy offsetStrategy, List<AnnotationValue<Topic>> topicAnnotations, AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Properties properties, Class<?> beanType) {
        int consumerThreads = consumerAnnotation.intValue("threads").orElse(1);
        for (int i = 0; i < consumerThreads; ++i) {
            String finalClientId;
            if (clientId != null) {
                finalClientId = consumerThreads > 1 ? clientId + '-' + this.clientIdGenerator.incrementAndGet() : clientId;
                properties.put("client.id", finalClientId);
            } else {
                finalClientId = "kafka-consumer-" + this.clientIdGenerator.incrementAndGet();
            }
            this.submitConsumerThread(method, finalClientId, groupId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, beanType);
        }
    }

    private void submitConsumerThread(ExecutableMethod<?, ?> method, String clientId, String groupId, OffsetStrategy offsetStrategy, List<AnnotationValue<Topic>> topicAnnotations, AnnotationValue<KafkaListener> consumerAnnotation, DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, Class<?> beanType) {
        Consumer kafkaConsumer = (Consumer)this.beanContext.createBean(Consumer.class, new Object[]{consumerConfiguration});
        Object consumerBean = this.beanContext.getBean(beanType);
        if (consumerBean instanceof ConsumerAware) {
            ((ConsumerAware)consumerBean).setKafkaConsumer(kafkaConsumer);
        }
        KafkaConsumerProcessor.setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer);
        ConsumerState consumerState = new ConsumerState(clientId, groupId, offsetStrategy, kafkaConsumer, consumerBean, Collections.unmodifiableSet(kafkaConsumer.subscription()), consumerAnnotation, method);
        this.consumers.put(clientId, consumerState);
        this.executorService.submit(() -> this.createConsumerThreadPollLoop(method, consumerState));
    }

    /*
     * Unable to fully structure code
     */
    private void createConsumerThreadPollLoop(ExecutableMethod<?, ?> method, ConsumerState consumerState) {
        isBatch = method.isTrue(KafkaListener.class, "batch");
        pollTimeout = method.getValue(KafkaListener.class, "pollTimeout", Duration.class).orElseGet((Supplier<Duration>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, lambda$createConsumerThreadPollLoop$8(), ()Ljava/time/Duration;)());
        consumerArg = Arrays.stream(method.getArguments()).filter((Predicate<Argument>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$createConsumerThreadPollLoop$9(io.micronaut.core.type.Argument ), (Lio/micronaut/core/type/Argument;)Z)()).findFirst();
        ackArg = Arrays.stream(method.getArguments()).filter((Predicate<Argument>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$createConsumerThreadPollLoop$10(io.micronaut.core.type.Argument ), (Lio/micronaut/core/type/Argument;)Z)()).findFirst();
        try {
            kafkaConsumer = consumerState.kafkaConsumer;
            var8_9 = null;
            try {
                try {
                    trackPartitions = ackArg.isPresent() != false || consumerState.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || consumerState.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD;
                    boundArguments = new HashMap<Argument<?>, Object>(2);
                    consumerArg.ifPresent((java.util.function.Consumer<Argument>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$createConsumerThreadPollLoop$11(java.util.Map org.apache.kafka.clients.consumer.Consumer io.micronaut.core.type.Argument ), (Lio/micronaut/core/type/Argument;)V)(boundArguments, kafkaConsumer));
                    while (true) lbl-1000:
                    // 8 sources

                    {
                        consumerState.assignments = Collections.unmodifiableSet(kafkaConsumer.assignment());
                        if (consumerState.autoPaused) {
                            consumerState.pause(consumerState.assignments);
                            kafkaConsumer.pause(consumerState.assignments);
                        }
                        failed = true;
                        try {
                            consumerState.pauseTopicPartitions();
                            consumerRecords = kafkaConsumer.poll(pollTimeout);
                            failed = false;
                            consumerState.resumeTopicPartitions();
                            if (consumerRecords == null || consumerRecords.count() <= 0) ** GOTO lbl-1000
                            if (isBatch) {
                                failed = this.processConsumerRecordsAsBatch(consumerState, method, boundArguments, consumerRecords) == false;
                            } else {
                                v0 = failed = this.processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords) == false;
                            }
                            if (failed) ** GOTO lbl-1000
                            if (consumerState.offsetStrategy == OffsetStrategy.SYNC) {
                                try {
                                    kafkaConsumer.commitSync();
                                }
                                catch (CommitFailedException e) {
                                    this.handleException(consumerState, null, e);
                                }
                            }
                            if (consumerState.offsetStrategy != OffsetStrategy.ASYNC) ** GOTO lbl-1000
                            kafkaConsumer.commitAsync(KafkaConsumerProcessor.resolveCommitCallback(consumerState.consumerBean));
                        }
                        catch (WakeupException e) {
                            try {
                                if (!failed && consumerState.offsetStrategy != OffsetStrategy.DISABLED) {
                                    kafkaConsumer.commitSync();
                                }
                            }
                            catch (Throwable ex) {
                                KafkaConsumerProcessor.LOG.warn("Error committing Kafka offsets on shutdown: {}", (Object)ex.getMessage(), (Object)ex);
                            }
                            throw e;
                        }
                        catch (Throwable e) {
                            this.handleException(consumerState, null, e);
                            continue;
                        }
                        break;
                    }
                }
                catch (Throwable var9_11) {
                    var8_9 = var9_11;
                    throw var9_11;
                }
                ** GOTO lbl-1000
            }
            catch (Throwable var14_19) {
                if (kafkaConsumer != null) {
                    if (var8_9 != null) {
                        try {
                            kafkaConsumer.close();
                        }
                        catch (Throwable var15_20) {
                            var8_9.addSuppressed(var15_20);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                throw var14_19;
            }
        }
        catch (WakeupException var7_8) {
            return;
        }
    }

    private boolean processConsumerRecords(ConsumerState consumerState, ExecutableMethod<?, ?> method, Map<Argument<?>, Object> boundArguments, boolean trackPartitions, Optional<Argument<?>> ackArg, ConsumerRecords<?, ?> consumerRecords) {
        DefaultExecutableBinder executableBinder = new DefaultExecutableBinder(boundArguments);
        HashMap<TopicPartition, OffsetAndMetadata> currentOffsets = trackPartitions ? new HashMap<TopicPartition, OffsetAndMetadata>() : null;
        for (ConsumerRecord consumerRecord : consumerRecords) {
            Consumer<?, ?> kafkaConsumer;
            block11: {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Kafka consumer [{}] received record: {}", (Object)KafkaConsumerProcessor.logMethod(method), (Object)consumerRecord);
                }
                if (trackPartitions) {
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1L, null);
                    currentOffsets.put(topicPartition, offsetAndMetadata);
                }
                kafkaConsumer = consumerState.kafkaConsumer;
                ackArg.ifPresent(argument -> boundArguments.put((Argument<?>)argument, () -> kafkaConsumer.commitSync(currentOffsets)));
                try {
                    BoundExecutable boundExecutable = executableBinder.bind(method, (ArgumentBinderRegistry)this.binderRegistry, (Object)consumerRecord);
                    Object result = boundExecutable.invoke(consumerState.consumerBean);
                    if (result != null) {
                        boolean isBlocking;
                        Flux resultFlowable;
                        if (Publishers.isConvertibleToPublisher((Object)result)) {
                            resultFlowable = Flux.from((Publisher)((Publisher)Publishers.convertPublisher((Object)result, Publisher.class)));
                            isBlocking = method.hasAnnotation(Blocking.class);
                        } else {
                            resultFlowable = Flux.just((Object)result);
                            isBlocking = true;
                        }
                        this.handleResultFlux(consumerState, method, consumerRecord, resultFlowable, isBlocking, consumerRecords);
                    }
                }
                catch (Throwable e) {
                    if (!this.resolveWithErrorStrategy(consumerState, consumerRecord, e)) break block11;
                    return false;
                }
            }
            if (consumerState.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
                try {
                    kafkaConsumer.commitSync(currentOffsets);
                }
                catch (CommitFailedException e) {
                    this.handleException(consumerState, consumerRecord, e);
                }
                continue;
            }
            if (consumerState.offsetStrategy != OffsetStrategy.ASYNC_PER_RECORD) continue;
            kafkaConsumer.commitAsync(currentOffsets, KafkaConsumerProcessor.resolveCommitCallback(consumerState.consumerBean));
        }
        return true;
    }

    private boolean resolveWithErrorStrategy(ConsumerState consumerState, ConsumerRecord<?, ?> consumerRecord, Throwable e) {
        ErrorStrategyValue currentErrorStrategy = consumerState.errorStrategy;
        if (currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR && consumerState.errorStrategyRetryCount != 0) {
            if (consumerState.partitionRetries == null) {
                consumerState.partitionRetries = new HashMap<Integer, PartitionRetryState>();
            }
            int partition = consumerRecord.partition();
            PartitionRetryState retryState = consumerState.partitionRetries.computeIfAbsent(partition, t -> new PartitionRetryState());
            if (retryState.currentRetryOffset != consumerRecord.offset()) {
                retryState.currentRetryOffset = consumerRecord.offset();
                retryState.currentRetryCount = 1;
            } else {
                ++retryState.currentRetryCount;
            }
            if (consumerState.errorStrategyRetryCount >= retryState.currentRetryCount) {
                TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), partition);
                consumerState.kafkaConsumer.seek(topicPartition, consumerRecord.offset());
                Duration retryDelay = consumerState.errorStrategyRetryDelay;
                if (retryDelay != null) {
                    Set<TopicPartition> paused = Collections.singleton(topicPartition);
                    consumerState.pause(paused);
                    this.taskScheduler.schedule(retryDelay, () -> consumerState.resume(paused));
                }
                return true;
            }
            consumerState.partitionRetries.remove(partition);
            currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
        }
        this.handleException(consumerState, consumerRecord, e);
        return currentErrorStrategy != ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
    }

    private boolean processConsumerRecordsAsBatch(ConsumerState consumerState, ExecutableMethod<?, ?> method, Map<Argument<?>, Object> boundArguments, ConsumerRecords<?, ?> consumerRecords) {
        DefaultExecutableBinder batchBinder = new DefaultExecutableBinder(boundArguments);
        BoundExecutable boundExecutable = batchBinder.bind(method, (ArgumentBinderRegistry)this.batchBinderRegistry, consumerRecords);
        List<Object> result = boundExecutable.invoke(consumerState.consumerBean);
        if (result != null) {
            boolean isBlocking;
            if (result.getClass().isArray()) {
                result = Arrays.asList((Object[])result);
            }
            boolean isPublisher = Publishers.isConvertibleToPublisher((Object)result);
            Flux resultFlux = result instanceof Iterable ? Flux.fromIterable((Iterable)result) : (isPublisher ? Flux.from((Publisher)((Publisher)Publishers.convertPublisher(result, Publisher.class))) : Flux.just(result));
            Iterator iterator = consumerRecords.iterator();
            boolean bl = isBlocking = !isPublisher || method.hasAnnotation(Blocking.class);
            if (isBlocking) {
                List objects = (List)resultFlux.collectList().block();
                for (Object object : objects) {
                    if (!iterator.hasNext()) continue;
                    ConsumerRecord consumerRecord = (ConsumerRecord)iterator.next();
                    this.handleResultFlux(consumerState, method, consumerRecord, Flux.just(object), isBlocking, consumerRecords);
                }
            } else {
                resultFlux.subscribe(o -> {
                    if (iterator.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord)iterator.next();
                        this.handleResultFlux(consumerState, method, consumerRecord, Flux.just((Object)o), isBlocking, consumerRecords);
                    }
                });
            }
        }
        return true;
    }

    private static void setupConsumerSubscription(ExecutableMethod<?, ?> method, List<AnnotationValue<Topic>> topicAnnotations, Object consumerBean, Consumer<?, ?> kafkaConsumer) {
        for (AnnotationValue<Topic> topicAnnotation : topicAnnotations) {
            Object[] topicNames = topicAnnotation.stringValues();
            Object[] patterns = topicAnnotation.stringValues("patterns");
            boolean hasTopics = ArrayUtils.isNotEmpty((Object[])topicNames);
            boolean hasPatterns = ArrayUtils.isNotEmpty((Object[])patterns);
            if (!hasTopics && !hasPatterns) {
                throw new MessagingSystemException("Either a topic or a topic must be specified for method: " + method);
            }
            if (hasTopics) {
                List<Object> topics = Arrays.asList(topicNames);
                if (consumerBean instanceof ConsumerRebalanceListener) {
                    kafkaConsumer.subscribe(topics, (ConsumerRebalanceListener)consumerBean);
                } else {
                    kafkaConsumer.subscribe(topics);
                }
                if (LOG.isInfoEnabled()) {
                    LOG.info("Kafka listener [{}] subscribed to topics: {}", (Object)KafkaConsumerProcessor.logMethod(method), topics);
                }
            }
            if (!hasPatterns) continue;
            for (Object pattern : patterns) {
                Pattern compiledPattern;
                try {
                    compiledPattern = Pattern.compile((String)pattern);
                }
                catch (Exception e) {
                    throw new MessagingSystemException("Invalid topic pattern [" + (String)pattern + "] for method [" + method + "]: " + e.getMessage(), (Throwable)e);
                }
                if (consumerBean instanceof ConsumerRebalanceListener) {
                    kafkaConsumer.subscribe(compiledPattern, (ConsumerRebalanceListener)consumerBean);
                } else {
                    kafkaConsumer.subscribe(compiledPattern);
                }
                if (!LOG.isInfoEnabled()) continue;
                LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", (Object)KafkaConsumerProcessor.logMethod(method), pattern);
            }
        }
    }

    private void handleException(ConsumerState consumerState, ConsumerRecord<?, ?> consumerRecord, Throwable e) {
        this.handleException(consumerState.consumerBean, new KafkaListenerException(e, consumerState.consumerBean, consumerState.kafkaConsumer, consumerRecord));
    }

    private void handleException(Object consumerBean, KafkaListenerException kafkaListenerException) {
        if (consumerBean instanceof KafkaListenerExceptionHandler) {
            ((KafkaListenerExceptionHandler)consumerBean).handle((Throwable)((Object)kafkaListenerException));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)kafkaListenerException));
        }
    }

    private void handleResultFlux(ConsumerState consumerState, ExecutableMethod<?, ?> method, ConsumerRecord<?, ?> consumerRecord, Flux<?> resultFlowable, boolean isBlocking, ConsumerRecords<?, ?> consumerRecords) {
        Flux recordMetadataProducer = resultFlowable.flatMap(value -> {
            if (consumerState.sendToDestinationTopics != null) {
                Object key = consumerRecord.key();
                if (value != null) {
                    Producer kafkaProducer = consumerState.useSendOffsetsToTransaction ? this.transactionalProducerRegistry.getTransactionalProducer(consumerState.producerClientId, consumerState.producerTransactionalId, Argument.of(byte[].class), Argument.of(Object.class)) : this.producerRegistry.getProducer(consumerState.producerClientId == null ? consumerState.groupId : consumerState.producerClientId, Argument.of(key != null ? key.getClass() : byte[].class), Argument.of(value.getClass()));
                    return Flux.create(emitter -> {
                        try {
                            if (consumerState.useSendOffsetsToTransaction) {
                                try {
                                    LOG.trace("Beginning transaction for producer: {}", (Object)consumerState.producerTransactionalId);
                                    kafkaProducer.beginTransaction();
                                }
                                catch (ProducerFencedException e) {
                                    this.handleProducerFencedException(kafkaProducer, e);
                                }
                            }
                            for (String destinationTopic : consumerState.sendToDestinationTopics) {
                                if (consumerState.isMessagesIterableReturnType) {
                                    Iterable messages = (Iterable)value;
                                    for (KafkaMessage message : messages) {
                                        ProducerRecord record = this.createFromMessage(destinationTopic, message);
                                        kafkaProducer.send(record, (metadata, exception) -> {
                                            if (exception != null) {
                                                emitter.error((Throwable)exception);
                                            } else {
                                                emitter.next((Object)metadata);
                                            }
                                        });
                                    }
                                    continue;
                                }
                                ProducerRecord record = consumerState.isMessageReturnType ? this.createFromMessage(destinationTopic, (KafkaMessage)value) : new ProducerRecord(destinationTopic, null, key, value, (Iterable)consumerRecord.headers());
                                LOG.trace("Sending record: {} for producer: {} {}", new Object[]{record, kafkaProducer, consumerState.producerTransactionalId});
                                kafkaProducer.send(record, (metadata, exception) -> {
                                    if (exception != null) {
                                        emitter.error((Throwable)exception);
                                    } else {
                                        emitter.next((Object)metadata);
                                    }
                                });
                            }
                            if (consumerState.useSendOffsetsToTransaction) {
                                HashMap<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
                                for (TopicPartition partition : consumerRecords.partitions()) {
                                    List partitionedRecords = consumerRecords.records(partition);
                                    long offset = ((ConsumerRecord)partitionedRecords.get(partitionedRecords.size() - 1)).offset();
                                    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1L));
                                }
                                try {
                                    LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", new Object[]{offsetsToCommit, consumerState.producerTransactionalId, consumerState.groupId});
                                    kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, consumerState.groupId);
                                    LOG.trace("Committing transaction for producer: {}", (Object)consumerState.producerTransactionalId);
                                    kafkaProducer.commitTransaction();
                                    LOG.trace("Committed transaction for producer: {}", (Object)consumerState.producerTransactionalId);
                                }
                                catch (ProducerFencedException e) {
                                    this.handleProducerFencedException(kafkaProducer, e);
                                }
                            }
                            emitter.complete();
                        }
                        catch (Exception e) {
                            if (consumerState.useSendOffsetsToTransaction) {
                                try {
                                    LOG.trace("Aborting transaction for producer: {} because of error: {}", (Object)consumerState.producerTransactionalId, (Object)e.getMessage());
                                    kafkaProducer.abortTransaction();
                                }
                                catch (ProducerFencedException ex) {
                                    this.handleProducerFencedException(kafkaProducer, ex);
                                }
                            }
                            emitter.error((Throwable)e);
                        }
                    });
                }
                return Flux.empty();
            }
            return Flux.empty();
        });
        recordMetadataProducer = recordMetadataProducer.onErrorResume(throwable -> {
            this.handleException(consumerState.consumerBean, new KafkaListenerException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), (Throwable)throwable, consumerState.consumerBean, consumerState.kafkaConsumer, consumerRecord));
            if (consumerState.redelivery) {
                LOG.debug("Attempting redelivery of record [{}] following error", (Object)consumerRecord);
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                if (key != null && value != null) {
                    Producer kafkaProducer = this.producerRegistry.getProducer(consumerState.producerClientId == null ? consumerState.groupId : consumerState.producerClientId, Argument.of(key.getClass()), Argument.of(value.getClass()));
                    ProducerRecord record = new ProducerRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), key, value, (Iterable)consumerRecord.headers());
                    return this.producerSend(consumerState, kafkaProducer, record).doOnError(ex -> this.handleException(consumerState.consumerBean, new KafkaListenerException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), (Throwable)throwable, consumerState.consumerBean, consumerState.kafkaConsumer, consumerRecord)));
                }
            }
            return Flux.empty();
        });
        if (isBlocking) {
            List listRecords = (List)recordMetadataProducer.collectList().block();
            LOG.trace("Method [{}] produced record metadata: {}", method, (Object)listRecords);
        } else {
            recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", (Object)KafkaConsumerProcessor.logMethod(method), recordMetadata));
        }
    }

    private Mono<RecordMetadata> producerSend(ConsumerState consumerState, Producer<?, ?> producer, ProducerRecord record) {
        LOG.trace("Sending record: {} for producer: {} {}", new Object[]{record, producer, consumerState.producerTransactionalId});
        return Mono.create(emitter -> producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                emitter.error((Throwable)exception);
            } else {
                emitter.success((Object)metadata);
            }
        }));
    }

    private ProducerRecord createFromMessage(String topic, KafkaMessage<?, ?> message) {
        return new ProducerRecord(message.getTopic() == null ? topic : message.getTopic(), message.getPartition(), message.getTimestamp(), message.getKey(), message.getBody(), this.convertHeaders(message));
    }

    private List<RecordHeader> convertHeaders(KafkaMessage<?, ?> message) {
        return message.getHeaders() == null ? null : message.getHeaders().entrySet().stream().map(e -> new RecordHeader((String)e.getKey(), e.getValue().toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
    }

    private void handleProducerFencedException(Producer<?, ?> producer, ProducerFencedException e) {
        LOG.error("Failed accessing the producer: " + producer, (Throwable)e);
        this.transactionalProducerRegistry.close(producer);
    }

    private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
        return Arrays.stream(method.getArguments()).filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)).findFirst().orElseGet(() -> Arrays.stream(method.getArguments()).filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)).findFirst().orElse(null));
    }

    private void configureDeserializers(ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration consumerConfiguration) {
        Properties properties = consumerConfiguration.getConfig();
        Argument bodyArgument = KafkaConsumerProcessor.findBodyArgument(method);
        if (!properties.containsKey("key.deserializer") && !consumerConfiguration.getKeyDeserializer().isPresent()) {
            Optional<Argument> keyArgument = Arrays.stream(method.getArguments()).filter(arg -> arg.isAnnotationPresent(KafkaKey.class)).findFirst();
            if (keyArgument.isPresent()) {
                consumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer(keyArgument.get()));
            } else if (bodyArgument != null && ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
                Optional keyType = bodyArgument.getTypeVariable("K");
                if (keyType.isPresent()) {
                    consumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer((Argument)keyType.get()));
                } else {
                    consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
                }
            } else {
                consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
            }
        }
        if (!properties.containsKey("value.deserializer") && !consumerConfiguration.getValueDeserializer().isPresent()) {
            if (bodyArgument == null) {
                consumerConfiguration.setValueDeserializer(new StringDeserializer());
            } else if (ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
                Optional valueType = bodyArgument.getTypeVariable("V");
                if (valueType.isPresent()) {
                    consumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer((Argument)valueType.get()));
                } else {
                    consumerConfiguration.setValueDeserializer(new StringDeserializer());
                }
            } else {
                boolean batch = method.isTrue(KafkaListener.class, "batch");
                consumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer(batch ? KafkaConsumerProcessor.getComponentType(bodyArgument) : bodyArgument));
            }
        }
        this.debugDeserializationConfiguration(method, consumerConfiguration, properties);
    }

    private static Argument getComponentType(Argument<?> argument) {
        Class argumentType = argument.getType();
        return argumentType.isArray() ? Argument.of(argumentType.getComponentType()) : argument.getFirstTypeVariable().orElse(argument);
    }

    private static OffsetCommitCallback resolveCommitCallback(Object consumerBean) {
        return (offsets, exception) -> {
            if (consumerBean instanceof OffsetCommitCallback) {
                ((OffsetCommitCallback)consumerBean).onComplete(offsets, exception);
            } else if (exception != null) {
                LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", new Object[]{offsets, exception.getMessage(), exception});
            }
        };
    }

    private static String logMethod(ExecutableMethod<?, ?> method) {
        return method.getDeclaringType().getSimpleName() + "#" + method.getName();
    }

    private static /* synthetic */ void lambda$createConsumerThreadPollLoop$11(Map boundArguments, Consumer kafkaConsumer, Argument argument) {
        boundArguments.put(argument, kafkaConsumer);
    }

    private static /* synthetic */ boolean lambda$createConsumerThreadPollLoop$10(Argument arg) {
        return Acknowledgement.class.isAssignableFrom(arg.getType());
    }

    private static /* synthetic */ boolean lambda$createConsumerThreadPollLoop$9(Argument arg) {
        return Consumer.class.isAssignableFrom(arg.getType());
    }

    private static /* synthetic */ Duration lambda$createConsumerThreadPollLoop$8() {
        return Duration.ofMillis(100L);
    }

    private static final class PartitionRetryState {
        long currentRetryOffset;
        int currentRetryCount;

        private PartitionRetryState() {
        }
    }

    private static final class ConsumerState {
        final String clientId;
        final Consumer<?, ?> kafkaConsumer;
        final Object consumerBean;
        final Set<String> subscriptions;
        Set<TopicPartition> assignments;
        Set<TopicPartition> _pausedTopicPartitions;
        Set<TopicPartition> _pauseRequests;
        @Nullable
        final String groupId;
        final boolean redelivery;
        final OffsetStrategy offsetStrategy;
        final ErrorStrategyValue errorStrategy;
        @Nullable
        final Duration errorStrategyRetryDelay;
        final int errorStrategyRetryCount;
        @Nullable
        Map<Integer, PartitionRetryState> partitionRetries;
        boolean autoPaused;
        final String producerClientId;
        final String producerTransactionalId;
        final boolean transactional;
        @Nullable
        final String[] sendToDestinationTopics;
        final boolean useSendOffsetsToTransaction;
        final boolean isMessageReturnType;
        final boolean isMessagesIterableReturnType;

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        private ConsumerState(String clientId, String groupId, OffsetStrategy offsetStrategy, Consumer<?, ?> consumer, Object consumerBean, Set<String> subscriptions, AnnotationValue<KafkaListener> kafkaListener, ExecutableMethod<?, ?> method) {
            this.clientId = clientId;
            this.groupId = groupId;
            this.kafkaConsumer = consumer;
            this.consumerBean = consumerBean;
            this.subscriptions = subscriptions;
            this.offsetStrategy = offsetStrategy;
            this.redelivery = kafkaListener.isTrue("redelivery");
            AnnotationValue errorStrategyAnnotation = kafkaListener.getAnnotation("errorStrategy", ErrorStrategy.class).orElse(null);
            ErrorStrategyValue errorStrategyValue = this.errorStrategy = errorStrategyAnnotation == null ? ErrorStrategyValue.NONE : kafkaListener.get((CharSequence)"errorStrategy", ErrorStrategy.class).map(ErrorStrategy::value).orElse(ErrorStrategyValue.NONE);
            if (this.errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) {
                Duration retryDelay = errorStrategyAnnotation.get((CharSequence)"retryDelay", Duration.class).orElse(Duration.ofSeconds(1L));
                this.errorStrategyRetryDelay = retryDelay.isNegative() || retryDelay.isZero() ? null : retryDelay;
                this.errorStrategyRetryCount = errorStrategyAnnotation.intValue("retryCount").orElse(1);
            } else {
                this.errorStrategyRetryDelay = null;
                this.errorStrategyRetryCount = 0;
            }
            this.autoPaused = kafkaListener.booleanValue("autoStartup").orElse(true) == false;
            this.producerClientId = kafkaListener.stringValue("producerClientId").orElse(null);
            this.producerTransactionalId = kafkaListener.stringValue("producerTransactionalId").orElse(null);
            this.transactional = StringUtils.isNotEmpty((CharSequence)this.producerTransactionalId);
            Object[] destinationTopics = method.stringValues(SendTo.class);
            Object object = this.sendToDestinationTopics = ArrayUtils.isNotEmpty((Object[])destinationTopics) ? destinationTopics : null;
            if (offsetStrategy == OffsetStrategy.SEND_TO_TRANSACTION) {
                if (!this.transactional || !method.hasAnnotation(SendTo.class)) throw new MessagingSystemException("Offset strategy 'SEND_TO_TRANSACTION' can only be used when transaction is enabled and @SendTo is used");
                this.useSendOffsetsToTransaction = true;
            } else {
                this.useSendOffsetsToTransaction = false;
            }
            if (this.useSendOffsetsToTransaction && this.redelivery) {
                throw new MessagingSystemException("Redelivery not supported for transactions in combination with @SendTo");
            }
            ReturnType returnType = method.getReturnType();
            this.isMessageReturnType = returnType.getType().isAssignableFrom(KafkaMessage.class) || returnType.isAsyncOrReactive() && returnType.getFirstTypeVariable().map(t -> t.getType().isAssignableFrom(KafkaMessage.class)).orElse(false) != false;
            this.isMessagesIterableReturnType = Iterable.class.isAssignableFrom(returnType.getType()) && returnType.getFirstTypeVariable().map(t -> t.getType().isAssignableFrom(KafkaMessage.class)).orElse(false) != false;
        }

        void pause() {
            this.pause(this.assignments);
        }

        synchronized void pause(@NonNull Collection<TopicPartition> topicPartitions) {
            if (this._pauseRequests == null) {
                this._pauseRequests = new HashSet<TopicPartition>();
            }
            this._pauseRequests.addAll(topicPartitions);
        }

        synchronized void resume() {
            this.autoPaused = false;
            this._pauseRequests = null;
        }

        synchronized void resume(@NonNull Collection<TopicPartition> topicPartitions) {
            if (this._pauseRequests != null) {
                this._pauseRequests.removeAll(topicPartitions);
            }
        }

        synchronized boolean isPaused(@NonNull Collection<TopicPartition> topicPartitions) {
            if (this._pauseRequests == null || this._pausedTopicPartitions == null) {
                return false;
            }
            return this._pauseRequests.containsAll(topicPartitions) && this._pausedTopicPartitions.containsAll(topicPartitions);
        }

        synchronized void resumeTopicPartitions() {
            Set paused = this.kafkaConsumer.paused();
            if (paused.isEmpty()) {
                return;
            }
            List<TopicPartition> toResume = paused.stream().filter(topicPartition -> this._pauseRequests == null || !this._pauseRequests.contains(topicPartition)).collect(Collectors.toList());
            if (!toResume.isEmpty()) {
                LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", (Object)this.clientId, toResume);
                this.kafkaConsumer.resume(toResume);
            }
            if (this._pausedTopicPartitions != null) {
                toResume.forEach(this._pausedTopicPartitions::remove);
            }
        }

        synchronized void pauseTopicPartitions() {
            if (this._pauseRequests == null || this._pauseRequests.isEmpty()) {
                return;
            }
            this.kafkaConsumer.pause(this._pauseRequests);
            LOG.debug("Paused Kafka consumption for Consumer [{}] from topic partition: {}", (Object)this.clientId, (Object)this.kafkaConsumer.paused());
            if (this._pausedTopicPartitions == null) {
                this._pausedTopicPartitions = new HashSet<TopicPartition>();
            }
            this._pausedTopicPartitions.addAll(this._pauseRequests);
        }
    }
}

