/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.coherence.messaging;

import com.tangosol.net.Session;
import com.tangosol.net.topic.Publisher;
import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.coherence.annotation.CoherencePublisher;
import io.micronaut.coherence.annotation.SessionName;
import io.micronaut.coherence.annotation.Topic;
import io.micronaut.coherence.annotation.Topics;
import io.micronaut.coherence.annotation.Utils;
import io.micronaut.coherence.messaging.TopicKey;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.core.util.StringUtils;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.exceptions.MessagingClientException;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class CoherencePublisherIntroductionAdvice
implements MethodInterceptor<Object, Object>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(CoherencePublisherIntroductionAdvice.class);
    private final BeanContext beanContext;
    private final ConversionService<?> conversionService;
    private final Map<TopicKey, Publisher<Object>> publisherMap = new ConcurrentHashMap<TopicKey, Publisher<Object>>();

    CoherencePublisherIntroductionAdvice(BeanContext beanContext, ConversionService<?> conversionService) {
        this.beanContext = beanContext;
        this.conversionService = conversionService;
    }

    public Object intercept(final MethodInvocationContext<Object, Object> context) {
        if (context.hasAnnotation(CoherencePublisher.class)) {
            Argument argument;
            int i;
            context.findAnnotation(CoherencePublisher.class).orElseThrow(() -> new IllegalStateException("No @CoherencePublisher annotation present on method: " + context));
            String topicName = Utils.getFirstTopicName(context).orElse(null);
            String sessionName = context.stringValue(SessionName.class).orElse("");
            Duration maxBlock = context.getValue(CoherencePublisher.class, "maxBlock", Duration.class).orElse(null);
            Argument bodyArgument = null;
            Argument[] arguments = context.getArguments();
            Object[] parameterValues = context.getParameterValues();
            int valueIndex = -1;
            for (i = 0; i < arguments.length; ++i) {
                Object o;
                argument = arguments[i];
                if (argument.isAnnotationPresent(MessageBody.class)) {
                    bodyArgument = argument;
                    valueIndex = i;
                    continue;
                }
                if (!argument.isAnnotationPresent(Topics.class) && !argument.isAnnotationPresent(Topic.class) || (o = parameterValues[i]) == null) continue;
                topicName = o.toString();
            }
            if (StringUtils.isEmpty((CharSequence)topicName)) {
                throw new MessagingClientException("No topic specified for method: " + context);
            }
            if (bodyArgument == null) {
                for (i = 0; i < arguments.length; ++i) {
                    argument = arguments[i];
                    if (argument.getAnnotationMetadata().hasStereotype(Bindable.class)) continue;
                    bodyArgument = argument;
                    valueIndex = i;
                    break;
                }
            }
            if (bodyArgument == null) {
                throw new MessagingClientException("No valid message body argument found for method: " + context);
            }
            Object value = parameterValues[valueIndex];
            ReturnType returnType = context.getReturnType();
            Class javaReturnType = returnType.getType();
            Publisher<Object> publisher = this.getPublisher(topicName, sessionName);
            boolean isReactiveReturnType = Publishers.isConvertibleToPublisher((Class)javaReturnType);
            boolean isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass());
            InterceptedMethod interceptedMethod = InterceptedMethod.of(context);
            if (isReactiveReturnType) {
                Flowable<Object> flowable = this.buildSendFlowable(context, publisher, Argument.OBJECT_ARGUMENT, maxBlock, value);
                return Publishers.convertPublisher(flowable, (Class)javaReturnType);
            }
            Argument returnArg = returnType.getFirstTypeVariable().orElse(Argument.of(Void.class));
            if (returnArg.getType() != Void.class) {
                throw new MessagingClientException("Generic return type for method must be Void, i.e. CompletableFuture<Void> - " + context);
            }
            final CompletableFuture completableFuture = new CompletableFuture();
            if (isReactiveValue) {
                Flowable sendFlowable = this.buildSendFlowable(context, publisher, returnArg, maxBlock, value);
                if (!Publishers.isSingle(value.getClass())) {
                    sendFlowable = sendFlowable.toList().toFlowable();
                }
                sendFlowable.subscribe((Subscriber)new Subscriber<Object>(){

                    public void onSubscribe(Subscription s) {
                        s.request(1L);
                    }

                    public void onNext(Object o) {
                    }

                    public void onError(Throwable t) {
                        completableFuture.completeExceptionally((Throwable)CoherencePublisherIntroductionAdvice.this.wrapException((MethodInvocationContext<Object, Object>)context, t));
                    }

                    public void onComplete() {
                        completableFuture.complete(null);
                    }
                });
            } else {
                publisher.publish(value).handle((status, exception) -> {
                    if (exception != null) {
                        completableFuture.completeExceptionally((Throwable)this.wrapException(context, (Throwable)exception));
                    } else {
                        completableFuture.complete(null);
                    }
                    return null;
                });
            }
            return interceptedMethod.handleResult(completableFuture);
        }
        return context.proceed();
    }

    @Override
    public void close() {
        for (Map.Entry<TopicKey, Publisher<Object>> entry : this.publisherMap.entrySet()) {
            Publisher<Object> publisher = entry.getValue();
            try {
                publisher.flush().get(1L, TimeUnit.MINUTES);
            }
            catch (Throwable t) {
                LOG.error("Error flushing publisher", t);
            }
            try {
                publisher.close();
            }
            catch (Throwable t) {
                LOG.error("Error closing publisher", t);
            }
        }
    }

    @NonNull
    private Publisher<Object> getPublisher(String topicName, String sessionName) {
        TopicKey key = new TopicKey(topicName, sessionName);
        return this.publisherMap.compute(key, (k, publisher) -> {
            if (publisher != null) {
                return publisher;
            }
            Session session = (Session)this.beanContext.createBean(Session.class, new Object[]{sessionName});
            return session.getTopic(topicName).createPublisher();
        });
    }

    private Flowable<Object> buildSendFlowable(MethodInvocationContext<Object, Object> context, Publisher<Object> publisher, Argument<?> returnType, Duration maxBlock, Object value) {
        Flowable valueFlowable = (Flowable)Publishers.convertPublisher((Object)value, Flowable.class);
        Class javaReturnType = returnType.getType();
        if (Iterable.class.isAssignableFrom(javaReturnType)) {
            javaReturnType = returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType();
        }
        Class finalJavaReturnType = javaReturnType;
        Flowable sendFlowable = valueFlowable.flatMap(o -> Flowable.create(emitter -> publisher.publish(o).handle((metadata, exception) -> {
            if (exception != null) {
                emitter.onError((Throwable)this.wrapException(context, (Throwable)exception));
            } else {
                if (finalJavaReturnType.isInstance(o)) {
                    emitter.onNext(o);
                } else {
                    this.conversionService.convert(metadata, finalJavaReturnType).ifPresent(arg_0 -> ((FlowableEmitter)emitter).onNext(arg_0));
                }
                emitter.onComplete();
            }
            return null;
        }), (BackpressureStrategy)BackpressureStrategy.BUFFER));
        if (maxBlock != null) {
            sendFlowable = sendFlowable.timeout(maxBlock.toMillis(), TimeUnit.MILLISECONDS);
        }
        return sendFlowable;
    }

    private MessagingClientException wrapException(MethodInvocationContext<Object, Object> context, Throwable exception) {
        return new MessagingClientException("Exception sending message for method [" + context + "]: " + exception.getMessage(), exception);
    }
}

