/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging;

import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.ProcessingException;
import io.smallrye.reactive.messaging.PublisherDecorator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import javax.enterprise.inject.Instance;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;

public abstract class AbstractMediator {
    protected final MediatorConfiguration configuration;
    private Invoker invoker;
    private Instance<PublisherDecorator> decorators;

    public AbstractMediator(MediatorConfiguration configuration) {
        this.configuration = configuration;
    }

    public synchronized void setInvoker(Invoker invoker) {
        this.invoker = invoker;
    }

    public void setDecorators(Instance<PublisherDecorator> decorators) {
        this.decorators = decorators;
    }

    public void run() {
    }

    public void connectToUpstream(PublisherBuilder<? extends Message> publisher) {
    }

    public MediatorConfiguration configuration() {
        return this.configuration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void initialize(Object bean) {
        AbstractMediator abstractMediator = this;
        synchronized (abstractMediator) {
            if (this.invoker == null) {
                this.invoker = args -> {
                    try {
                        return this.configuration.getMethod().invoke(bean, args);
                    }
                    catch (Exception e) {
                        throw new ProcessingException(this.configuration.methodAsString(), e);
                    }
                };
            }
        }
    }

    protected <T> T invoke(Object ... args) {
        try {
            Objects.requireNonNull(this.invoker, "Invoker not initialized");
            return (T)this.invoker.invoke(args);
        }
        catch (RuntimeException e) {
            LoggerFactory.getLogger((String)this.configuration().methodAsString()).error("The method " + this.configuration().methodAsString() + " has thrown an exception", (Throwable)e);
            throw e;
        }
    }

    protected CompletionStage<Message> getAckOrCompletion(Message<?> message) {
        CompletionStage ack = message.ack();
        if (ack != null) {
            return ack.thenApply(x -> message);
        }
        return CompletableFuture.completedFuture(message);
    }

    public PublisherBuilder<? extends Message> getStream() {
        return null;
    }

    public MediatorConfiguration getConfiguration() {
        return this.configuration;
    }

    public String getMethodAsString() {
        return this.configuration.methodAsString();
    }

    public SubscriberBuilder<Message, Void> getComputedSubscriber() {
        return null;
    }

    public abstract boolean isConnected();

    protected Function<Message, ? extends CompletionStage<? extends Message>> managePreProcessingAck() {
        return message -> {
            if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.PRE_PROCESSING) {
                return this.getAckOrCompletion((Message<?>)message);
            }
            return CompletableFuture.completedFuture(message);
        };
    }

    public PublisherBuilder<? extends Message> decorate(PublisherBuilder<? extends Message> input) {
        if (input == null) {
            return null;
        }
        for (PublisherDecorator decorator : this.decorators) {
            input = decorator.decorate(input, this.getConfiguration().getOutgoing());
        }
        if (this.configuration.getBroadcast()) {
            Flowable flow = Flowable.fromPublisher((Publisher)input.buildRs());
            if (this.configuration.getNumberOfSubscriberBeforeConnecting() != 0) {
                return ReactiveStreams.fromPublisher((Publisher)Flowable.fromPublisher((Publisher)flow).publish().autoConnect(this.configuration.getNumberOfSubscriberBeforeConnecting()));
            }
            return ReactiveStreams.fromPublisher((Publisher)Flowable.fromPublisher((Publisher)flow).publish().autoConnect());
        }
        return input;
    }
}

