/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.providers.helpers;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MultiUtils {
    public static <T> Multi<T> createFromGenerator(Supplier<T> supplier) {
        return Multi.createFrom().generator(() -> null, (s, g) -> {
            g.emit(supplier.get());
            return s;
        });
    }

    public static <T> Multi<T> publisher(Publisher<T> publisher) {
        Publisher actual = (Publisher)ParameterValidation.nonNull(publisher, (String)"publisher");
        if (actual instanceof Multi) {
            return (Multi)actual;
        }
        return Multi.createFrom().safePublisher(publisher);
    }

    public static Multi<? extends Message<?>> handlePreProcessingAcknowledgement(Multi<? extends Message<?>> multi, MediatorConfiguration configuration) {
        if (configuration.getAcknowledgment() != Acknowledgment.Strategy.PRE_PROCESSING) {
            return multi;
        }
        return multi.plug(stream -> stream.onItem().transformToUniAndConcatenate(message -> {
            CompletionStage ack = message.ack();
            return Uni.createFrom().completionStage(ack).map(x -> message);
        }));
    }

    public static <T, R> Multi<R> via(Multi<T> multi, Processor<? super T, ? super R> processor) {
        return multi.plug(stream -> Multi.createFrom().deferred(() -> {
            Multi m = MultiUtils.publisher(processor);
            stream.subscribe((Subscriber)processor);
            return m;
        }));
    }

    public static <T, R, P> Subscriber<T> via(final Processor<T, R> processor, final Function<Multi<R>, Multi<P>> function) {
        return new MultiSubscriber<T>(){

            public void onSubscribe(Subscription subscription) {
                processor.onSubscribe(subscription);
                MultiUtils.publisher(processor).plug(function).subscribe().with(r -> {});
            }

            public void onItem(T item) {
                processor.onNext(item);
            }

            public void onFailure(Throwable throwable) {
                processor.onError(throwable);
            }

            public void onCompletion() {
                processor.onComplete();
            }
        };
    }
}

