/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecutionException;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.exec.internal.ThreadBinding;
import ratpack.func.Action;
import ratpack.func.Function;
import ratpack.util.Exceptions;
import ratpack.util.internal.InternalRatpackError;

public class DefaultPromise<T>
implements Promise<T> {
    private final Upstream<T> upstream;

    public DefaultPromise(Upstream<T> upstream) {
        this.upstream = upstream;
    }

    @Override
    public void then(final Action<? super T> then) {
        ThreadBinding.requireComputeThread("Promise.then() can only be called on a compute thread (use Promise.block() to use a promise on a blocking thread)");
        try {
            this.upstream.connect(new Downstream<T>(){

                @Override
                public void success(T value) {
                    try {
                        then.execute(value);
                    }
                    catch (Throwable e) {
                        DefaultPromise.this.throwError(e);
                    }
                }

                @Override
                public void error(Throwable throwable) {
                    DefaultPromise.this.throwError(throwable);
                }

                @Override
                public void complete() {
                }
            });
        }
        catch (ExecutionException e) {
            throw e;
        }
        catch (Exception e) {
            throw new InternalRatpackError("failed to add promise resume action", e);
        }
    }

    private void throwError(Throwable throwable) {
        ExecutionBacking.require().streamSubscribe(h -> h.complete(() -> {
            throw Exceptions.toException(throwable);
        }));
    }

    @Override
    public <O> Promise<O> transform(Function<? super Upstream<? extends T>, ? extends Upstream<O>> upstreamTransformer) {
        try {
            return new DefaultPromise<O>(upstreamTransformer.apply(this.upstream));
        }
        catch (Exception e) {
            throw Exceptions.uncheck(e);
        }
    }

    @Override
    public T block() throws Exception {
        ThreadBinding.requireBlockingThread("Promise.block() can only be used while blocking (i.e. use Promise.blocking() first)");
        ExecutionBacking backing = ExecutionBacking.require();
        CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference resultReference = new AtomicReference();
        backing.streamSubscribe(handle -> this.upstream.connect(new Downstream<T>((ExecutionBacking.StreamHandle)handle, latch){
            final /* synthetic */ ExecutionBacking.StreamHandle val$handle;
            final /* synthetic */ CountDownLatch val$cap$1;
            {
                this.val$handle = streamHandle;
                this.val$cap$1 = countDownLatch;
            }

            @Override
            public void success(T value) {
                this.unlatch(Result.success(value));
            }

            @Override
            public void error(Throwable throwable) {
                this.unlatch(Result.error(throwable));
            }

            @Override
            public void complete() {
                this.unlatch(Result.success(null));
            }

            private void unlatch(Result<T> result) {
                resultReference.set(result);
                this.val$handle.complete();
                this.val$cap$1.countDown();
            }
        }));
        backing.eventLoopDrain();
        latch.await();
        return ((Result)resultReference.get()).getValueOrThrow();
    }
}

