/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging;

import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.SubscriberWrapper;
import io.smallrye.reactive.messaging.WeavingException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.ClassUtils;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriberMediator
extends AbstractMediator {
    private PublisherBuilder<Message> source;
    private SubscriberBuilder subscriber;
    private AtomicReference<Subscription> subscription = new AtomicReference();

    public SubscriberMediator(MediatorConfiguration configuration) {
        super(configuration);
        if (configuration.shape() != Shape.SUBSCRIBER) {
            throw new IllegalArgumentException("Expected a Subscriber shape, received a " + (Object)((Object)configuration.shape()));
        }
    }

    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE: 
            case STREAM_OF_PAYLOAD: {
                this.processMethodReturningASubscriber();
                break;
            }
            case MESSAGE: 
            case PAYLOAD: {
                if (ClassUtils.isAssignable(this.configuration.getMethod().getReturnType(), CompletionStage.class)) {
                    this.processMethodReturningACompletionStage();
                    break;
                }
                this.processMethodReturningVoid();
                break;
            }
            default: {
                throw new IllegalArgumentException("Unexpected consumption type: " + (Object)((Object)this.configuration.consumption()));
            }
        }
        assert (this.subscriber != null);
    }

    @Override
    public SubscriberBuilder<Message, Void> getComputedSubscriber() {
        return this.subscriber;
    }

    @Override
    public boolean isConnected() {
        return this.source != null;
    }

    @Override
    public void connectToUpstream(PublisherBuilder<? extends Message> publisher) {
        this.source = publisher;
    }

    @Override
    public void run() {
        assert (this.source != null);
        assert (this.subscriber != null);
        Logger logger = LoggerFactory.getLogger((String)this.configuration.methodAsString());
        AtomicReference syncErrorCatcher = new AtomicReference();
        CompletionSubscriber delegate = this.subscriber.build();
        Subscriber delegating = new Subscriber((Subscriber)delegate, logger, syncErrorCatcher){
            final /* synthetic */ Subscriber val$delegate;
            final /* synthetic */ Logger val$logger;
            final /* synthetic */ AtomicReference val$syncErrorCatcher;
            {
                this.val$delegate = subscriber;
                this.val$logger = logger;
                this.val$syncErrorCatcher = atomicReference;
            }

            public void onSubscribe(Subscription s) {
                SubscriberMediator.this.subscription.set(s);
                this.val$delegate.onSubscribe(s);
            }

            public void onNext(Object o) {
                this.val$delegate.onNext(o);
            }

            public void onError(Throwable t) {
                this.val$logger.error("Error caught during the stream processing", t);
                this.val$syncErrorCatcher.set(t);
                this.val$delegate.onError(t);
            }

            public void onComplete() {
                this.val$delegate.onComplete();
            }
        };
        this.source.to(delegating).run();
        Throwable throwable = (Throwable)syncErrorCatcher.get();
        if (throwable != null) {
            throw new WeavingException(this.configuration.getIncoming(), throwable);
        }
    }

    private void processMethodReturningVoid() {
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD) {
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).map(message -> {
                this.invoke(message.getPayload());
                return message;
            }).flatMapCompletionStage(this.managePostProcessingAck()).ignore();
        }
    }

    private void processMethodReturningACompletionStage() {
        this.subscriber = this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD ? ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).flatMapCompletionStage(message -> {
            CompletionStage stage = (CompletionStage)this.invoke(message.getPayload());
            return stage.thenApply(x -> message);
        }).flatMapCompletionStage(this.managePostProcessingAck()).ignore() : ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).flatMapCompletionStage(message -> {
            CompletionStage completion = (CompletionStage)this.invoke(message);
            return completion.thenApply(x -> message);
        }).flatMapCompletionStage(this.managePostProcessingAck()).ignore();
    }

    private void processMethodReturningASubscriber() {
        Object result = this.invoke(new Object[0]);
        if (!(result instanceof Subscriber) && !(result instanceof SubscriberBuilder)) {
            throw new IllegalStateException("Invalid return type: " + result + " - expected a Subscriber or a SubscriberBuilder");
        }
        Object sub = result instanceof Subscriber ? (Subscriber)result : ((SubscriberBuilder)result).build();
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD) {
            SubscriberWrapper<Object, Object> wrapper = new SubscriberWrapper<Object, Object>((Subscriber<Object>)sub, x -> ((Message)x).getPayload());
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).via(wrapper).ignore();
        } else {
            Subscriber casted = sub;
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).via(new SubscriberWrapper(casted, Function.identity())).ignore();
        }
    }
}

