/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.coherence.messaging;

import com.tangosol.net.Coherence;
import com.tangosol.net.Session;
import com.tangosol.net.events.CoherenceLifecycleEvent;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.util.Filter;
import com.tangosol.util.ValueExtractor;
import io.micronaut.coherence.ExtractorFactories;
import io.micronaut.coherence.FilterFactories;
import io.micronaut.coherence.annotation.CoherenceTopicListener;
import io.micronaut.coherence.annotation.CommitStrategy;
import io.micronaut.coherence.annotation.ExtractorBinding;
import io.micronaut.coherence.annotation.FilterBinding;
import io.micronaut.coherence.annotation.SessionName;
import io.micronaut.coherence.annotation.SubscriberGroup;
import io.micronaut.coherence.annotation.Utils;
import io.micronaut.coherence.messaging.SubscriberExceptionHandler;
import io.micronaut.coherence.messaging.binders.ElementArgumentBinderRegistry;
import io.micronaut.coherence.messaging.exceptions.CoherenceSubscriberException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.Blocking;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Singleton
class CoherenceTopicListenerProcessor
implements ExecutableMethodProcessor<CoherenceTopicListener>,
Coherence.LifecycleListener,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(CoherenceTopicListenerProcessor.class);
    private static final Void VOID = null;
    private final ApplicationContext context;
    private final FilterFactories filterFactories;
    private final ExtractorFactories extractorFactories;
    private final List<MethodHolder> methods = new ArrayList<MethodHolder>();
    private final List<TopicSubscriber<?, ?, ?>> subscribers = new ArrayList();
    private final ElementArgumentBinderRegistry registry;
    private final Scheduler scheduler;
    private volatile boolean subscribed;
    private final ConversionService conversionService;

    @Inject
    public CoherenceTopicListenerProcessor(@Named(value="consumer") ExecutorService executorService, ElementArgumentBinderRegistry registry, ApplicationContext context, FilterFactories filterFactories, ExtractorFactories extractorFactories, ConversionService conversionService) {
        this.scheduler = Schedulers.fromExecutor((Executor)executorService);
        this.context = context;
        this.filterFactories = filterFactories;
        this.extractorFactories = extractorFactories;
        this.registry = registry;
        this.conversionService = conversionService;
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        this.methods.add(new MethodHolder(beanDefinition, method));
    }

    public void onEvent(CoherenceLifecycleEvent event) {
        if (event.getType() == CoherenceLifecycleEvent.Type.STARTED) {
            Coherence coherence = event.getCoherence();
            this.createSubscribers(coherence);
        }
    }

    @Override
    @PreDestroy
    public void close() {
        this.subscribers.forEach(TopicSubscriber::close);
        this.subscribers.clear();
    }

    public boolean isSubscribed() {
        return this.subscribed;
    }

    void createSubscribers(Coherence coherence) {
        for (MethodHolder holder : this.methods) {
            List extractorBindings;
            Publisher[] sendToPublishers;
            ArrayList<Object> options = new ArrayList<Object>();
            ExecutableMethod<?, ?> method = holder.getMethod();
            String topicName = Utils.getFirstTopicName(method).orElse(method.getMethodName());
            String sessionName = method.stringValue(SessionName.class).orElse("");
            if (!coherence.hasSession(sessionName)) {
                LOG.info("Skipping @CoherenceTopicListener annotated method subscription {} Session {} does not exist on Coherence instance {}", new Object[]{method, sessionName, coherence.getName()});
                return;
            }
            Session session = coherence.getSession(sessionName);
            String[] sendToTopics = Utils.getSendToTopicNames(method);
            if (sendToTopics.length > 0) {
                if (method.getReturnType().isVoid()) {
                    LOG.info("Skipping @SendTo annotations for @CoherenceTopicListener annotated method {} - method return type is void", method);
                    sendToPublishers = new Publisher[]{};
                } else {
                    sendToPublishers = new Publisher[sendToTopics.length];
                    for (int i = 0; i < sendToTopics.length; ++i) {
                        NamedTopic topic = session.getTopic(sendToTopics[i]);
                        sendToPublishers[i] = topic.createPublisher();
                    }
                }
            } else {
                sendToPublishers = new Publisher[]{};
            }
            method.stringValue(SubscriberGroup.class).ifPresent(name -> options.add(Subscriber.Name.of((String)name)));
            List filterBindings = method.getAnnotationNamesByStereotype(FilterBinding.class);
            if (!filterBindings.isEmpty()) {
                Set<Annotation> annotations = filterBindings.stream().map(s -> method.getAnnotationType(s).orElse(null)).filter(Objects::nonNull).map(arg_0 -> method.synthesize(arg_0)).collect(Collectors.toSet());
                Filter filter = this.filterFactories.resolve(annotations);
                if (filter != null) {
                    options.add(Subscriber.Filtered.by(filter));
                }
            }
            if (!(extractorBindings = method.getAnnotationNamesByStereotype(ExtractorBinding.class)).isEmpty()) {
                Set<Annotation> annotations = extractorBindings.stream().map(s -> method.getAnnotationType(s).orElse(null)).filter(Objects::nonNull).map(arg_0 -> method.synthesize(arg_0)).collect(Collectors.toSet());
                ValueExtractor extractor = this.extractorFactories.resolve(annotations);
                if (extractor != null) {
                    options.add(Subscriber.Convert.using(extractor));
                }
            }
            BeanDefinition<?> beanDefinition = holder.getBeanDefinition();
            Class clsBeanType = beanDefinition.getBeanType();
            Object bean = this.context.getBean(clsBeanType);
            NamedTopic topic = session.getTopic(topicName);
            Subscriber subscriber = topic.createSubscriber(options.toArray(options.toArray(new Subscriber.Option[0])));
            TopicSubscriber topicSubscriber = new TopicSubscriber(topicName, subscriber, sendToPublishers, bean, method, this.registry, this.conversionService, this.scheduler);
            this.subscribers.add(topicSubscriber);
            topicSubscriber.nextMessage();
        }
        this.subscribed = true;
    }

    static class MethodHolder {
        private final BeanDefinition<?> beanDefinition;
        private final ExecutableMethod<?, ?> method;

        public MethodHolder(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
            this.beanDefinition = beanDefinition;
            this.method = method;
        }

        public BeanDefinition<?> getBeanDefinition() {
            return this.beanDefinition;
        }

        public ExecutableMethod<?, ?> getMethod() {
            return this.method;
        }
    }

    static class TopicSubscriber<E, T, R>
    implements AutoCloseable {
        private final String topicName;
        private final Subscriber<E> subscriber;
        private final Publisher<?>[] publishers;
        private final T bean;
        private final ExecutableMethod<?, ?> method;
        private final Optional<Argument<?>> subscriberArg;
        private final ElementArgumentBinderRegistry registry;
        private final Scheduler scheduler;
        private final CommitStrategy commitStrategy;
        private final ConversionService conversionService;

        TopicSubscriber(String topicName, Subscriber<E> subscriber, Publisher<?>[] publishers, T bean, ExecutableMethod<T, R> method, ElementArgumentBinderRegistry registry, ConversionService conversionService, Scheduler scheduler) {
            this.topicName = topicName;
            this.subscriber = subscriber;
            this.publishers = publishers;
            this.bean = bean;
            this.method = method;
            this.registry = registry;
            this.scheduler = scheduler;
            Class<?> cls = subscriber.getClass();
            this.subscriberArg = Arrays.stream(method.getArguments()).filter(arg -> Subscriber.class.isAssignableFrom(arg.getType()) && arg.getType().isAssignableFrom(cls)).findFirst();
            this.commitStrategy = method.getValue(CoherenceTopicListener.class, "commitStrategy", CommitStrategy.class).orElse(CommitStrategy.SYNC);
            this.conversionService = conversionService;
        }

        @Override
        public void close() {
            try {
                this.subscriber.close();
            }
            catch (Exception e) {
                LOG.error("Error closing subscriber for topic {}", (Object)this.topicName, (Object)e);
            }
        }

        private void nextMessage() {
            if (this.subscriber.isActive()) {
                ((CompletableFuture)this.subscriber.receive().handle(this::handleMessage)).handle((v, err) -> {
                    if (err != null) {
                        LOG.error("Error requesting message from topic {} for method {} - subscriber will be closed", new Object[]{this.topicName, this.method, err});
                        this.subscriber.close();
                    }
                    return VOID;
                });
            }
        }

        private Void handleMessage(Subscriber.Element<E> element, Throwable throwable) {
            SubscriberExceptionHandler.Action action;
            block15: {
                action = SubscriberExceptionHandler.Action.Continue;
                Throwable error = null;
                if (throwable == null) {
                    try {
                        HashMap mapBindings = new HashMap();
                        this.subscriberArg.ifPresent(arg -> mapBindings.put(arg, this.subscriber));
                        DefaultExecutableBinder batchBinder = new DefaultExecutableBinder(mapBindings);
                        BoundExecutable boundExecutable = batchBinder.bind(this.method, (ArgumentBinderRegistry)this.registry, element);
                        Object result2 = boundExecutable.invoke(this.bean);
                        this.handleResult(result2);
                    }
                    catch (Throwable thrown) {
                        error = thrown;
                    }
                } else {
                    error = throwable;
                }
                if (error == null) {
                    try {
                        if (this.commitStrategy == CommitStrategy.MANUAL) break block15;
                        CompletableFuture future = element.commitAsync();
                        if (this.commitStrategy == CommitStrategy.ASYNC) {
                            future.handle((result, commitError) -> {
                                if (commitError != null) {
                                    LOG.error("Error committing element channel={} position={}", new Object[]{element.getChannel(), element.getPosition(), commitError});
                                } else if (!result.isSuccess()) {
                                    LOG.error("Failed to commit element channel={} position={} status {}", new Object[]{element.getChannel(), element.getPosition(), result});
                                }
                                return VOID;
                            });
                        } else {
                            Subscriber.CommitResult result3 = (Subscriber.CommitResult)future.join();
                            if (!result3.isSuccess()) {
                                LOG.error("Failed to commit element channel={} position={} status {}", new Object[]{element.getChannel(), element.getPosition(), result3});
                            }
                        }
                    }
                    catch (Exception thrown) {
                        LOG.error("Error committing element channel={} position={}", new Object[]{element.getChannel(), element.getPosition(), thrown});
                    }
                } else {
                    action = error instanceof CancellationException ? SubscriberExceptionHandler.Action.Continue : this.handleException(this.subscriber, this.method, element, error);
                }
            }
            switch (action) {
                case Continue: {
                    this.nextMessage();
                    break;
                }
                case Stop: {
                    this.subscriber.close();
                    break;
                }
                default: {
                    LOG.error("Unknown SubscriberExceptionHandler.Action {} closing subscriber", (Object)action);
                    this.subscriber.close();
                }
            }
            return VOID;
        }

        private void handleResult(Object result) {
            Class<?> type;
            boolean isAsyncReturnType;
            if (result == null || this.publishers.length == 0) {
                return;
            }
            if (result.getClass().isArray()) {
                result = Arrays.asList((Object[])result);
            }
            if (isAsyncReturnType = CompletionStage.class.isAssignableFrom(type = result.getClass())) {
                ((CompletionStage)((Object)result)).handle((msg, err1) -> {
                    if (err1 == null) {
                        this.handleResult(msg);
                    } else {
                        LOG.error("Method " + this.method + " async result completed with an error", err1);
                    }
                    return VOID;
                });
            } else {
                boolean isBlocking;
                Flux resultFlux;
                if (Publishers.isConvertibleToPublisher(result)) {
                    resultFlux = (Flux)Publishers.convertPublisher((ConversionService)this.conversionService, result, Flux.class);
                    isBlocking = this.method.hasAnnotation(Blocking.class);
                } else {
                    resultFlux = Flux.just(result);
                    isBlocking = true;
                }
                this.handleResultFlux(this.method, resultFlux, isBlocking);
            }
        }

        private void handleResultFlux(ExecutableMethod<?, ?> method, Flux<?> resultFlux, boolean isBlocking) {
            Flux recordMetadataProducer = resultFlux.subscribeOn(this.scheduler).flatMap(o -> {
                if (ArrayUtils.isNotEmpty((Object[])this.publishers)) {
                    return Flux.create(emitter -> {
                        for (Publisher<?> publisher : this.publishers) {
                            if (!publisher.isActive()) continue;
                            CompletableFuture future = publisher.publish(o);
                            future.handle((status, exception) -> {
                                if (exception != null) {
                                    emitter.error(exception);
                                } else {
                                    emitter.next(status);
                                }
                                return VOID;
                            });
                        }
                        emitter.complete();
                    }, (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.ERROR);
                }
                return Flux.empty();
            }).onErrorResume(throwable -> {
                LOG.error("Error processing result from method {}", (Object)method, throwable);
                return Flux.empty();
            });
            if (isBlocking) {
                recordMetadataProducer.toStream().forEach(recordMetadata -> {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Method [{}] produced record metadata: {}", (Object)method, recordMetadata);
                    }
                });
            } else {
                recordMetadataProducer.subscribe(recordMetadata -> {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Method [{}] produced record metadata: {}", (Object)method, recordMetadata);
                    }
                });
            }
        }

        private SubscriberExceptionHandler.Action handleException(Subscriber<?> subscriber, Object consumerBean, Subscriber.Element<?> element, Throwable e) {
            CoherenceSubscriberException exception = new CoherenceSubscriberException(e, consumerBean, subscriber, element);
            return this.handleException(consumerBean, exception);
        }

        private SubscriberExceptionHandler.Action handleException(Object consumerBean, CoherenceSubscriberException exception) {
            if (consumerBean instanceof SubscriberExceptionHandler) {
                return ((SubscriberExceptionHandler)consumerBean).handle(exception);
            }
            Subscriber.Element element = exception.getElement().orElse(null);
            Throwable cause = exception.getCause();
            LOG.error("Closing subscriber due to error processing element [{}] for Coherence subscriber [{}] produced error: {}", new Object[]{element, consumerBean, cause.getMessage(), cause});
            return SubscriberExceptionHandler.Action.Stop;
        }
    }
}

