/*
 * Decompiled with CFR 0.152.
 */
package ratpack.rx2;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import ratpack.exec.Downstream;
import ratpack.exec.ExecController;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.registry.RegistrySpec;
import ratpack.exec.stream.Streams;
import ratpack.exec.stream.TransformablePublisher;
import ratpack.func.Action;
import ratpack.func.Exceptions;
import ratpack.rx2.internal.DefaultSchedulers;
import ratpack.rx2.internal.ErrorHandler;
import ratpack.rx2.internal.ExecControllerBackedScheduler;
import ratpack.rx2.internal.ExecutionBackedObserver;
import ratpack.rx2.internal.ExecutionBackedSubscriber;

public abstract class RxRatpack {
    private RxRatpack() {
    }

    public static void initialize() {
        RxJavaPlugins.setErrorHandler((Consumer)new ErrorHandler());
        RxJavaPlugins.setInitComputationSchedulerHandler(c -> DefaultSchedulers.getComputationScheduler());
        RxJavaPlugins.setInitIoSchedulerHandler(c -> DefaultSchedulers.getIoScheduler());
        RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> new ExecutionBackedObserver(observer));
        RxJavaPlugins.setOnFlowableSubscribe((flowable, subscriber) -> new ExecutionBackedSubscriber(subscriber));
    }

    public static <T> Single<T> single(Promise<T> promise) {
        return Single.create(subscriber -> promise.onError(arg_0 -> ((SingleEmitter)subscriber).onError(arg_0)).then(arg_0 -> ((SingleEmitter)subscriber).onSuccess(arg_0)));
    }

    public static Completable complete(Operation operation) {
        return Completable.create(subscriber -> operation.onError(arg_0 -> ((CompletableEmitter)subscriber).onError(arg_0)).then(() -> ((CompletableEmitter)subscriber).onComplete()));
    }

    public static <T, I extends Iterable<T>> Observable<T> observe(Promise<I> promise) {
        return Observable.merge((ObservableSource)RxRatpack.single(promise).toObservable().map(Observable::fromIterable));
    }

    public static <T> Promise<List<T>> promiseAll(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.async(f -> observable.toList().subscribe(arg_0 -> ((Downstream)f).success(arg_0), arg_0 -> ((Downstream)f).error(arg_0)));
    }

    public static <T> Promise<List<T>> promiseAll(ObservableOnSubscribe<T> onSubscribe) throws UnmanagedThreadException {
        return RxRatpack.promiseAll(Observable.create(onSubscribe));
    }

    public static <T> Promise<T> promise(Single<T> single) throws UnmanagedThreadException {
        return Promise.async(f -> single.subscribe(arg_0 -> ((Downstream)f).success(arg_0), arg_0 -> ((Downstream)f).error(arg_0)));
    }

    public static <T> Promise<T> promise(SingleOnSubscribe<T> onSubscribe) throws UnmanagedThreadException {
        return RxRatpack.promise(Single.create(onSubscribe));
    }

    public static <T> TransformablePublisher<T> publisher(Observable<T> observable, BackpressureStrategy strategy) {
        return Streams.transformable((Publisher)observable.toFlowable(strategy));
    }

    public static <T> TransformablePublisher<T> publisher(ObservableOnSubscribe<T> onSubscribe, BackpressureStrategy strategy) {
        return RxRatpack.publisher(Observable.create(onSubscribe), strategy);
    }

    public static <T> Observable<T> bindExec(Observable<T> source) {
        return (Observable)Exceptions.uncheck(() -> (Observable)RxRatpack.promiseAll(source).to(RxRatpack::observe));
    }

    public static <T> Observable<T> fork(Observable<T> observable) {
        return RxRatpack.observe(RxRatpack.promiseAll(observable).fork());
    }

    public static <T> Observable<T> fork(Observable<T> observable, Action<? super RegistrySpec> registrySpec) throws Exception {
        return RxRatpack.observe(RxRatpack.promiseAll(observable).fork(execSpec -> execSpec.register(registrySpec)));
    }

    public static <T> Observable<T> forkEach(Observable<T> observable) {
        return RxRatpack.forkEach(observable, (Action<? super RegistrySpec>)Action.noop());
    }

    public static <T> Observable<T> forkEach(Observable<T> observable, final Action<? super RegistrySpec> registrySpec) {
        return observable.lift(downstream -> new Observer<T>(){
            private final AtomicInteger wip = new AtomicInteger(1);
            private final AtomicBoolean closed = new AtomicBoolean();
            private Disposable disposable;

            public void onSubscribe(Disposable d) {
                this.disposable = d;
                downstream.onSubscribe(d);
            }

            public void onComplete() {
                this.maybeDone();
            }

            public void onError(Throwable e) {
                this.terminate(() -> downstream.onError(e));
            }

            private void maybeDone() {
                if (this.wip.decrementAndGet() == 0) {
                    this.terminate(() -> ((Observer)downstream).onComplete());
                }
            }

            private void terminate(Runnable runnable) {
                if (this.closed.compareAndSet(false, true)) {
                    runnable.run();
                }
            }

            public void onNext(T t) {
                if (this.disposable.isDisposed() || this.closed.get()) {
                    return;
                }
                this.wip.incrementAndGet();
                Execution.fork().register(registrySpec).onComplete(e -> this.maybeDone()).onError(this::onError).start(e -> {
                    if (!this.closed.get()) {
                        downstream.onNext(t);
                    }
                });
            }
        });
    }

    public static Scheduler scheduler(ExecController execController) {
        return new ExecControllerBackedScheduler(execController);
    }

    public static Scheduler scheduler() {
        return RxRatpack.scheduler(ExecController.require());
    }
}

