/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.util.rx;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.api.util.func.Once;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.internal.util.rx.FluxSinkSupplier;
import org.mule.runtime.core.internal.util.rx.ReactorTransactionUtils;
import org.mule.runtime.core.internal.util.rx.RoundRobinFluxSinkSupplier;
import org.mule.runtime.core.internal.util.rx.TransactionAwareFluxSinkSupplier;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;

public class RxUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(RxUtils.class);
    public static final String KEY_ON_NEXT_ERROR_STRATEGY = "reactor.onNextError.localStrategy";
    public static final String ON_NEXT_FAILURE_STRATEGY = "reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy";
    public static final String REACTOR_RECREATE_ROUTER = "recreateRouter";

    public static <T, U> Flux<T> subscribeFluxOnPublisherSubscription(Flux<T> triggeringSubscriber, Flux<U> deferredSubscriber) {
        return RxUtils.subscribeFluxOnPublisherSubscription(triggeringSubscriber, deferredSubscriber, null, null, null);
    }

    public static <T, U> Flux<T> subscribeFluxOnPublisherSubscription(Flux<T> triggeringSubscriber, Flux<U> deferredSubscriber, @Nullable Consumer<? super U> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) {
        return triggeringSubscriber.transformDeferredContextual((eventPub, ctx) -> eventPub.doOnSubscribe(s -> deferredSubscriber.contextWrite(ctx).subscribe(consumer, errorConsumer, completeConsumer)));
    }

    public static <T, U> Publisher<T> propagateCompletion(Publisher<U> upstream, Publisher<T> downstream, Function<Publisher<U>, Publisher<T>> transformer, CheckedRunnable completionCallback, CheckedConsumer<Throwable> errorCallback) {
        Objects.requireNonNull(upstream, "'upstream' must not be null");
        Objects.requireNonNull(downstream, "'downstream' must not be null");
        Objects.requireNonNull(transformer, "'transformer' must not be null");
        Objects.requireNonNull(completionCallback, "'completionCallback' must not be null");
        Objects.requireNonNull(errorCallback, "'errorCallback' must not be null");
        return RxUtils.doPropagateCompletion(upstream, downstream, transformer, new AtomicInteger(0), Once.of(completionCallback), Once.of(errorCallback), () -> null);
    }

    public static <T, U> Publisher<T> propagateCompletion(Publisher<U> upstream, Publisher<T> downstream, Function<Publisher<U>, Publisher<T>> transformer, AtomicInteger inflightCounter, CheckedRunnable completionCallback, CheckedConsumer<Throwable> errorCallback) {
        Objects.requireNonNull(upstream, "'upstream' must not be null");
        Objects.requireNonNull(downstream, "'downstream' must not be null");
        Objects.requireNonNull(transformer, "'transformer' must not be null");
        Objects.requireNonNull(completionCallback, "'completionCallback' must not be null");
        Objects.requireNonNull(errorCallback, "'errorCallback' must not be null");
        return RxUtils.doPropagateCompletion(upstream, downstream, transformer, inflightCounter, Once.of(completionCallback), Once.of(errorCallback), () -> null);
    }

    public static <T, U> Publisher<T> propagateCompletion(Publisher<U> upstream, Publisher<T> downstream, Function<Publisher<U>, Publisher<T>> transformer, CheckedRunnable completionCallback, CheckedConsumer<Throwable> errorCallback, long completionTimeoutMillis, ScheduledExecutorService delayedExecutor, String dslSource) {
        Objects.requireNonNull(upstream, "'upstream' must not be null");
        Objects.requireNonNull(downstream, "'downstream' must not be null");
        Objects.requireNonNull(transformer, "'transformer' must not be null");
        Objects.requireNonNull(completionCallback, "'completionCallback' must not be null");
        Objects.requireNonNull(errorCallback, "'errorCallback' must not be null");
        Objects.requireNonNull(delayedExecutor, "'delayedExecutor' must not be null");
        Once.RunOnce completer = Once.of(completionCallback);
        Once.ConsumeOnce<Throwable> errorForwarder = Once.of(errorCallback);
        return RxUtils.doPropagateCompletion(upstream, downstream, transformer, new AtomicInteger(0), completer, errorForwarder, () -> delayedExecutor.schedule(() -> {
            LOGGER.debug("Propagating completion after {} milliseconds\nDSL Source:\n{}", (Object)completionTimeoutMillis, (Object)dslSource);
            completer.runOnce();
        }, completionTimeoutMillis, TimeUnit.MILLISECONDS));
    }

    private static <T, U> Publisher<T> doPropagateCompletion(Publisher<U> upstream, Publisher<T> downstream, Function<Publisher<U>, Publisher<T>> transformer, AtomicInteger inflightCounter, Once.RunOnce completer, Once.ConsumeOnce<Throwable> errorForwarder, Supplier<ScheduledFuture<?>> scheduleCompletion) {
        AtomicBoolean upstreamComplete = new AtomicBoolean(false);
        AtomicReference upstreamError = new AtomicReference();
        AtomicReference scheduledCompletion = new AtomicReference();
        Flux enrichedUpstream = Flux.from(upstream).doOnNext(s -> inflightCounter.incrementAndGet()).transform(transformer);
        return RxUtils.subscribeFluxOnPublisherSubscription(Flux.from(downstream).doOnNext(s -> {
            if (inflightCounter.decrementAndGet() == 0 && upstreamComplete.get()) {
                completer.runOnce();
                ScheduledFuture scheduledFuture = (ScheduledFuture)scheduledCompletion.get();
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
            }
        }), enrichedUpstream, null, t -> {
            upstreamError.set(t);
            errorForwarder.consumeOnce((Throwable)t);
        }, () -> {
            upstreamComplete.set(true);
            if (inflightCounter.get() == 0) {
                completer.runOnce();
            } else {
                scheduledCompletion.set((ScheduledFuture)scheduleCompletion.get());
            }
        });
    }

    public static Publisher<CoreEvent> transform(Publisher<CoreEvent> publisher, ReactiveProcessor processor) {
        return Flux.from(publisher).transform((Function)processor);
    }

    public static Publisher<CoreEvent> map(Publisher<CoreEvent> publisher, Function<CoreEvent, CoreEvent> mapper) {
        return Flux.from(publisher).map(mapper);
    }

    public static Publisher<CoreEvent> flatMap(Publisher<CoreEvent> publisher, Function<CoreEvent, Publisher<CoreEvent>> function, Component component) {
        return Flux.from(publisher).flatMap(event -> Mono.from((Publisher)((Publisher)function.apply((CoreEvent)event))).onErrorMap(e -> !(e instanceof MessagingException), e -> new MessagingException((CoreEvent)event, (Throwable)e, component)));
    }

    public static Publisher<CoreEvent> justPublishOn(CoreEvent event, ExecutorService executor) {
        return Flux.just((Object)event).publishOn(Schedulers.fromExecutorService((ExecutorService)executor));
    }

    public static <T> Supplier<FluxSink<T>> createFluxSupplier(Function<Flux<T>, Flux<?>> configurer) {
        return () -> {
            FluxSinkRecorder sinkRef = new FluxSinkRecorder();
            Flux upstream = TransactionCoordination.isTransactionActive() ? sinkRef.flux().contextWrite(ReactorTransactionUtils.popTxFromSubscriberContext()) : sinkRef.flux();
            Flux flux = (Flux)configurer.apply(upstream);
            if (TransactionCoordination.isTransactionActive()) {
                flux = flux.contextWrite(ReactorTransactionUtils.pushTxToSubscriberContext(configurer.toString()));
            }
            flux.subscribe(null, e -> LOGGER.error("Exception reached subscriber for " + configurer.toString(), e));
            return sinkRef.getFluxSink();
        };
    }

    public static <T> FluxSinkSupplier<T> createRoundRobinFluxSupplier(Function<Flux<T>, Flux<?>> configurer, int size) {
        Supplier<FluxSink<T>> factory = RxUtils.createFluxSupplier(configurer);
        return new TransactionAwareFluxSinkSupplier<T>(factory, new RoundRobinFluxSinkSupplier<T>(size, factory));
    }

    public static <E extends Throwable> Function<? super Either<E, CoreEvent>, ? extends CoreEvent> propagateErrorResponseMapper() {
        return result -> result.reduce(me -> {
            throw Exceptions.propagateWrappingFatal(me);
        }, UnaryOperator.identity());
    }
}

