/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.pulsar.processor;

import io.micronaut.context.BeanContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.pulsar.PulsarConsumerRegistry;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import io.micronaut.pulsar.config.DefaultPulsarClientConfiguration;
import io.micronaut.pulsar.events.ConsumerSubscribedEvent;
import io.micronaut.pulsar.events.ConsumerSubscriptionFailedEvent;
import io.micronaut.pulsar.processor.DefaultListener;
import io.micronaut.pulsar.processor.DefaultSchemaHandler;
import io.micronaut.pulsar.processor.PulsarArgumentHandler;
import io.micronaut.pulsar.processor.TopicResolver;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes bean methods annotated with {@link PulsarConsumer}: for each one a Pulsar
 * {@link Consumer} is built from the annotation values, subscribed, and stored in an internal
 * registry keyed by consumer id.
 *
 * <p>The registry is exposed through the {@link PulsarConsumerRegistry} methods, which also allow
 * pausing and resuming individual consumers. {@link #close()} unsubscribes and closes every
 * registered consumer.
 */
@Singleton
@Internal
public class PulsarConsumerProcessor
        implements ExecutableMethodProcessor<PulsarConsumer>, AutoCloseable, PulsarConsumerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerProcessor.class);

    protected final TopicResolver topicResolver;
    protected final DefaultPulsarClientConfiguration pulsarClientConfiguration;
    private final ApplicationEventPublisher<Object> applicationEventPublisher;
    private final BeanContext beanContext;
    private final PulsarClient pulsarClient;
    private final DefaultSchemaHandler simpleSchemaResolver;
    /** Subscribed consumers keyed by consumer id. */
    private final Map<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
    /** Consumers paused through {@link #pause(String)}, keyed by consumer id. */
    private final Map<String, Consumer<?>> paused = new ConcurrentHashMap<>();
    /** Shared sequence used for generated consumer names and subscription names; starts at 10. */
    private final AtomicInteger consumerCounter = new AtomicInteger(10);

    /**
     * @param applicationEventPublisher publisher used for subscription success/failure events
     * @param beanContext               context used to look up the bean that owns a consumer method
     * @param pulsarClient              client the consumers are created on
     * @param simpleSchemaResolver      handler that decides the schema of a consumer method
     * @param pulsarClientConfiguration client configuration (dead letter queue defaults)
     * @param topicResolver             resolver applied to topic names and used to generate consumer ids
     */
    public PulsarConsumerProcessor(ApplicationEventPublisher<Object> applicationEventPublisher, BeanContext beanContext, PulsarClient pulsarClient, DefaultSchemaHandler simpleSchemaResolver, DefaultPulsarClientConfiguration pulsarClientConfiguration, TopicResolver topicResolver) {
        this.applicationEventPublisher = applicationEventPublisher;
        this.beanContext = beanContext;
        this.pulsarClient = pulsarClient;
        this.simpleSchemaResolver = simpleSchemaResolver;
        this.pulsarClientConfiguration = pulsarClientConfiguration;
        this.topicResolver = topicResolver;
    }

    /**
     * Builds and subscribes a Pulsar consumer for {@code method} when it directly declares
     * {@link PulsarConsumer}; methods without the annotation are ignored.
     *
     * <p>Depending on the annotation's {@code subscribeAsync} value the subscription is made
     * asynchronously or synchronously. In both cases a {@link ConsumerSubscribedEvent} or a
     * {@link ConsumerSubscriptionFailedEvent} is published once the outcome is known.
     *
     * @param beanDefinition definition of the bean declaring the method
     * @param method         candidate consumer method
     * @throws MessageListenerException if a consumer with the same id is already registered, the
     *                                  method has no parameters, or a synchronous subscription fails
     */
    @Override
    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        AnnotationValue<PulsarConsumer> consumerAnnotation = method.getDeclaredAnnotation(PulsarConsumer.class);
        if (consumerAnnotation == null) {
            return;
        }

        String name = this.getConsumerName(consumerAnnotation);
        TopicResolver.TopicResolved topicResolved = TopicResolver.extractTopic(consumerAnnotation, name);
        String consumerId = this.topicResolver.generateIdFromMessagingClientName(name, topicResolved);
        if (this.consumers.containsKey(consumerId)) {
            throw new MessageListenerException(String.format("Consumer %s already exists", consumerId));
        }

        AnnotationValue<PulsarSubscription> subscriptionAnnotation = method.getAnnotation(PulsarSubscription.class);
        Object[] arguments = method.getArguments();
        if (ArrayUtils.isEmpty(arguments)) {
            throw new MessageListenerException("Method annotated with PulsarConsumer must accept at least 1 parameter");
        }

        // The declaring type is only known as a wildcard here; the listener invokes the method on
        // the bean fetched below for that same bean definition.
        @SuppressWarnings("unchecked")
        ExecutableMethod<Object, ?> castMethod = (ExecutableMethod<Object, ?>) method;
        Object bean = this.beanContext.getBean(beanDefinition.getBeanType());
        ConsumerBuilder<?> consumerBuilder = this.processConsumerAnnotation(consumerAnnotation, subscriptionAnnotation, castMethod, bean, topicResolved);
        boolean subscribeAsync = consumerAnnotation.getRequiredValue("subscribeAsync", Boolean.class);
        consumerBuilder.consumerName(name);

        if (subscribeAsync) {
            // The future returned by handle() is discarded: a failed asynchronous subscription is
            // reported only through the log entry and the published failure event.
            consumerBuilder.subscribeAsync().handle((consumer, ex) -> {
                if (ex != null) {
                    LOG.error("Failed subscribing Pulsar consumer {} {}", method.getDescription(false), consumerId, ex);
                    this.applicationEventPublisher.publishEventAsync(new ConsumerSubscriptionFailedEvent(ex, consumerId));
                    return new MessageListenerException("Failed to subscribe", ex);
                }
                this.consumers.put(consumerId, consumer);
                this.applicationEventPublisher.publishEventAsync(new ConsumerSubscribedEvent(consumer));
                return consumer;
            });
            return;
        }

        try {
            Consumer<?> subscribed = consumerBuilder.subscribe();
            this.consumers.put(consumerId, subscribed);
            this.applicationEventPublisher.publishEvent(new ConsumerSubscribedEvent(subscribed));
        } catch (Exception e) {
            LOG.error("Failed subscribing Pulsar consumer {} {}", method.getDescription(false), consumerId, e);
            this.applicationEventPublisher.publishEvent(new ConsumerSubscriptionFailedEvent(e, consumerId));
            throw new MessageListenerException("Failed to subscribe %s".formatted(consumerId), e);
        }
    }

    /**
     * Returns the {@code consumerName} annotation value, or a generated
     * {@code pulsar-consumer-<n>} name when none is set.
     *
     * @param topic the {@link PulsarConsumer} annotation of the method being processed
     * @return the consumer name to use
     */
    @NotNull
    protected String getConsumerName(AnnotationValue<PulsarConsumer> topic) {
        // NOTE(review): the fallback is evaluated eagerly, so the shared counter advances even when
        // a name is configured. Kept as is because switching to a lazy default would renumber every
        // generated consumer/subscription name.
        String generatedName = "pulsar-consumer-" + this.consumerCounter.getAndIncrement();
        return topic.stringValue("consumerName").orElse(generatedName);
    }

    /**
     * Creates the consumer builder for a consumer method: schema, optional consumer name, topics,
     * dead letter policy, subscription settings, acknowledge timeout and message listener.
     *
     * @param consumerAnnotation the method's {@link PulsarConsumer} annotation
     * @param subscription       the method's {@link PulsarSubscription} annotation, or {@code null};
     *                           when present it takes precedence over the subscription values of
     *                           {@code consumerAnnotation}
     * @param method             consumer method the listener is created for
     * @param bean               bean instance passed to the listener together with {@code method}
     * @param topic              topic information extracted from the annotation
     * @return the configured builder, not yet subscribed
     * @throws MessageListenerException if {@code ackTimeout} is not greater than one second
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ConsumerBuilder<?> processConsumerAnnotation(AnnotationValue<PulsarConsumer> consumerAnnotation, AnnotationValue<PulsarSubscription> subscription, ExecutableMethod<Object, ?> method, Object bean, TopicResolver.TopicResolved topic) {
        PulsarArgumentHandler argHandler = new PulsarArgumentHandler(method.getArguments(), method.getDescription(false));
        Schema<?> schema = this.simpleSchemaResolver.decideSchema(argHandler.getBodyArgument(), argHandler.getKeyArgument(), consumerAnnotation, method.getDescription(false));
        ConsumerBuilder<?> consumer = new ConsumerBuilderImpl<>((PulsarClientImpl) this.pulsarClient, schema);

        consumerAnnotation.stringValue("consumerName").ifPresent(consumer::consumerName);
        this.resolveTopic(consumerAnnotation, consumer, topic);
        this.resolveDeadLetter(consumerAnnotation, consumer);
        if (subscription != null) {
            this.subscriptionValues(subscription, consumer);
        } else {
            this.consumerValues(consumerAnnotation, consumer);
        }

        consumerAnnotation.stringValue("ackTimeout").map(Duration::parse).ifPresent(duration -> {
            long millis = duration.toMillis();
            // Exactly one second is rejected as well: the timeout must be strictly greater.
            if (millis <= 1000L) {
                throw new MessageListenerException("Acknowledge timeout must be greater than 1 second");
            }
            consumer.ackTimeout(millis, TimeUnit.MILLISECONDS);
        });

        consumer.messageListener((MessageListener) new DefaultListener(method, argHandler.isMessageWrapper(), bean, argHandler));
        return consumer;
    }

    /**
     * Applies a dead letter policy to the builder when dead letter queues are enabled in the client
     * configuration and the effective maximum redelivery count is positive.
     *
     * <p>The redelivery count comes from {@code maxRetriesBeforeDlq}, falling back to the configured
     * default. If it is zero or negative no policy is set, even when {@code deadLetterTopic} is given.
     */
    private void resolveDeadLetter(AnnotationValue<PulsarConsumer> consumerAnnotation, ConsumerBuilder<?> consumerBuilder) {
        if (!this.pulsarClientConfiguration.getUseDeadLetterQueue().booleanValue()) {
            return;
        }
        DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
        Optional<String> deadLetterTopic = consumerAnnotation.stringValue("deadLetterTopic");
        if (deadLetterTopic.isPresent()) {
            builder.deadLetterTopic(this.topicResolver.resolve(deadLetterTopic.get()));
        }
        int maxRedeliverCount = consumerAnnotation.intValue("maxRetriesBeforeDlq").orElse(this.pulsarClientConfiguration.getDefaultMaxRetryDlq());
        if (maxRedeliverCount > 0) {
            builder.maxRedeliverCount(maxRedeliverCount);
            consumerBuilder.deadLetterPolicy(builder.build());
        }
    }

    /**
     * Configures the builder's topic(s): a topics pattern, an array of topics, or a single topic,
     * each passed through the topic resolver first.
     */
    private void resolveTopic(AnnotationValue<PulsarConsumer> consumerAnnotation, ConsumerBuilder<?> consumer, TopicResolver.TopicResolved topic) {
        if (topic.isPattern()) {
            this.resolveTopicsPattern(consumerAnnotation, consumer, this.topicResolver.resolve(topic.getTopic()));
        } else if (topic.isArray()) {
            String[] resolvedTopics = Arrays.stream(topic.getTopics()).map(this.topicResolver::resolve).toArray(String[]::new);
            consumer.topic(resolvedTopics);
        } else {
            consumer.topic(this.topicResolver.resolve(topic.getTopic()));
        }
    }

    /**
     * Configures a pattern subscription: the pattern itself, the {@code subscriptionTopicsMode} and,
     * when set, the {@code patternAutoDiscoveryPeriod} in seconds.
     *
     * @throws MessageListenerException if the auto discovery period is set to less than 1
     */
    private void resolveTopicsPattern(AnnotationValue<PulsarConsumer> consumerAnnotation, ConsumerBuilder<?> consumer, String topicsPattern) {
        consumer.topicsPattern(topicsPattern);
        RegexSubscriptionMode mode = consumerAnnotation.getRequiredValue("subscriptionTopicsMode", RegexSubscriptionMode.class);
        consumer.subscriptionTopicsMode(mode);
        OptionalInt topicsRefresh = consumerAnnotation.intValue("patternAutoDiscoveryPeriod");
        if (topicsRefresh.isPresent()) {
            if (topicsRefresh.getAsInt() < 1) {
                throw new MessageListenerException("Topic " + topicsPattern + " refresh time cannot be below 1 second.");
            }
            consumer.patternAutoDiscoveryPeriod(topicsRefresh.getAsInt(), TimeUnit.SECONDS);
        }
    }

    /**
     * Sets subscription name and type from the {@link PulsarConsumer} annotation; a
     * {@code pulsar-subscription-<n>} name is generated only when {@code subscription} is not set.
     */
    private void consumerValues(AnnotationValue<PulsarConsumer> consumerAnnotation, ConsumerBuilder<?> consumer) {
        String subscriptionName = consumerAnnotation.stringValue("subscription").orElseGet(() -> "pulsar-subscription-" + this.consumerCounter.incrementAndGet());
        SubscriptionType subscriptionType = consumerAnnotation.getRequiredValue("subscriptionType", SubscriptionType.class);
        consumer.subscriptionName(subscriptionName).subscriptionType(subscriptionType);
    }

    /**
     * Sets subscription name, optional subscription type and optional acknowledgment group time
     * from the {@link PulsarSubscription} annotation.
     */
    private void subscriptionValues(AnnotationValue<PulsarSubscription> subscription, ConsumerBuilder<?> consumer) {
        // NOTE(review): unlike consumerValues, the fallback is evaluated eagerly, so the shared
        // counter advances even when subscriptionName is configured. Kept as is because a lazy
        // default would renumber the generated subscription names.
        String generatedName = "pulsar-subscription-" + this.consumerCounter.incrementAndGet();
        String subscriptionName = subscription.stringValue("subscriptionName").orElse(generatedName);
        consumer.subscriptionName(subscriptionName);
        subscription.enumValue("subscriptionType", SubscriptionType.class).ifPresent(consumer::subscriptionType);
        Optional<String> ackGroupTimeout = subscription.stringValue("ackGroupTimeout");
        if (ackGroupTimeout.isPresent()) {
            Duration duration = Duration.parse(ackGroupTimeout.get());
            consumer.acknowledgmentGroupTime(duration.toNanos(), TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Unsubscribes and closes every registered consumer. Both steps are best-effort: failures are
     * logged and do not stop the shutdown of the remaining consumers.
     */
    @Override
    public void close() {
        for (Consumer<?> consumer : this.getConsumers().values()) {
            try {
                consumer.unsubscribe();
            } catch (Exception e) {
                LOG.warn("Error shutting down Pulsar consumer: {}", e.getMessage(), e);
            }
            // Closed in its own try block so that a failed unsubscribe cannot leave the consumer open.
            try {
                consumer.close();
            } catch (Exception e) {
                LOG.warn("Error shutting down Pulsar consumer: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * @return an unmodifiable view of the registered consumers keyed by consumer id
     */
    @Override
    public Map<String, Consumer<?>> getConsumers() {
        return Collections.unmodifiableMap(this.consumers);
    }

    /**
     * Looks up a registered consumer.
     *
     * @param id  consumer id
     * @param <T> message type the caller expects; the cast is unchecked
     * @return the consumer registered under {@code id}
     * @throws IllegalArgumentException if no consumer is registered under {@code id}
     */
    @Override
    @NonNull
    @SuppressWarnings("unchecked")
    public <T> Consumer<T> getConsumer(@NonNull String id) {
        ArgumentUtils.requireNonNull("id", id);
        Consumer<?> consumer = this.consumers.get(id);
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return (Consumer<T>) consumer;
    }

    /**
     * @param id consumer id
     * @return {@code true} if a consumer is registered under {@code id}
     */
    @Override
    public boolean consumerExists(@NonNull String id) {
        return this.consumers.containsKey(id);
    }

    /**
     * @return an unmodifiable view of the ids of all registered consumers
     */
    @Override
    @NonNull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    /**
     * @param id consumer id
     * @return {@code true} if the consumer was paused through {@link #pause(String)} and not resumed since
     * @throws IllegalArgumentException if {@code id} is empty or no consumer is registered under it
     */
    @Override
    public boolean isPaused(@NonNull String id) {
        this.requireConsumer(id);
        return this.paused.containsKey(id);
    }

    /**
     * Pauses the consumer registered under {@code id} and records it as paused.
     *
     * @param id consumer id
     * @throws IllegalArgumentException if {@code id} is empty or no consumer is registered under it
     */
    @Override
    public void pause(@NonNull String id) {
        Consumer<?> consumer = this.requireConsumer(id);
        consumer.pause();
        this.paused.put(id, consumer);
    }

    /**
     * Resumes a consumer previously paused through {@link #pause(String)}.
     *
     * @param id consumer id
     * @throws IllegalArgumentException if {@code id} is empty or no paused consumer is recorded under it
     */
    @Override
    public void resume(@NonNull String id) {
        // A single remove() decides which caller resumes the consumer, so a concurrent second call
        // gets the IllegalArgumentException instead of dereferencing null.
        Consumer<?> consumer = StringUtils.isEmpty(id) ? null : this.paused.remove(id);
        if (consumer == null) {
            throw new IllegalArgumentException("No paused consumer found for ID: " + id);
        }
        consumer.resume();
    }

    /** Returns the consumer registered under {@code id} or throws when the id is empty or unknown. */
    private Consumer<?> requireConsumer(String id) {
        // The emptiness check also keeps a null id away from the ConcurrentHashMap lookup.
        Consumer<?> consumer = StringUtils.isEmpty(id) ? null : this.consumers.get(id);
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return consumer;
    }
}

