/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.util.rx;

import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Static helpers for building Reactor pipelines that carry {@link CoreEvent}s.
 * <p>
 * Every method only assembles a pipeline and returns it; nothing in this class subscribes to the
 * returned publisher.
 */
public class RxUtils {
    /**
     * Decorates {@code triggeringSubscriber} so that each subscription to the returned flux also
     * starts a subscription to {@code deferredSubscriber}.
     * <p>
     * The deferred subscription is started from the {@code doOnSubscribe} hook of the triggering flux
     * and is given the subscriber context that was current when the returned flux was subscribed to.
     * It is started with the no-argument {@code subscribe()}, so its elements are not forwarded and
     * the handle returned by {@code subscribe()} is discarded.
     *
     * @param triggeringSubscriber the flux whose subscription triggers the deferred one; its signals
     *        are what the returned flux emits
     * @param deferredSubscriber the flux to subscribe to when {@code triggeringSubscriber} is subscribed
     * @return {@code triggeringSubscriber} decorated with the subscription side effect
     */
    public static Flux<CoreEvent> subscribeFluxOnPublisherSubscription(Flux<CoreEvent> triggeringSubscriber, Flux<CoreEvent> deferredSubscriber) {
        return triggeringSubscriber.compose(eventPub ->
            // Read the downstream context first so that it can be handed to the deferred subscription.
            Mono.subscriberContext().flatMapMany(ctx ->
                eventPub.doOnSubscribe(s -> deferredSubscriber.subscriberContext(ctx).subscribe())));
    }

    /**
     * Applies {@code processor} to {@code publisher} through {@link Flux#transform}.
     *
     * @param publisher the source of events
     * @param processor the processor applied to the whole stream
     * @return the publisher produced by {@code processor}
     */
    public static Publisher<CoreEvent> transform(Publisher<CoreEvent> publisher, ReactiveProcessor processor) {
        // No raw cast: the processor is passed with its own generic signature so the result stays typed.
        return Flux.from(publisher).transform(processor);
    }

    /**
     * Applies {@code mapper} to each event emitted by {@code publisher}.
     *
     * @param publisher the source of events
     * @param mapper the function applied to each event
     * @return a publisher of the mapped events
     */
    public static Publisher<CoreEvent> map(Publisher<CoreEvent> publisher, Function<CoreEvent, CoreEvent> mapper) {
        return Flux.from(publisher).map(mapper);
    }

    /**
     * Flat-maps each event emitted by {@code publisher} through {@code function}, adapting the inner
     * publisher with {@link Mono#from}.
     * <p>
     * An error signalled by the inner publisher that is not already a {@link MessagingException} is
     * replaced by a new {@link MessagingException} built from the event being processed, the original
     * error and {@code component}. The error mapping is attached to the publisher returned by
     * {@code function}, i.e. after {@code function.apply} has returned.
     *
     * @param publisher the source of events
     * @param function produces the inner publisher for each event
     * @param component passed to the {@link MessagingException} created when wrapping an error
     * @return a publisher of the events produced by the inner publishers
     */
    public static Publisher<CoreEvent> flatMap(Publisher<CoreEvent> publisher, Function<CoreEvent, Publisher<CoreEvent>> function, Component component) {
        return Flux.from(publisher).flatMap(event ->
            Mono.from(function.apply(event))
                // Leave existing MessagingExceptions untouched; wrap everything else.
                .onErrorMap(e -> !(e instanceof MessagingException),
                            e -> new MessagingException(event, e, component)));
    }

    /**
     * Creates a publisher that emits {@code event} and publishes it on a scheduler created from
     * {@code executor}.
     * <p>
     * A new scheduler is created from {@code executor} on every call.
     *
     * @param event the single event to emit
     * @param executor the executor service backing the scheduler used by {@code publishOn}
     * @return a publisher emitting {@code event}
     */
    public static Publisher<CoreEvent> justPublishOn(CoreEvent event, ExecutorService executor) {
        // Flux.just must be given the CoreEvent itself: widening it to Object would make the chain a
        // Flux<Object>, which is not a Publisher<CoreEvent>.
        return Flux.just(event).publishOn(Schedulers.fromExecutorService(executor));
    }
}

