/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.gcp.pubsub.intercept;

import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.gcp.GoogleCloudConfiguration;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.bind.PubSubBinderRegistry;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.intercept.AbstractPubSubConsumerMethodProcessor;
import io.micronaut.gcp.pubsub.push.PubSubPushMessageReceiverException;
import io.micronaut.gcp.pubsub.push.PushControllerConfiguration;
import io.micronaut.gcp.pubsub.push.PushSubscriberHandler;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.scheduling.executor.ExecutorSelector;
import jakarta.inject.Singleton;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Requires(beans={PushControllerConfiguration.class})
@Singleton
@Internal
final class PubSubPushConsumerAdvice
extends AbstractPubSubConsumerMethodProcessor<PushSubscription> {
    private static final String EXECUTE_ON = ExecuteOn.class.getName();
    private final PushSubscriberHandler subscriberHandler;
    private final PushControllerConfiguration pubSubConfigurationProperties;
    private final ExecutorSelector executorSelector;

    protected PubSubPushConsumerAdvice(BeanContext beanContext, ConversionService conversionService, GoogleCloudConfiguration googleCloudConfiguration, PubSubBinderRegistry binderRegistry, PubSubMessageReceiverExceptionHandler exceptionHandler, ExecutorSelector executorSelector, PushSubscriberHandler subscriberHandler, PushControllerConfiguration pubSubConfigurationProperties) {
        super(PushSubscription.class, beanContext, conversionService, googleCloudConfiguration, binderRegistry, exceptionHandler);
        this.executorSelector = executorSelector;
        this.subscriberHandler = subscriberHandler;
        this.pubSubConfigurationProperties = pubSubConfigurationProperties;
    }

    @Override
    protected void addSubscriber(ProjectSubscriptionName projectSubscriptionName, MessageReceiver receiver, String configuration) {
        this.subscriberHandler.addSubscriber(projectSubscriptionName, receiver);
    }

    @Override
    protected void handleException(PubSubMessageReceiverException ex) {
        super.handleException(PubSubPushMessageReceiverException.from(ex));
    }

    @Override
    protected Flux<Object> executeSubscriberMethod(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method, BoundExecutable<Object, Object> executable, Object bean) {
        Scheduler subscribeOnScheduler = this.schedulerFor(beanDefinition, method);
        if (subscribeOnScheduler != null) {
            return Mono.fromCallable(() -> Objects.requireNonNull(executable).invoke(bean)).flux().subscribeOn(subscribeOnScheduler);
        }
        return super.executeSubscriberMethod(beanDefinition, method, executable, bean);
    }

    @Nullable
    private Scheduler schedulerFor(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        if (beanDefinition.hasDeclaredAnnotation(ExecuteOn.class)) {
            return this.executorSelector.select((String)beanDefinition.stringValue(EXECUTE_ON).orElse(null)).map(Schedulers::fromExecutorService).orElse(null);
        }
        return this.executorSelector.select(method, null).map(Schedulers::fromExecutorService).orElse(null);
    }
}

