/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import ratpack.exec.Downstream;
import ratpack.exec.ExecutionException;
import ratpack.exec.Promise;
import ratpack.exec.Upstream;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.exec.internal.ThreadBinding;
import ratpack.func.Action;
import ratpack.func.Function;
import ratpack.util.Exceptions;

public class DefaultPromise<T>
implements Promise<T> {
    private final Upstream<T> upstream;

    public DefaultPromise(Upstream<T> upstream) {
        this.upstream = upstream;
    }

    @Override
    public void then(final Action<? super T> then) {
        ThreadBinding.requireComputeThread("Promise.then() can only be called on a compute thread (use Blocking.on() to use a promise on a blocking thread)");
        this.doConnect(new Downstream<T>(){

            @Override
            public void success(T value) {
                try {
                    then.execute(value);
                }
                catch (Exception e) {
                    DefaultPromise.this.throwError(e);
                }
            }

            @Override
            public void error(Throwable throwable) {
                DefaultPromise.this.throwError(throwable);
            }

            @Override
            public void complete() {
            }
        });
    }

    @Override
    public void connect(Downstream<T> downstream) {
        ThreadBinding.requireComputeThread("Promise.connect() can only be called on a compute thread (use Blocking.on() to use a promise on a blocking thread)");
        this.doConnect(downstream);
    }

    public void doConnect(Downstream<T> downstream) {
        try {
            this.upstream.connect(downstream);
        }
        catch (ExecutionException e) {
            throw e;
        }
        catch (Exception e) {
            this.throwError(e);
        }
    }

    private void throwError(Throwable throwable) {
        ExecutionBacking.require().streamSubscribe(h -> h.complete(() -> {
            throw Exceptions.toException(throwable);
        }));
    }

    @Override
    public <O> Promise<O> transform(Function<? super Upstream<? extends T>, ? extends Upstream<O>> upstreamTransformer) {
        try {
            return new DefaultPromise<O>(upstreamTransformer.apply(this.upstream));
        }
        catch (Exception e) {
            throw Exceptions.uncheck(e);
        }
    }
}

