/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener.adapter;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.MethodParameter;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.TypeConverter;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.AdapterUtils;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.InvocationResult;
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.TypeUtils;
import reactor.core.publisher.Mono;

public abstract class MessagingMessageListenerAdapter<K, V>
implements ConsumerSeekAware,
AsyncRepliesAware {
    // Parser used by setReplyTopic to turn a templated reply topic into a SpEL expression.
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();
    // Substituted by invokeHandler when no Acknowledgment is supplied and 'noOpAck' is set.
    private static final Acknowledgment NO_OP_ACK = new NoOpAck();
    // Message whose payload is KafkaNull.INSTANCE; handleException swaps it for a message wrapping the raw records.
    protected static final Message<KafkaNull> NULL_MESSAGE = new GenericMessage((Object)KafkaNull.INSTANCE);
    // True when reactor.core.publisher.Mono can be loaded; guards the Mono check in handleResult.
    private static final boolean monoPresent = ClassUtils.isPresent((String)"reactor.core.publisher.Mono", (ClassLoader)MessageListener.class.getClassLoader());
    // The listener bean; ConsumerSeekAware callbacks are forwarded to it when it implements that interface.
    private final Object bean;
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    // Result of determineInferredType(method); null means getType() falls back to 'fallbackType'.
    private final Type inferredType;
    // Context used to evaluate non-literal reply topic expressions; configured by setBeanResolver.
    private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
    // Optional; when null, handleException rethrows the listener exception.
    private final KafkaListenerErrorHandler errorHandler;
    @Nullable
    private HandlerAdapter handlerMethod;
    // The three flags below are set by extractGenericParameterTypFromMethodParameter.
    private boolean isConsumerRecordList;
    private boolean isConsumerRecords;
    private boolean isMessageList;
    // Cleared by determineInferredType when every parameter is non-convertible and the method returns void.
    private boolean conversionNeeded = true;
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    // Set once setMessageConverter has been called; setMessagingConverter asserts it is still false.
    private boolean converterSet;
    private Type fallbackType = Object.class;
    private Expression replyTopicExpression;
    private KafkaTemplate replyTemplate;
    // Set by determineInferredType when the method declares an Acknowledgment parameter.
    private boolean hasAckParameter;
    // Set by determineInferredType when an Acknowledgment parameter carries @NonNull.
    private boolean noOpAck;
    // Set by determineInferredType when the method declares a ConsumerRecordMetadata parameter.
    private boolean hasMetadataParameter;
    // Value of KafkaUtils.returnTypeMessageOrCollectionOf(method), computed in determineInferredType.
    private boolean messageReturnType;
    private ReplyHeadersConfigurer replyHeadersConfigurer;
    private boolean splitIterables = true;
    private String correlationHeaderName = "kafka_correlationId";
    private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
    // Optional callback invoked by asyncFailure for a single ConsumerRecord request that failed with a RuntimeException.
    private BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback;

    /**
     * Create an adapter without an error handler.
     * @param bean the listener bean.
     * @param method the listener method, used to infer the payload type.
     */
    protected MessagingMessageListenerAdapter(Object bean, Method method) {
        this(bean, method, null);
    }

    /**
     * Create an adapter for the given bean and method.
     * @param bean the listener bean.
     * @param method the listener method, passed to {@link #determineInferredType(Method)}.
     * @param errorHandler optional handler consulted by
     * {@code handleException}; when {@code null} listener exceptions are rethrown.
     */
    protected MessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) {
        this.bean = bean;
        // NOTE: calls an overridable method from the constructor; statement order is kept as is.
        this.inferredType = this.determineInferredType(method);
        this.errorHandler = errorHandler;
    }

    /**
     * Set the name of the header used to read the correlation id from the
     * request and to write it on replies.
     * @param correlationHeaderName the header name; must not be {@code null}.
     */
    public void setCorrelationHeaderName(String correlationHeaderName) {
        Assert.notNull(correlationHeaderName, "'correlationHeaderName' cannot be null");
        this.correlationHeaderName = correlationHeaderName;
    }

    /**
     * Replace the record message converter. After this call
     * {@link #setMessagingConverter(SmartMessageConverter)} is rejected.
     * @param messageConverter the converter to use.
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
        this.converterSet = true;
    }

    /**
     * Return the converter used to turn consumer records into messages.
     * @return the current record message converter.
     */
    protected final RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * Set a {@link SmartMessageConverter} on the default record converter.
     * Only allowed while no custom record converter has been set through
     * {@link #setMessageConverter(RecordMessageConverter)}.
     * @param messageConverter the messaging converter to delegate to.
     */
    public void setMessagingConverter(SmartMessageConverter messageConverter) {
        Assert.isTrue(!this.converterSet, "Cannot set the SmartMessageConverter when setting the messageConverter, add the SmartConverter to the message converter instead");
        MessagingMessageConverter defaultConverter = (MessagingMessageConverter) this.messageConverter;
        defaultConverter.setMessagingConverter(messageConverter);
    }

    /**
     * Return the payload type used for conversion.
     * @return the type inferred from the listener method, or the fallback
     * type when nothing could be inferred.
     */
    protected Type getType() {
        if (this.inferredType != null) {
            return this.inferredType;
        }
        return this.fallbackType;
    }

    /**
     * Set the type returned by {@link #getType()} when no type was inferred
     * from the listener method (defaults to {@code Object}).
     * @param fallbackType the fallback type.
     */
    public void setFallbackType(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    /**
     * Set the adapter through which the listener method is invoked.
     * @param handlerMethod the handler adapter.
     */
    public void setHandlerMethod(HandlerAdapter handlerMethod) {
        this.handlerMethod = handlerMethod;
    }

    /**
     * Set the registry from which the current observation is obtained
     * (defaults to {@code ObservationRegistry.NOOP}).
     * @param observationRegistry the registry.
     */
    public void setObservationRegistry(ObservationRegistry observationRegistry) {
        this.observationRegistry = observationRegistry;
    }

    /**
     * Report whether the configured handler produces asynchronous replies.
     * @return {@code false} when no handler is set, otherwise the handler's answer.
     */
    @Override
    public boolean isAsyncReplies() {
        HandlerAdapter adapter = this.handlerMethod;
        if (adapter == null) {
            return false;
        }
        return adapter.isAsyncReplies();
    }

    /**
     * Return whether the payload parameter was detected as a {@code List}
     * whose element type is assignable to {@code ConsumerRecord}.
     * @return the flag computed while inferring the payload type.
     */
    protected boolean isConsumerRecordList() {
        return this.isConsumerRecordList;
    }

    /**
     * Return whether the payload parameter was detected as a parameterized
     * {@code ConsumerRecords}.
     * @return the flag computed while inferring the payload type.
     */
    public boolean isConsumerRecords() {
        return this.isConsumerRecords;
    }

    /**
     * Return whether record-to-message conversion is needed. It is switched
     * off when every listener parameter is a non-convertible type and the
     * method returns {@code void}.
     * @return {@code true} unless conversion was found to be unnecessary.
     */
    public boolean isConversionNeeded() {
        return this.conversionNeeded;
    }

    /**
     * Set the reply topic. Blank input is replaced by the default reply topic
     * expression from {@code AdapterUtils}. Text containing the template
     * expression prefix is parsed as SpEL; anything else is used literally.
     * @param replyTopicParam the topic name or expression; may be blank.
     */
    public void setReplyTopic(String replyTopicParam) {
        String replyTopic = StringUtils.hasText(replyTopicParam)
                ? replyTopicParam
                : AdapterUtils.getDefaultReplyTopicExpression();
        if (replyTopic.contains(AdapterUtils.PARSER_CONTEXT.getExpressionPrefix())) {
            this.replyTopicExpression = PARSER.parseExpression(replyTopic, AdapterUtils.PARSER_CONTEXT);
        }
        else {
            this.replyTopicExpression = new LiteralExpression(replyTopic);
        }
    }

    /**
     * Set the template used to send replies. It is required whenever a reply
     * topic is resolved for a listener result.
     * @param replyTemplate the template.
     */
    public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    /**
     * Configure the evaluation context used for reply topic expressions: the
     * bean resolver, a standard type converter and a map property accessor.
     * @param beanResolver the resolver for bean references in expressions.
     */
    public void setBeanResolver(BeanResolver beanResolver) {
        StandardEvaluationContext context = this.evaluationContext;
        context.setBeanResolver(beanResolver);
        context.setTypeConverter(new StandardTypeConverter());
        context.addPropertyAccessor(new MapAccessor());
    }

    /**
     * Set the callback invoked by {@code asyncFailure} when an asynchronous
     * result for a single {@code ConsumerRecord} fails with a
     * {@code RuntimeException}.
     * @param asyncRetryCallback the callback, or {@code null} for none.
     */
    public void setCallbackForAsyncFailure(@Nullable BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
        this.asyncRetryCallback = asyncRetryCallback;
    }

    /**
     * Return whether the payload parameter was detected as a {@code List} of
     * {@code Message}.
     * @return the flag computed while inferring the payload type.
     */
    protected boolean isMessageList() {
        return this.isMessageList;
    }

    /**
     * Return the configurer that decides which request headers are copied to
     * replies and which extra headers are added.
     * @return the configurer; {@code null} when none was set.
     */
    protected ReplyHeadersConfigurer getReplyHeadersConfigurer() {
        return this.replyHeadersConfigurer;
    }

    /**
     * Set the configurer consulted when a reply is built from a message source.
     * @param replyHeadersConfigurer the configurer.
     */
    public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) {
        this.replyHeadersConfigurer = replyHeadersConfigurer;
    }

    /**
     * Return whether an {@code Iterable} result is sent element by element.
     * @return {@code true} (the default) when iterables are split.
     */
    protected boolean isSplitIterables() {
        return this.splitIterables;
    }

    /**
     * Set whether an {@code Iterable} result is sent element by element
     * instead of as one reply. Iterables whose first element is a
     * {@code Message} are always split.
     * @param splitIterables {@code false} to send the iterable as a single reply.
     */
    public void setSplitIterables(boolean splitIterables) {
        this.splitIterables = splitIterables;
    }

    /**
     * Forward the seek callback registration to the listener bean when it is
     * itself {@link ConsumerSeekAware}; otherwise do nothing.
     */
    @Override
    public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback) {
        if (this.bean instanceof ConsumerSeekAware seekAware) {
            seekAware.registerSeekCallback(callback);
        }
    }

    /**
     * Forward the partition assignment to the listener bean when it is itself
     * {@link ConsumerSeekAware}; otherwise do nothing.
     */
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
        if (this.bean instanceof ConsumerSeekAware seekAware) {
            seekAware.onPartitionsAssigned(assignments, callback);
        }
    }

    /**
     * Forward the partition revocation to the listener bean when it is itself
     * {@link ConsumerSeekAware}; otherwise do nothing.
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        if (this.bean instanceof ConsumerSeekAware seekAware) {
            seekAware.onPartitionsRevoked(partitions);
        }
    }

    /**
     * Forward the idle-container notification to the listener bean when it is
     * itself {@link ConsumerSeekAware}; otherwise do nothing.
     */
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
        if (this.bean instanceof ConsumerSeekAware seekAware) {
            seekAware.onIdleContainer(assignments, callback);
        }
    }

    /**
     * Convert a consumer record into a message using the configured converter
     * and the payload type from {@link #getType()}.
     * @param cRecord the record to convert.
     * @param acknowledgment the acknowledgment, if any.
     * @param consumer the consumer.
     * @return the converted message.
     */
    protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        RecordMessageConverter converter = this.getMessageConverter();
        Type payloadType = this.getType();
        return converter.toMessage(cRecord, acknowledgment, consumer, payloadType);
    }

    /**
     * Invoke the listener and process its outcome.
     * <p>A non-null result is passed to {@code handleResult}. A
     * {@link ListenerExecutionFailedException} is recorded on the current
     * observation and delegated to {@code handleException}. An {@link Error}
     * is recorded on the observation and rethrown; it must never be swallowed.
     * <p>The observation is stopped here only when the call failed or produced
     * no result; otherwise {@code handleResult} stops it once the (possibly
     * asynchronous) result completes.
     * @param records the record(s) handed to the listener.
     * @param acknowledgment the acknowledgment, if any.
     * @param consumer the consumer.
     * @param message the converted message.
     */
    protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Message<?> message) {
        Throwable listenerError = null;
        Object result = null;
        Observation currentObservation = this.getCurrentObservation();
        try {
            result = this.invokeHandler(records, acknowledgment, message, consumer);
            if (result != null) {
                this.handleResult(result, records, acknowledgment, consumer, message);
            }
        }
        catch (ListenerExecutionFailedException e) {
            listenerError = e;
            currentObservation.error(e);
            this.handleException(records, acknowledgment, consumer, message, e);
        }
        catch (Error e) {
            listenerError = e;
            currentObservation.error(e);
            // Propagate JVM-level failures to the container instead of hiding them.
            throw e;
        }
        finally {
            if (listenerError != null || result == null) {
                currentObservation.stop();
            }
        }
    }

    /**
     * Look up the current observation in the registry.
     * @return the current observation, or {@code Observation.NOOP} when there is none.
     */
    private Observation getCurrentObservation() {
        Observation active = this.observationRegistry.getCurrentObservation();
        if (active != null) {
            return active;
        }
        return Observation.NOOP;
    }

    /**
     * Invoke the handler method with the arguments that fit the listener shape.
     * <p>For a {@code List} that is not a list of consumer records only the
     * message, acknowledgment and consumer are passed. Otherwise the raw data
     * is passed too, plus record metadata when the listener declares it.
     * @param data the record(s) received from the container.
     * @param acknowledgment the acknowledgment, if any; replaced by a no-op
     * instance when absent and the listener's parameter is {@code @NonNull}.
     * @param message the converted message.
     * @param consumer the consumer.
     * @return the listener's return value; may be {@code null}.
     * @throws ListenerExecutionFailedException wrapping any failure.
     */
    @Nullable
    protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message, Consumer<?, ?> consumer) {
        Acknowledgment ack = (acknowledgment == null && this.noOpAck) ? NO_OP_ACK : acknowledgment;
        Assert.notNull(this.handlerMethod, "the 'handlerMethod' must not be null");
        boolean convertedBatch = data instanceof List && !this.isConsumerRecordList;
        try {
            if (convertedBatch) {
                return this.handlerMethod.invoke(message, ack, consumer);
            }
            else if (this.hasMetadataParameter) {
                return this.handlerMethod.invoke(message, data, ack, consumer, AdapterUtils.buildConsumerRecordMetadata(data));
            }
            else {
                return this.handlerMethod.invoke(message, data, ack, consumer);
            }
        }
        catch (MessageConversionException ex) {
            MessageConversionException wrapped = new MessageConversionException("Cannot handle message", ex);
            throw this.checkAckArg(ack, message, wrapped);
        }
        catch (MethodArgumentNotValidException ex) {
            throw this.checkAckArg(ack, message, ex);
        }
        catch (MessagingException ex) {
            String description = this.createMessagingErrorMessage("Listener method could not be invoked with the incoming message", message.getPayload());
            throw new ListenerExecutionFailedException(description, ex);
        }
        catch (Exception ex) {
            throw new ListenerExecutionFailedException("Listener method '" + this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex);
        }
    }

    /**
     * Build the exception to throw after an argument resolution failure.
     * When the listener declares an {@code Acknowledgment} parameter but none
     * is available, the exception points at the missing MANUAL ack mode;
     * otherwise it wraps {@code ex} with the endpoint details.
     * @param acknowledgment the acknowledgment that was passed to the handler, if any.
     * @param message the message being processed.
     * @param ex the original failure.
     * @return the exception for the caller to throw.
     */
    private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Message<?> message, Exception ex) {
        boolean ackMissing = acknowledgment == null && this.hasAckParameter;
        if (!ackMissing) {
            String description = this.createMessagingErrorMessage("Listener method could not be invoked with the incoming message", message.getPayload());
            return new ListenerExecutionFailedException(description, ex);
        }
        IllegalStateException noAck = new IllegalStateException("No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.");
        return new ListenerExecutionFailedException("invokeHandler Failed", noAck);
    }

    /**
     * Handle a non-null listener result, sending a reply when one is configured.
     * <p>An {@code InvocationResult} is unwrapped to its result and its own
     * "returns a message" flag. The result is then treated as a future: a
     * {@code Mono} is converted, a {@code CompletableFuture} is used as is,
     * and any other value becomes an already-completed future. On completion
     * the reply is sent, or the failure is recorded on the observation and
     * handed to {@code asyncFailure}. The observation is always stopped
     * afterwards.
     * @param resultArg the listener result, possibly an {@code InvocationResult}.
     * @param request the record(s) that triggered the listener.
     * @param acknowledgment the acknowledgment, if any.
     * @param consumer the consumer.
     * @param source the converted request message, if any.
     * @throws IllegalStateException if a reply topic is resolved but no reply template is set.
     */
    protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, @Nullable Message<?> source) {
        final Observation observation = this.getCurrentObservation();
        this.logger.debug(() -> "Listener method returned result [" + resultArg + "] - generating response message for it");
        final String replyTopic = this.evaluateReplyTopic(request, source, resultArg);
        Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies");

        final Object result;
        final boolean replyIsMessage;
        if (resultArg instanceof InvocationResult invocationResult) {
            result = invocationResult.result();
            replyIsMessage = invocationResult.messageReturnType();
        }
        else {
            result = resultArg;
            replyIsMessage = this.messageReturnType;
        }

        final CompletableFuture<?> completableFutureResult;
        // 'monoPresent' is checked first so Mono is never touched when Reactor is absent.
        if (monoPresent && result instanceof Mono<?> mono) {
            if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
                this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type (or Kotlin suspend function); otherwise the container will ack the message immediately");
            }
            completableFutureResult = mono.toFuture();
        }
        else if (result instanceof CompletableFuture<?> future) {
            completableFutureResult = future;
            if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
                this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; otherwise the container will ack the message immediately");
            }
        }
        else {
            completableFutureResult = CompletableFuture.completedFuture(result);
        }

        completableFutureResult.whenComplete((r, t) -> {
            try {
                if (t == null) {
                    this.asyncSuccess(r, replyTopic, source, replyIsMessage);
                    if (this.isAsyncReplies()) {
                        this.acknowledge(acknowledgment);
                    }
                }
                else {
                    Throwable cause = t instanceof CompletionException ? t.getCause() : t;
                    observation.error(cause);
                    this.asyncFailure(request, acknowledgment, consumer, cause, source);
                }
            }
            finally {
                observation.stop();
            }
        });
    }

    /**
     * Resolve the reply topic for a listener result. An
     * {@code InvocationResult} supplies its own send-to expression; otherwise
     * the adapter's reply topic expression is used when one is set.
     * @return the topic, or {@code null} when none can be resolved.
     */
    @Nullable
    private String evaluateReplyTopic(Object request, Object source, Object result) {
        if (result instanceof InvocationResult invocationResult) {
            return this.evaluateTopic(request, source, result, invocationResult.sendTo());
        }
        if (this.replyTopicExpression == null) {
            return null;
        }
        return this.evaluateTopic(request, source, result, this.replyTopicExpression);
    }

    /**
     * Evaluate a send-to expression into a topic name. Literal expressions
     * are read directly; others are evaluated against a
     * {@link ReplyExpressionRoot} and must yield a {@code String} or a
     * {@code byte[]}, which is decoded as UTF-8.
     * @return the topic, or {@code null} when there is no expression or it
     * evaluates to {@code null}.
     * @throws IllegalStateException if the expression yields any other type.
     */
    @Nullable
    private String evaluateTopic(Object request, Object source, Object result, @Nullable Expression sendTo) {
        if (sendTo == null) {
            return null;
        }
        if (sendTo instanceof LiteralExpression) {
            return sendTo.getValue(String.class);
        }
        Object value = sendTo.getValue(this.evaluationContext, new ReplyExpressionRoot(request, source, result));
        if (value == null) {
            return null;
        }
        if (value instanceof String text) {
            return text;
        }
        if (value instanceof byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        throw new IllegalStateException("replyTopic expression must evaluate to a String or byte[], it is: " + value.getClass().getName());
    }

    /**
     * Send the listener result as one or more replies.
     * <p>Nothing is sent when there is no topic and the return type is not a
     * message. A {@code Message} gets any missing headers filled in and is
     * sent as is. An {@code Iterable} is sent element by element when its
     * first element is a {@code Message} or splitting is enabled. Anything
     * else is sent as a single reply.
     * @param result the value to send.
     * @param topic the reply topic, if resolved.
     * @param source the request message, if any.
     * @param returnTypeMessage whether the listener's return type is message based.
     */
    protected void sendResponse(Object result, @Nullable String topic, @Nullable Object source, boolean returnTypeMessage) {
        if (topic == null && !returnTypeMessage) {
            this.logger.debug(() -> "No replyTopic to handle the reply: " + result);
            return;
        }
        if (result instanceof Message<?> messageResult) {
            Message<?> reply = this.checkHeaders(messageResult, topic, source);
            this.replyTemplate.send(reply);
            return;
        }
        if (result instanceof Iterable<?> elements && (this.iterableOfMessages(elements) || this.splitIterables)) {
            elements.forEach(element -> {
                if (!(element instanceof Message<?> messageElement)) {
                    this.replyTemplate.send(topic, element);
                    return;
                }
                Message<?> elementReply = this.checkHeaders(messageElement, topic, source);
                this.replyTemplate.send(elementReply);
            });
            return;
        }
        this.sendSingleResult(result, topic, source);
    }

    /**
     * Check whether the first element of the iterable is a {@code Message}.
     * @return {@code false} for an empty iterable.
     */
    private boolean iterableOfMessages(Iterable<?> iterable) {
        Iterator<?> elements = iterable.iterator();
        if (!elements.hasNext()) {
            return false;
        }
        Object first = elements.next();
        return first instanceof Message;
    }

    /**
     * Fill in the topic, correlation and partition headers of a reply when
     * the reply does not carry them and a value is available. The topic comes
     * from {@code topic}; the other two come from the request message.
     * @return the reply itself when nothing is missing, otherwise a rebuilt copy.
     */
    private Message<?> checkHeaders(Message<?> reply, @Nullable String topic, @Nullable Object source) {
        MessageHeaders replyHeaders = reply.getHeaders();
        Message<?> request = source instanceof Message<?> sourceMessage ? sourceMessage : null;
        boolean needsTopic = topic != null && replyHeaders.get("kafka_topic") == null;
        boolean needsCorrelation = replyHeaders.get(this.correlationHeaderName) == null
                && request != null
                && this.getCorrelation(request) != null;
        boolean needsPartition = replyHeaders.get("kafka_partitionId") == null
                && request != null
                && this.getReplyPartition(request) != null;
        if (!needsTopic && !needsCorrelation && !needsPartition) {
            return reply;
        }
        MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
        if (needsTopic) {
            builder.setHeader("kafka_topic", topic);
        }
        if (needsCorrelation) {
            this.setCorrelation(builder, request);
        }
        if (needsPartition) {
            this.setPartition(builder, request);
        }
        return builder.build();
    }

    /**
     * Send a single reply value. With a message source the reply is built
     * with headers derived from it; otherwise the value is sent to the topic
     * directly.
     */
    private void sendSingleResult(Object result, String topic, @Nullable Object source) {
        if (!(source instanceof Message<?> requestMessage)) {
            this.replyTemplate.send(topic, result);
            return;
        }
        this.sendReplyForMessageSource(result, topic, requestMessage, this.getCorrelation(requestMessage));
    }

    /**
     * Build and send a reply for a request that arrived as a message.
     * <p>When a reply headers configurer is set, request headers it accepts
     * are copied (never the id, timestamp, correlation or
     * {@code kafka_received*} headers), followed by its additional headers.
     * The correlation id, reply partition and message key are then applied.
     */
    private void sendReplyForMessageSource(Object result, String topic, Message<?> source, @Nullable byte[] correlationId) {
        MessageBuilder<Object> builder = MessageBuilder.withPayload(result);
        builder.setHeader("kafka_topic", topic);
        if (this.replyHeadersConfigurer != null) {
            Map<String, Object> copied = source.getHeaders().entrySet().stream()
                    .filter(entry -> {
                        String name = entry.getKey();
                        boolean reserved = name.equals("id")
                                || name.equals("timestamp")
                                || name.equals(this.correlationHeaderName)
                                || name.startsWith("kafka_received");
                        // The configurer is only asked about headers that are not reserved.
                        return !reserved && this.replyHeadersConfigurer.shouldCopy(name, entry.getValue());
                    })
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            if (!copied.isEmpty()) {
                builder.copyHeaders(copied);
            }
            Map<String, Object> additional = this.replyHeadersConfigurer.additionalHeaders();
            if (!ObjectUtils.isEmpty(additional)) {
                builder.copyHeaders(additional);
            }
        }
        if (correlationId != null) {
            builder.setHeader(this.correlationHeaderName, correlationId);
        }
        this.setPartition(builder, source);
        this.setKey(builder, source);
        this.replyTemplate.send(builder.build());
    }

    /**
     * Handle the successful completion of a listener result: send the reply,
     * or only log at debug level when the result is {@code null}.
     * @param result the completed value; may be {@code null}.
     * @param replyTopic the resolved reply topic.
     * @param source the request message.
     * @param returnTypeMessage whether the listener's return type is message based.
     */
    protected void asyncSuccess(@Nullable Object result, String replyTopic, Message<?> source, boolean returnTypeMessage) {
        if (result != null) {
            this.sendResponse(result, replyTopic, source, returnTypeMessage);
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Async result is null, ignoring");
        }
    }

    /**
     * Acknowledge when an acknowledgment is available.
     * @param acknowledgment the acknowledgment; ignored when {@code null}.
     */
    protected void acknowledge(@Nullable Acknowledgment acknowledgment) {
        if (acknowledgment == null) {
            return;
        }
        acknowledgment.acknowledge();
    }

    /**
     * Handle the exceptional completion of an asynchronous listener result.
     * <p>The failure is wrapped and routed through {@code handleException}.
     * If that throws, the original throwable is logged, the record is
     * acknowledged and, for a single {@code ConsumerRecord} request failing
     * with a {@code RuntimeException}, the async retry callback is invoked
     * when one is set.
     * @param request the record(s) that triggered the listener.
     * @param acknowledgment the acknowledgment, if any.
     * @param consumer the consumer.
     * @param t the failure; a {@code CompletionException} is unwrapped to its cause.
     * @param source the request message.
     */
    protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Throwable t, Message<?> source) {
        try {
            Throwable cause = t;
            if (t instanceof CompletionException) {
                cause = t.getCause();
            }
            String description = this.createMessagingErrorMessage("Async Fail", source.getPayload());
            this.handleException(request, acknowledgment, consumer, source, new ListenerExecutionFailedException(description, cause));
        }
        catch (Throwable ex) {
            this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
            this.acknowledge(acknowledgment);
            if (MessagingMessageListenerAdapter.canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
                ConsumerRecord failedRecord = (ConsumerRecord) request;
                this.asyncRetryCallback.accept(failedRecord, (RuntimeException) ex);
            }
        }
    }

    /**
     * Decide whether a failed asynchronous invocation can be handed to the
     * retry callback: the request must be a single {@code ConsumerRecord} and
     * the failure a {@code RuntimeException}.
     */
    private static boolean canAsyncRetry(Object request, Throwable exception) {
        if (!(request instanceof ConsumerRecord)) {
            return false;
        }
        return exception instanceof RuntimeException;
    }

    /**
     * Route a listener failure through the error handler.
     * <p>Without an error handler the exception is rethrown. Otherwise the
     * handler is called (the null placeholder message is first replaced by a
     * message wrapping the raw records). A non-null handler result that is not
     * already an {@code InvocationResult} is processed like a normal listener
     * result.
     * @param records the record(s) that triggered the listener.
     * @param acknowledgment the acknowledgment, if any.
     * @param consumer the consumer.
     * @param message the converted message.
     * @param e the listener failure.
     * @throws ListenerExecutionFailedException {@code e} itself when there is
     * no error handler, or a new one wrapping whatever the handler threw.
     */
    protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Message<?> message, ListenerExecutionFailedException e) {
        if (this.errorHandler != null) {
            try {
                if (NULL_MESSAGE.equals(message)) {
                    message = new GenericMessage<>(records);
                }
                Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment);
                if (errorResult != null && !(errorResult instanceof InvocationResult)) {
                    InvocationResult wrapped = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload());
                    Object reply = wrapped != null ? wrapped : errorResult;
                    this.handleResult(reply, records, acknowledgment, consumer, message);
                }
            }
            catch (Exception ex) {
                String description = this.createMessagingErrorMessage("Listener error handler threw an exception for the incoming message", message.getPayload());
                throw new ListenerExecutionFailedException(description, ex);
            }
        }
        else {
            throw e;
        }
    }

    /**
     * Copy the correlation id from the request onto the reply builder, when
     * the request carries one.
     */
    private void setCorrelation(MessageBuilder<?> builder, Message<?> source) {
        byte[] correlationId = this.getCorrelation(source);
        if (correlationId == null) {
            return;
        }
        builder.setHeader(this.correlationHeaderName, correlationId);
    }

    /**
     * Read the correlation header of the request as a byte array.
     * @return the correlation id, or {@code null} when the header is absent.
     */
    @Nullable
    private byte[] getCorrelation(Message<?> source) {
        MessageHeaders requestHeaders = source.getHeaders();
        return requestHeaders.get(this.correlationHeaderName, byte[].class);
    }

    /**
     * Set the reply partition header from the request's reply-partition
     * header, when present. The header bytes are read as one {@code int}.
     */
    private void setPartition(MessageBuilder<?> builder, Message<?> source) {
        byte[] encodedPartition = this.getReplyPartition(source);
        if (encodedPartition == null) {
            return;
        }
        int partition = ByteBuffer.wrap(encodedPartition).getInt();
        builder.setHeader("kafka_partitionId", partition);
    }

    /**
     * Use the received message key as the reply key. The header is skipped
     * when it is absent or holds a {@code List}.
     */
    private void setKey(MessageBuilder<?> builder, Message<?> source) {
        Object receivedKey = source.getHeaders().get("kafka_receivedMessageKey");
        if (receivedKey == null || receivedKey instanceof List) {
            return;
        }
        builder.setHeader("kafka_messageKey", receivedKey);
    }

    /**
     * Read the reply-partition header of the request as a byte array.
     * @return the encoded partition, or {@code null} when the header is absent.
     */
    @Nullable
    private byte[] getReplyPartition(Message<?> source) {
        MessageHeaders requestHeaders = source.getHeaders();
        return requestHeaders.get("kafka_replyPartition", byte[].class);
    }

    /**
     * Append the handler method and bean details to an error description.
     * @param description the leading text.
     * @param payload the payload used to identify the handler method.
     * @return the description followed by the endpoint handler details.
     */
    protected final String createMessagingErrorMessage(String description, Object payload) {
        String methodText = this.handlerMethod.getMethodAsString(payload);
        Object handlerBean = this.handlerMethod.getBean();
        return description
                + "\nEndpoint handler details:\nMethod ["
                + methodText
                + "]\nBean ["
                + handlerBean
                + "]";
    }

    /**
     * Inspect the listener method to infer the payload type and record which
     * special parameters it declares.
     * <p>Parameters of type {@code ConsumerRecord}, {@code Acknowledgment},
     * {@code Consumer}, {@code ConsumerRecordMetadata} and Kotlin
     * continuations take no part in conversion. The payload candidate is the
     * first remaining parameter that is not an untyped {@code Message} and is
     * either unannotated or annotated with {@code @Payload}. A second
     * candidate stops the scan.
     * @param method the listener method; may be {@code null}.
     * @return the inferred payload type, or {@code null} when there is no
     * method or no candidate parameter.
     * @throws IllegalStateException if a batch-style payload parameter is
     * combined with parameters that are not allowed next to it.
     */
    @Nullable
    protected Type determineInferredType(@Nullable Method method) {
        if (method == null) {
            return null;
        }
        Type payloadType = null;
        int allowedBatchParameters = 1;
        int notConvertibleParameters = 0;
        int parameterCount = method.getParameterCount();
        for (int index = 0; index < parameterCount; index++) {
            MethodParameter methodParameter = new MethodParameter(method, index);
            Type parameterType = methodParameter.getGenericParameterType();

            boolean isRecord = MessagingMessageListenerAdapter.parameterIsType(parameterType, ConsumerRecord.class);
            boolean isAck = MessagingMessageListenerAdapter.parameterIsType(parameterType, Acknowledgment.class);
            if (isAck) {
                this.hasAckParameter = true;
                this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null;
            }
            boolean isConsumer = MessagingMessageListenerAdapter.parameterIsType(parameterType, Consumer.class);
            boolean isKotlinContinuation = AdapterUtils.isKotlinContinuation(methodParameter.getParameterType());
            boolean isMeta = MessagingMessageListenerAdapter.parameterIsType(parameterType, ConsumerRecordMetadata.class);
            if (isMeta) {
                this.hasMetadataParameter = true;
            }

            boolean isNotConvertible = isRecord || isAck || isConsumer || isKotlinContinuation || isMeta;
            if (isNotConvertible) {
                notConvertibleParameters++;
            }

            boolean payloadCandidate = !isNotConvertible
                    && !MessagingMessageListenerAdapter.isMessageWithNoTypeInfo(parameterType)
                    && (methodParameter.getParameterAnnotations().length == 0
                            || methodParameter.hasParameterAnnotation(Payload.class));
            if (payloadCandidate) {
                if (payloadType != null) {
                    this.logger.debug(() -> "Ambiguous parameters for target payload for method " + method + "; no inferred type available");
                    break;
                }
                payloadType = this.extractGenericParameterTypFromMethodParameter(methodParameter);
            }
            else if (isAck || isKotlinContinuation || isConsumer || MessagingMessageListenerAdapter.annotationHeaderIsGroupId(methodParameter)) {
                // These may accompany a batch-style payload parameter.
                allowedBatchParameters++;
            }
        }

        if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(Void.TYPE)) {
            this.conversionNeeded = false;
        }

        if (method.getGenericParameterTypes().length > allowedBatchParameters) {
            String stateMessage = "A parameter of type '%s' must be the only parameter (except for an optional 'Acknowledgment' and/or 'Consumer' and/or '@Header(KafkaHeaders.GROUP_ID) String groupId'";
            Assert.state(!this.isConsumerRecords, () -> String.format(stateMessage, "ConsumerRecords"));
            Assert.state(!this.isConsumerRecordList, () -> String.format(stateMessage, "List<ConsumerRecord>"));
            Assert.state(!this.isMessageList, () -> String.format(stateMessage, "List<Message<?>>"));
        }
        this.messageReturnType = KafkaUtils.returnTypeMessageOrCollectionOf(method);
        return payloadType;
    }

    /**
     * Derive the payload type from the candidate parameter and record its
     * batch shape.
     * <p>{@code Message<T>} yields {@code T}. A single-argument {@code List}
     * sets the message-list and record-list flags and, for
     * {@code List<Message<T>>}, yields {@code T}. Any other parameterized
     * type sets the {@code ConsumerRecords} flag according to its raw type.
     * @return the payload type; the parameter's own generic type when nothing
     * more specific applies.
     */
    private Type extractGenericParameterTypFromMethodParameter(MethodParameter methodParameter) {
        Type declaredType = methodParameter.getGenericParameterType();
        if (!(declaredType instanceof ParameterizedType parameterizedType)) {
            return declaredType;
        }
        Type rawType = parameterizedType.getRawType();
        if (rawType.equals(Message.class)) {
            return parameterizedType.getActualTypeArguments()[0];
        }
        if (rawType.equals(List.class) && parameterizedType.getActualTypeArguments().length == 1) {
            Type elementType = parameterizedType.getActualTypeArguments()[0];
            boolean messageHasGeneric = elementType instanceof ParameterizedType elementParameterized
                    && elementParameterized.getRawType().equals(Message.class);
            this.isMessageList = TypeUtils.isAssignable(elementType, Message.class) || messageHasGeneric;
            this.isConsumerRecordList = TypeUtils.isAssignable(elementType, ConsumerRecord.class);
            if (!messageHasGeneric) {
                return declaredType;
            }
            return ((ParameterizedType) elementType).getActualTypeArguments()[0];
        }
        this.isConsumerRecords = rawType.equals(ConsumerRecords.class);
        return declaredType;
    }

    /**
     * Check whether the parameter is annotated with {@code @Header} whose
     * value is {@code kafka_groupId}.
     */
    private static boolean annotationHeaderIsGroupId(MethodParameter methodParameter) {
        Header headerAnnotation = methodParameter.getParameterAnnotation(Header.class);
        if (headerAnnotation == null) {
            return false;
        }
        return "kafka_groupId".equals(headerAnnotation.value());
    }

    /**
     * Check whether the type is a {@code Message} that carries no usable
     * payload type: either the raw {@code Message} class or a
     * {@code Message} parameterized with a wildcard.
     */
    private static boolean isMessageWithNoTypeInfo(Type parameterType) {
        boolean parameterizedMessage = parameterType instanceof ParameterizedType candidate
                && candidate.getRawType().equals(Message.class);
        if (!parameterizedMessage) {
            return Message.class.equals(parameterType);
        }
        Type payloadArgument = ((ParameterizedType) parameterType).getActualTypeArguments()[0];
        return payloadArgument instanceof WildcardType;
    }

    /**
     * Check whether the parameter type is the given type, either exactly or
     * as the raw type of a parameterized type.
     */
    private static boolean parameterIsType(Type parameterType, Type type) {
        if (parameterType.equals(type)) {
            return true;
        }
        return MessagingMessageListenerAdapter.rawByParameterIsType(parameterType, type);
    }

    /**
     * Check whether the parameter type is a parameterized type whose raw
     * type equals the given type.
     */
    private static boolean rawByParameterIsType(Type parameterType, Type type) {
        if (parameterType instanceof ParameterizedType parameterized) {
            return parameterized.getRawType().equals(type);
        }
        return false;
    }

    /**
     * Root object against which non-literal reply topic expressions are evaluated.
     *
     * @param request the record(s) that triggered the listener.
     * @param source the converted request message.
     * @param result the listener result.
     */
    public record ReplyExpressionRoot(
            Object request,
            Object source,
            Object result) {
    }

    /**
     * Acknowledgment whose {@code acknowledge()} does nothing; passed to the
     * listener when no real acknowledgment is available but its parameter is
     * declared {@code @NonNull}.
     */
    static class NoOpAck implements Acknowledgment {

        @Override
        public void acknowledge() {
            // Intentionally empty.
        }

    }
}

