/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.intercept;

import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaPartition;
import io.micronaut.configuration.kafka.annotation.KafkaPartitionKey;
import io.micronaut.configuration.kafka.annotation.KafkaTimestamp;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaProducerConfiguration;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.messaging.exceptions.MessagingClientException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@InterceptorBean(value={KafkaClient.class})
public class KafkaClientIntroductionAdvice
implements MethodInterceptor<Object, Object>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientIntroductionAdvice.class);
    private static final ContextSupplier NULL_SUPPLIER = __ -> null;
    private final BeanContext beanContext;
    private final SerdeRegistry serdeRegistry;
    private final ConversionService<?> conversionService;
    private final Map<ProducerKey, ProducerState> producerMap = new ConcurrentHashMap<ProducerKey, ProducerState>();

    public KafkaClientIntroductionAdvice(BeanContext beanContext, SerdeRegistry serdeRegistry, ConversionService<?> conversionService) {
        this.beanContext = beanContext;
        this.serdeRegistry = serdeRegistry;
        this.conversionService = conversionService;
    }

    public final Object intercept(MethodInvocationContext<Object, Object> context) {
        if (context.hasAnnotation(KafkaClient.class)) {
            if (!context.hasAnnotation(KafkaClient.class)) {
                throw new IllegalStateException("No @KafkaClient annotation present on method: " + context);
            }
            ProducerState producerState = this.getProducer(context);
            InterceptedMethod interceptedMethod = InterceptedMethod.of(context);
            try {
                Argument returnType = interceptedMethod.returnTypeValue();
                if (Argument.OBJECT_ARGUMENT.equalsType(returnType)) {
                    returnType = Argument.of(RecordMetadata.class);
                }
                switch (interceptedMethod.resultType()) {
                    case COMPLETION_STAGE: {
                        CompletableFuture<Object> completableFuture = this.returnCompletableFuture(context, producerState, returnType);
                        return interceptedMethod.handleResult(completableFuture);
                    }
                    case PUBLISHER: {
                        Flux<Object> returnFlowable = this.returnPublisher(context, producerState, returnType);
                        return interceptedMethod.handleResult(returnFlowable);
                    }
                    case SYNCHRONOUS: {
                        return this.returnSynchronous(context, producerState);
                    }
                }
                return interceptedMethod.unsupported();
            }
            catch (Exception e) {
                return interceptedMethod.handleException(e);
            }
        }
        return context.proceed();
    }

    private Object returnSynchronous(MethodInvocationContext<Object, Object> context, ProducerState producerState) {
        boolean isReactiveValue;
        ReturnType returnType = context.getReturnType();
        Class javaReturnType = returnType.getType();
        Argument returnTypeArgument = returnType.asArgument();
        Object value = producerState.valueSupplier.get(context);
        boolean bl = isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass());
        if (isReactiveValue) {
            Flux<Object> sendFlowable = this.buildSendFluxForReactiveValue(context, producerState, returnTypeArgument, value);
            if (Iterable.class.isAssignableFrom(javaReturnType)) {
                return this.conversionService.convert(sendFlowable.collectList().block(), returnTypeArgument).orElse(null);
            }
            if (Void.TYPE.isAssignableFrom(javaReturnType)) {
                Mono maybe = sendFlowable.next();
                return maybe.block();
            }
            return this.conversionService.convert(sendFlowable.blockFirst(), returnTypeArgument).orElse(null);
        }
        boolean transactional = producerState.transactional;
        Producer kafkaProducer = producerState.kafkaProducer;
        try {
            Object returnValue;
            if (transactional) {
                LOG.trace("Beginning transaction for producer: {}", (Object)producerState.transactionalId);
                kafkaProducer.beginTransaction();
            }
            if (producerState.isBatchSend) {
                Iterable<Object> batchValue = value != null && value.getClass().isArray() ? Arrays.asList((Object[])value) : (!(value instanceof Iterable) ? Collections.singletonList(value) : (Iterable)value);
                ArrayList results = new ArrayList();
                for (Object o : batchValue) {
                    ProducerRecord<?, ?> record = this.buildProducerRecord(context, producerState, o);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("@KafkaClient method [" + KafkaClientIntroductionAdvice.logMethod(context) + "] Sending producer record: " + record);
                    }
                    Object result = producerState.maxBlock != null ? kafkaProducer.send(record).get(producerState.maxBlock.toMillis(), TimeUnit.MILLISECONDS) : kafkaProducer.send(record).get();
                    results.add(result);
                }
                returnValue = this.conversionService.convert(results, returnTypeArgument).orElseGet(() -> {
                    if (javaReturnType == producerState.bodyArgument.getType()) {
                        return value;
                    }
                    return null;
                });
            } else {
                ProducerRecord<?, ?> record = this.buildProducerRecord(context, producerState, value);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("@KafkaClient method [{}] Sending producer record: {}", (Object)KafkaClientIntroductionAdvice.logMethod(context), record);
                }
                Object result = producerState.maxBlock != null ? kafkaProducer.send(record).get(producerState.maxBlock.toMillis(), TimeUnit.MILLISECONDS) : kafkaProducer.send(record).get();
                returnValue = this.conversionService.convert(result, returnTypeArgument).orElseGet(() -> {
                    if (javaReturnType == producerState.bodyArgument.getType()) {
                        return value;
                    }
                    return null;
                });
            }
            if (transactional) {
                LOG.trace("Committing transaction for producer: {}", (Object)producerState.transactionalId);
                kafkaProducer.commitTransaction();
            }
            return returnValue;
        }
        catch (Exception e) {
            if (transactional) {
                LOG.trace("Aborting transaction for producer: {}", (Object)producerState.transactionalId);
                kafkaProducer.abortTransaction();
            }
            throw this.wrapException(context, e);
        }
    }

    private Flux<Object> returnPublisher(MethodInvocationContext<Object, Object> context, ProducerState producerState, Argument<?> returnType) {
        Flux returnFlowable;
        boolean isReactiveValue;
        Object value = producerState.valueSupplier.get(context);
        boolean bl = isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass());
        if (isReactiveValue) {
            returnFlowable = this.buildSendFluxForReactiveValue(context, producerState, returnType, value);
        } else if (producerState.isBatchSend) {
            Object batchValue = value != null && value.getClass().isArray() ? Arrays.asList((Object[])value) : value;
            Flux bodyEmitter = batchValue instanceof Iterable ? Flux.fromIterable((Iterable)((Iterable)batchValue)) : Flux.just(batchValue);
            returnFlowable = bodyEmitter.flatMap(o -> this.buildSendFlux(context, producerState, o, returnType));
        } else {
            returnFlowable = this.buildSendFlux(context, producerState, value, returnType);
        }
        return returnFlowable;
    }

    private CompletableFuture<Object> returnCompletableFuture(final MethodInvocationContext<Object, Object> context, ProducerState producerState, Argument<?> returnType) {
        boolean isReactiveValue;
        final CompletableFuture<Object> completableFuture = new CompletableFuture<Object>();
        Object value = producerState.valueSupplier.get(context);
        boolean bl = isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass());
        if (isReactiveValue) {
            Flux sendFlowable = this.buildSendFluxForReactiveValue(context, producerState, returnType, value);
            if (!Publishers.isSingle(value.getClass())) {
                sendFlowable = sendFlowable.collectList().flux();
            }
            sendFlowable.subscribe(new Subscriber(){
                boolean completed = false;

                public void onSubscribe(Subscription s) {
                    s.request(1L);
                }

                public void onNext(Object o) {
                    completableFuture.complete(o);
                    this.completed = true;
                }

                public void onError(Throwable t) {
                    completableFuture.completeExceptionally((Throwable)KafkaClientIntroductionAdvice.this.wrapException((MethodInvocationContext<Object, Object>)context, t));
                }

                public void onComplete() {
                    if (!this.completed) {
                        completableFuture.complete(null);
                    }
                }
            });
        } else {
            ProducerRecord<?, ?> record = this.buildProducerRecord(context, producerState, value);
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [" + KafkaClientIntroductionAdvice.logMethod(context) + "] Sending producer record: " + record);
            }
            boolean transactional = producerState.transactional;
            Producer kafkaProducer = producerState.kafkaProducer;
            try {
                if (transactional) {
                    LOG.trace("Beginning transaction for producer: {}", (Object)producerState.transactionalId);
                    kafkaProducer.beginTransaction();
                }
                kafkaProducer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        completableFuture.completeExceptionally((Throwable)this.wrapException(context, exception));
                    } else if (returnType.equalsType(Argument.VOID_OBJECT)) {
                        completableFuture.complete(null);
                    } else {
                        Optional converted = this.conversionService.convert((Object)metadata, returnType);
                        if (converted.isPresent()) {
                            completableFuture.complete(converted.get());
                        } else if (returnType.getType() == producerState.bodyArgument.getType()) {
                            completableFuture.complete(value);
                        }
                    }
                });
                if (transactional) {
                    LOG.trace("Committing transaction for producer: {}", (Object)producerState.transactionalId);
                    kafkaProducer.commitTransaction();
                }
            }
            catch (Exception e) {
                if (transactional) {
                    LOG.trace("Aborting transaction for producer: {}", (Object)producerState.transactionalId);
                    kafkaProducer.abortTransaction();
                }
                throw e;
            }
        }
        return completableFuture;
    }

    private Mono<RecordMetadata> producerSend(Producer<?, ?> producer, ProducerRecord record) {
        return Mono.create(emitter -> producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                emitter.error((Throwable)exception);
            } else {
                emitter.success((Object)metadata);
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @PreDestroy
    public final void close() {
        try {
            for (ProducerState producerState : this.producerMap.values()) {
                try {
                    producerState.kafkaProducer.close();
                }
                catch (Exception e) {
                    LOG.warn("Error closing Kafka producer: {}", (Object)e.getMessage(), (Object)e);
                }
            }
        }
        finally {
            this.producerMap.clear();
        }
    }

    private Flux<Object> buildSendFlux(MethodInvocationContext<Object, Object> context, ProducerState producerState, Object value, Argument<?> returnType) {
        ProducerRecord<?, ?> record = this.buildProducerRecord(context, producerState, value);
        return Flux.defer(() -> {
            boolean transactional = producerState.transactional;
            Producer kafkaProducer = producerState.kafkaProducer;
            if (transactional) {
                LOG.trace("Committing transaction for producer: {}", (Object)producerState.transactionalId);
                kafkaProducer.beginTransaction();
            }
            Mono result = this.producerSend(kafkaProducer, record).map(metadata -> this.convertResult((RecordMetadata)metadata, returnType, value, (Argument<?>)producerState.bodyArgument)).onErrorMap(e -> this.wrapException(context, (Throwable)e));
            if (transactional) {
                return this.addTransactionalProcessing(producerState, (Flux<Object>)result.flux());
            }
            return result;
        });
    }

    private Flux<Object> buildSendFluxForReactiveValue(MethodInvocationContext<Object, Object> context, ProducerState producerState, Argument<?> returnType, Object value) {
        Flux valueFlowable = Flux.from((Publisher)((Publisher)Publishers.convertPublisher((Object)value, Publisher.class)));
        Class javaReturnType = returnType.getType();
        if (Iterable.class.isAssignableFrom(javaReturnType)) {
            returnType = returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
        }
        boolean transactional = producerState.transactional;
        Producer kafkaProducer = producerState.kafkaProducer;
        if (transactional) {
            LOG.trace("Beginning transaction for producer: {}", (Object)producerState.transactionalId);
            kafkaProducer.beginTransaction();
        }
        Argument finalReturnType = returnType;
        Flux sendFlowable = valueFlowable.flatMap(o -> {
            ProducerRecord<?, ?> record = this.buildProducerRecord(context, producerState, o);
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [{}] Sending producer record: {}", (Object)KafkaClientIntroductionAdvice.logMethod(context), record);
            }
            return this.producerSend(kafkaProducer, record).map(metadata -> this.convertResult((RecordMetadata)metadata, (Argument<?>)finalReturnType, o, (Argument<?>)producerState.bodyArgument)).onErrorMap(e -> this.wrapException(context, (Throwable)e));
        });
        if (transactional) {
            sendFlowable = this.addTransactionalProcessing(producerState, (Flux<Object>)sendFlowable);
        }
        if (producerState.maxBlock != null) {
            sendFlowable = sendFlowable.timeout(producerState.maxBlock);
        }
        return sendFlowable;
    }

    private Flux<Object> addTransactionalProcessing(ProducerState producerState, Flux<Object> sendFlowable) {
        return sendFlowable.doOnError(throwable -> {
            LOG.trace("Aborting transaction for producer: {}", (Object)producerState.transactionalId);
            producerState.kafkaProducer.abortTransaction();
        }).doOnComplete(() -> {
            LOG.trace("Committing transaction for producer: {}", (Object)producerState.transactionalId);
            producerState.kafkaProducer.commitTransaction();
        });
    }

    private Object convertResult(RecordMetadata metadata, Argument<?> returnType, Object value, Argument<?> valueArgument) {
        if (returnType.isVoid()) {
            return metadata;
        }
        if (RecordMetadata.class.isAssignableFrom(returnType.getType())) {
            return metadata;
        }
        if (returnType.getType() == valueArgument.getType()) {
            return value;
        }
        return this.conversionService.convertRequired((Object)metadata, returnType);
    }

    private MessagingClientException wrapException(MethodInvocationContext<Object, Object> context, Throwable exception) {
        return new MessagingClientException("Exception sending producer record for method [" + context + "]: " + exception.getMessage(), exception);
    }

    private ProducerRecord<?, ?> buildProducerRecord(MethodInvocationContext<Object, Object> context, ProducerState producerState, Object value) {
        return new ProducerRecord((String)producerState.topicSupplier.get(context), (Integer)producerState.partitionSupplier.get(context), (Long)producerState.timestampSupplier.get(context), producerState.keySupplier.get(context), value, (Iterable)producerState.headersSupplier.get(context));
    }

    private ProducerState getProducer(MethodInvocationContext<?, ?> context) {
        ProducerKey key = new ProducerKey(context.getTarget(), context.getExecutableMethod());
        return this.producerMap.computeIfAbsent(key, producerKey -> {
            Serializer valueSerializer;
            Object keySerializer;
            Optional namedConfig;
            Argument argument;
            int finalI;
            int i;
            String clientId = context.stringValue(KafkaClient.class).orElse(null);
            LinkedList<ContextSupplier> headersSuppliers = new LinkedList<ContextSupplier>();
            List headers = context.getAnnotationValuesByType(MessageHeader.class);
            if (!headers.isEmpty()) {
                ArrayList<RecordHeader> kafkaHeaders = new ArrayList<RecordHeader>(headers.size());
                for (AnnotationValue header : headers) {
                    String name = header.stringValue("name").orElse(null);
                    String value = header.stringValue().orElse(null);
                    if (!StringUtils.isNotEmpty((CharSequence)name) || !StringUtils.isNotEmpty((CharSequence)value)) continue;
                    kafkaHeaders.add(new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8)));
                }
                if (!kafkaHeaders.isEmpty()) {
                    headersSuppliers.add(ctx -> kafkaHeaders);
                }
            }
            Argument keyArgument = null;
            Argument bodyArgument = null;
            ContextSupplier[] topicSupplier = new ContextSupplier[]{ctx -> ctx.stringValue(Topic.class).filter(StringUtils::isNotEmpty).orElseThrow(() -> new MessagingClientException("No topic specified for method: " + context))};
            ContextSupplier keySupplier = NULL_SUPPLIER;
            ContextSupplier valueSupplier = NULL_SUPPLIER;
            ContextSupplier timestampSupplier = NULL_SUPPLIER;
            BiFunction<MethodInvocationContext, Producer, Integer> partitionFromProducerFn = (ctx, producer) -> null;
            Argument[] arguments = context.getArguments();
            for (i = 0; i < arguments.length; ++i) {
                finalI = i;
                argument = arguments[i];
                if (ProducerRecord.class.isAssignableFrom(argument.getType()) || argument.isAnnotationPresent(MessageBody.class)) {
                    bodyArgument = argument.isAsyncOrReactive() ? argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT) : argument;
                    valueSupplier = ctx -> ctx.getParameterValues()[finalI];
                    continue;
                }
                if (argument.isAnnotationPresent(KafkaKey.class)) {
                    keyArgument = argument;
                    keySupplier = ctx -> ctx.getParameterValues()[finalI];
                    continue;
                }
                if (argument.isAnnotationPresent(Topic.class)) {
                    ContextSupplier prevTopicSupplier = topicSupplier[0];
                    topicSupplier[0] = ctx -> {
                        String topic;
                        Object o = ctx.getParameterValues()[finalI];
                        if (o != null && StringUtils.isNotEmpty((CharSequence)(topic = o.toString()))) {
                            return topic;
                        }
                        return (String)prevTopicSupplier.get((MethodInvocationContext<?, ?>)ctx);
                    };
                    continue;
                }
                if (argument.isAnnotationPresent(KafkaTimestamp.class)) {
                    timestampSupplier = ctx -> {
                        Object o = ctx.getParameterValues()[finalI];
                        if (o instanceof Long) {
                            return (Long)o;
                        }
                        return null;
                    };
                    continue;
                }
                if (argument.isAnnotationPresent(KafkaPartition.class)) {
                    partitionFromProducerFn = (ctx, producer) -> {
                        Object o = ctx.getParameterValues()[finalI];
                        if (o != null && Integer.class.isAssignableFrom(o.getClass())) {
                            return (Integer)o;
                        }
                        return null;
                    };
                    continue;
                }
                if (argument.isAnnotationPresent(KafkaPartitionKey.class)) {
                    partitionFromProducerFn = (ctx, producer) -> {
                        Object partitionKey = ctx.getParameterValues()[finalI];
                        if (partitionKey != null) {
                            ByteArraySerializer serializer = this.serdeRegistry.pickSerializer(argument);
                            if (serializer == null) {
                                serializer = new ByteArraySerializer();
                            }
                            String topic = (String)topicSupplier[0].get((MethodInvocationContext<?, ?>)ctx);
                            byte[] partitionKeyBytes = serializer.serialize(topic, partitionKey);
                            return Utils.toPositive((int)Utils.murmur2((byte[])partitionKeyBytes)) % producer.partitionsFor(topic).size();
                        }
                        return null;
                    };
                    continue;
                }
                if (argument.isAnnotationPresent(MessageHeader.class)) {
                    AnnotationMetadata annotationMetadata = argument.getAnnotationMetadata();
                    String name = annotationMetadata.stringValue(MessageHeader.class, "name").orElseGet(() -> annotationMetadata.stringValue(MessageHeader.class).orElseGet(() -> ((Argument)argument).getName()));
                    headersSuppliers.add(ctx -> {
                        Serializer serializer;
                        Object headerValue = ctx.getParameterValues()[finalI];
                        if (headerValue != null && (serializer = this.serdeRegistry.pickSerializer(argument)) != null) {
                            try {
                                return Collections.singleton(new RecordHeader(name, serializer.serialize(null, headerValue)));
                            }
                            catch (Exception e) {
                                throw new MessagingClientException("Cannot serialize header argument [" + argument + "] for method [" + ctx + "]: " + e.getMessage(), (Throwable)e);
                            }
                        }
                        return Collections.emptySet();
                    });
                    continue;
                }
                if (argument.isContainerType() && Header.class.isAssignableFrom(argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType())) {
                    headersSuppliers.add(ctx -> {
                        Collection parameterHeaders = (Collection)ctx.getParameterValues()[finalI];
                        if (parameterHeaders != null) {
                            return parameterHeaders;
                        }
                        return Collections.emptySet();
                    });
                    continue;
                }
                Class argumentType = argument.getType();
                if (argumentType != Headers.class && argumentType != RecordHeaders.class) continue;
                headersSuppliers.add(ctx -> {
                    Headers parameterHeaders = (Headers)ctx.getParameterValues()[finalI];
                    if (parameterHeaders != null) {
                        return parameterHeaders;
                    }
                    return Collections.emptySet();
                });
            }
            if (bodyArgument == null) {
                for (i = 0; i < arguments.length; ++i) {
                    finalI = i;
                    argument = arguments[i];
                    if (argument.getAnnotationMetadata().hasStereotype(Bindable.class)) continue;
                    bodyArgument = argument.isAsyncOrReactive() ? argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT) : argument;
                    valueSupplier = ctx -> ctx.getParameterValues()[finalI];
                    break;
                }
                if (bodyArgument == null) {
                    throw new MessagingClientException("No valid message body argument found for method: " + context);
                }
            }
            AbstractKafkaProducerConfiguration configuration = clientId != null ? ((namedConfig = this.beanContext.findBean(KafkaProducerConfiguration.class, Qualifiers.byName((String)clientId))).isPresent() ? (AbstractKafkaProducerConfiguration)namedConfig.get() : (AbstractKafkaProducerConfiguration)this.beanContext.getBean(AbstractKafkaProducerConfiguration.class)) : (AbstractKafkaProducerConfiguration)this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
            DefaultKafkaProducerConfiguration newConfiguration = new DefaultKafkaProducerConfiguration(configuration);
            Properties newProperties = newConfiguration.getConfig();
            String transactionalId = context.stringValue(KafkaClient.class, "transactionalId").filter(StringUtils::isNotEmpty).orElse(null);
            if (clientId != null) {
                newProperties.putIfAbsent("client.id", clientId);
            }
            if (transactionalId != null) {
                newProperties.putIfAbsent("transactional.id", transactionalId);
            }
            context.getValue(KafkaClient.class, "maxBlock", Duration.class).ifPresent(maxBlock -> newProperties.put("max.block.ms", String.valueOf(maxBlock.toMillis())));
            Integer ack = context.intValue(KafkaClient.class, "acks").orElse(Integer.MIN_VALUE);
            if (ack != Integer.MIN_VALUE) {
                String acksValue = ack == -1 ? "all" : String.valueOf(ack);
                newProperties.put("acks", acksValue);
            }
            context.findAnnotation(KafkaClient.class).map(ann -> ann.getProperties("properties", "name")).ifPresent(newProperties::putAll);
            LOG.debug("Creating new KafkaProducer.");
            if (!newProperties.containsKey("key.serializer") && (keySerializer = (Serializer)newConfiguration.getKeySerializer().orElse(null)) == null) {
                keySerializer = keyArgument != null ? this.serdeRegistry.pickSerializer(keyArgument) : new ByteArraySerializer();
                LOG.debug("Using Kafka key serializer: {}", keySerializer);
                newConfiguration.setKeySerializer(keySerializer);
            }
            boolean isBatchSend = context.isTrue(KafkaClient.class, "batch");
            if (!newProperties.containsKey("value.serializer") && (valueSerializer = (Serializer)newConfiguration.getValueSerializer().orElse(null)) == null) {
                valueSerializer = this.serdeRegistry.pickSerializer(isBatchSend ? bodyArgument.getFirstTypeVariable().orElse(bodyArgument) : bodyArgument);
                LOG.debug("Using Kafka value serializer: {}", valueSerializer);
                newConfiguration.setValueSerializer(valueSerializer);
            }
            Producer producer2 = (Producer)this.beanContext.createBean(Producer.class, new Object[]{newConfiguration});
            boolean transactional = StringUtils.isNotEmpty((CharSequence)transactionalId);
            timestampSupplier = context.isTrue(KafkaClient.class, "timestamp") ? ctx -> System.currentTimeMillis() : timestampSupplier;
            Duration maxBlock2 = context.getValue(KafkaClient.class, "maxBlock", Duration.class).orElse(null);
            if (transactional) {
                producer2.initTransactions();
            }
            ContextSupplier headersSupplier = ctx -> {
                if (headersSuppliers.isEmpty()) {
                    return null;
                }
                ArrayList<Header> headerList = new ArrayList<Header>(headersSuppliers.size());
                for (ContextSupplier supplier : headersSuppliers) {
                    for (Header header : (Iterable)supplier.get((MethodInvocationContext<?, ?>)ctx)) {
                        headerList.add(header);
                    }
                }
                if (headerList.isEmpty()) {
                    return null;
                }
                return headerList;
            };
            BiFunction<MethodInvocationContext, Producer, Integer> finalPartitionFromProducerFn = partitionFromProducerFn;
            ContextSupplier partitionSupplier = ctx -> (Integer)finalPartitionFromProducerFn.apply((MethodInvocationContext)ctx, producer2);
            return new ProducerState(producer2, keySupplier, topicSupplier[0], valueSupplier, timestampSupplier, partitionSupplier, headersSupplier, transactional, transactionalId, maxBlock2, isBatchSend, bodyArgument);
        });
    }

    private static String logMethod(ExecutableMethod<?, ?> method) {
        return method.getDeclaringType().getSimpleName() + "#" + method.getName();
    }

    private static final class ProducerKey {
        private final Object target;
        private final ExecutableMethod<?, ?> method;
        private final int hashCode;

        private ProducerKey(Object target, ExecutableMethod<?, ?> method) {
            this.target = target;
            this.method = method;
            this.hashCode = Objects.hash(target, method);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ProducerKey that = (ProducerKey)o;
            return Objects.equals(this.target, that.target) && Objects.equals(this.method, that.method);
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    private static interface ContextSupplier<T>
    extends Function<MethodInvocationContext<?, ?>, T> {
        default public T get(MethodInvocationContext<?, ?> ctx) {
            return (T)this.apply(ctx);
        }
    }

    private static final class ProducerState {
        private final Producer<?, ?> kafkaProducer;
        private final ContextSupplier<Object> keySupplier;
        private final ContextSupplier<String> topicSupplier;
        private final ContextSupplier<Object> valueSupplier;
        private final ContextSupplier<Long> timestampSupplier;
        private final ContextSupplier<Integer> partitionSupplier;
        private final ContextSupplier<Collection<Header>> headersSupplier;
        private final boolean transactional;
        private final String transactionalId;
        @Nullable
        private final Duration maxBlock;
        private final boolean isBatchSend;
        private final Argument<?> bodyArgument;

        private ProducerState(Producer<?, ?> kafkaProducer, ContextSupplier<Object> keySupplier, ContextSupplier<String> topicSupplier, ContextSupplier<Object> valueSupplier, ContextSupplier<Long> timestampSupplier, ContextSupplier<Integer> partitionSupplier, ContextSupplier<Collection<Header>> headersSupplier, boolean transactional, @Nullable String transactionalId, @Nullable Duration maxBlock, boolean isBatchSend, @Nullable Argument<?> bodyArgument) {
            this.kafkaProducer = kafkaProducer;
            this.keySupplier = keySupplier;
            this.topicSupplier = topicSupplier;
            this.valueSupplier = valueSupplier;
            this.timestampSupplier = timestampSupplier;
            this.partitionSupplier = partitionSupplier;
            this.headersSupplier = headersSupplier;
            this.transactional = transactional;
            this.transactionalId = transactionalId;
            this.maxBlock = maxBlock;
            this.isBatchSend = isBatchSend;
            this.bodyArgument = bodyArgument;
        }
    }
}

