/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.pulsar.intercept;

import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ArgumentValue;
import io.micronaut.core.type.MutableArgumentValue;
import io.micronaut.core.type.ReturnType;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.pulsar.PulsarProducerRegistry;
import io.micronaut.pulsar.annotation.MessageKey;
import io.micronaut.pulsar.annotation.MessageProperties;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import io.micronaut.pulsar.events.ProducerSubscriptionFailedEvent;
import io.micronaut.pulsar.processor.DefaultSchemaHandler;
import jakarta.annotation.PreDestroy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterceptorBean(value={PulsarProducerClient.class})
public class PulsarProducerAdvice
implements MethodInterceptor<Object, Object>,
AutoCloseable,
PulsarProducerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarProducerAdvice.class);
    protected final Map<String, Producer<?>> producers = new ConcurrentHashMap();
    protected final PulsarClient pulsarClient;
    protected final DefaultSchemaHandler simpleSchemaResolver;
    protected final BeanContext beanContext;
    protected final ApplicationEventPublisher<ProducerSubscriptionFailedEvent> applicationEventPublisher;
    protected final ConversionService conversionService;

    public PulsarProducerAdvice(PulsarClient pulsarClient, DefaultSchemaHandler simpleSchemaResolver, BeanContext beanContext, ApplicationEventPublisher<ProducerSubscriptionFailedEvent> applicationEventPublisher, ConversionService conversionService) {
        this.pulsarClient = pulsarClient;
        this.simpleSchemaResolver = simpleSchemaResolver;
        this.beanContext = beanContext;
        this.applicationEventPublisher = applicationEventPublisher;
        this.conversionService = conversionService;
    }

    public Object intercept(MethodInvocationContext<Object, Object> context) {
        if (!context.hasAnnotation(PulsarProducer.class)) {
            return context.proceed();
        }
        AnnotationValue annotationValue = (AnnotationValue)context.findAnnotation(PulsarProducer.class).orElseThrow(() -> new IllegalStateException("No @PulsarProducer on method: " + context));
        boolean sendBefore = annotationValue.booleanValue("sendBefore").orElse(false);
        boolean isAbstract = context.isAbstract();
        Object returnValue = !isAbstract && !sendBefore ? context.proceed() : null;
        Object value = PulsarProducerAdvice.getValueFromContext(context);
        Object key = PulsarProducerAdvice.getKeyFromContext(context);
        Map<String, String> headers = PulsarProducerAdvice.collectHeaders(context);
        ExecutableMethod method = context.getExecutableMethod();
        Producer<?> producer = this.getOrCreateProducer(method, (AnnotationValue<PulsarProducer>)annotationValue);
        ReturnType returnType = method.getReturnType();
        if (returnType.isAsyncOrReactive()) {
            Object abstractValue = this.sendAsync(value, producer, returnType, key, headers);
            if (isAbstract) {
                return abstractValue;
            }
            if (!sendBefore) {
                return returnValue;
            }
            return context.proceed();
        }
        try {
            if (!isAbstract) {
                PulsarProducerAdvice.sendBlocking(value, producer, ReturnType.of(Void.TYPE, (Argument[])new Argument[0]), key, headers);
                return returnValue;
            }
            return PulsarProducerAdvice.sendBlocking(value, producer, returnType, key, headers);
        }
        catch (PulsarClientException e) {
            String producerId = producer.getProducerName();
            LOG.error("Failed to produce message on producer {}", (Object)producerId, (Object)e);
            throw new RuntimeException("Failed to produce a message on " + producerId, e);
        }
    }

    @NonNull
    private static Object getValueFromContext(MethodInvocationContext<Object, Object> context) {
        if (context.getParameters().size() == 1) {
            return context.getParameterValues()[0];
        }
        return context.getParameters().values().stream().filter(mutableArgumentValue -> mutableArgumentValue.isAnnotationPresent(MessageBody.class)).map(ArgumentValue::getValue).findFirst().orElseThrow(() -> new IllegalArgumentException("Producers with multiple values must have one argument annotated with @MessageBody"));
    }

    @Nullable
    private static Object getKeyFromContext(MethodInvocationContext<Object, Object> context) {
        if (context.getParameters().size() == 1) {
            return null;
        }
        return context.getParameters().values().stream().filter(mutableArgumentValue -> mutableArgumentValue.isAnnotationPresent(MessageKey.class)).map(ArgumentValue::getValue).findFirst().orElse(null);
    }

    private <T, V> Object sendAsync(V value, Producer<T> producer, ReturnType<?> returnType, @Nullable Object key, Map<String, String> headers) {
        CompletableFuture future = PulsarProducerAdvice.buildMessage(producer, value, key, headers).sendAsync();
        if (CompletableFuture.class == returnType.getType()) {
            return future;
        }
        return Publishers.convertPublisher((ConversionService)this.conversionService, (Object)future, (Class)returnType.getType());
    }

    private static <T, V> Object sendBlocking(V value, Producer<T> producer, ReturnType<?> returnType, @Nullable Object key, Map<String, String> headers) throws PulsarClientException {
        MessageId sent = PulsarProducerAdvice.buildMessage(producer, value, key, headers).send();
        if (returnType.isVoid()) {
            return Void.TYPE;
        }
        if (returnType.getType() == MessageId.class) {
            return sent;
        }
        if (returnType.getType() == value.getClass()) {
            return value;
        }
        throw new IllegalArgumentException("Pulsar abstract producers can only return MessageId or body being sent.");
    }

    private static <T, V> TypedMessageBuilder<?> buildMessage(Producer<T> producer, V value, @Nullable Object key, Map<String, String> headers) {
        TypedMessageBuilder message = producer.newMessage();
        if (null == key) {
            message.value(value);
        } else {
            message.value((Object)new KeyValue(key, value));
        }
        if (!headers.isEmpty()) {
            message.properties(headers);
        }
        return message;
    }

    private static Map<String, String> collectHeaders(MethodInvocationContext<Object, Object> context) {
        if (context.getParameters().size() == 1) {
            return Collections.emptyMap();
        }
        List<MutableArgumentValue> headers = context.getParameters().values().stream().filter(x -> x.isAnnotationPresent(MessageProperties.class) || x.isAnnotationPresent(MessageHeader.class)).toList();
        if (headers.size() == 1 && headers.get(0).isAnnotationPresent(MessageProperties.class)) {
            return (Map)headers.get(0).getValue();
        }
        return headers.stream().collect(Collectors.toMap(x -> Objects.requireNonNull(x.getAnnotation(MessageHeader.class)).stringValue().orElse(x.getName()), x -> (String)x.getValue()));
    }

    protected Producer<?> getOrCreateProducer(ExecutableMethod<?, ?> method, AnnotationValue<PulsarProducer> annotationValue) {
        String producerId = annotationValue.stringValue("producerName").orElse(method.getMethodName());
        Producer producer = this.producers.get(producerId);
        if (null == producer) {
            try {
                producer = (Producer)this.beanContext.createBean(Producer.class, new Object[]{this.pulsarClient, annotationValue, method.getArguments(), this.simpleSchemaResolver, method.getDescription(true)});
                this.producers.put(producerId, producer);
            }
            catch (Exception ex) {
                if (MessageListenerException.class == ex.getClass() && ex.getMessage().startsWith("Topic")) {
                    LOG.error("Topic missing for producer {} {}", (Object)producerId, (Object)method.getDescription(false));
                } else {
                    LOG.error("Failed to create producer {} with reason: ", (Object)producerId, (Object)ex);
                }
                this.applicationEventPublisher.publishEventAsync((Object)new ProducerSubscriptionFailedEvent(producerId, ex));
            }
        }
        return producer;
    }

    @Override
    @PreDestroy
    public void close() {
        for (Producer<?> producer : this.producers.values()) {
            if (!producer.isConnected()) continue;
            try {
                producer.flush();
                producer.close();
            }
            catch (Exception e) {
                LOG.warn("Error shutting down Pulsar producer: {}", (Object)e.getMessage(), (Object)e);
            }
        }
    }

    @Override
    public Map<String, Producer<?>> getProducers() {
        return this.producers;
    }

    @Override
    public Producer<?> getProducer(@NonNull String id) {
        return this.producers.get(id);
    }

    @Override
    public Set<String> getProducerIds() {
        return this.producers.keySet();
    }
}

