/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.time.Duration;
import ratpack.exec.Downstream;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.Promise;
import ratpack.exec.Upstream;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.ExecThreadBinding;
import ratpack.exec.util.retry.RetryPolicy;
import ratpack.func.Action;
import ratpack.func.BiAction;
import ratpack.func.BiFunction;
import ratpack.func.Block;
import ratpack.func.Function;
import ratpack.util.Exceptions;

/**
 * {@link Promise} implementation that is a thin wrapper around an {@link Upstream}.
 *
 * <p>Subscribing ({@link #then(Action)} / {@link #connect(Downstream)}) connects a
 * {@link Downstream} to the wrapped upstream; {@link #transform(Function)} wraps a
 * derived upstream in a new promise. The static {@code retry*} helpers implement
 * re-connection of an upstream after a failure, with a delay between attempts.
 *
 * @param <T> the type of value this promise produces
 */
public class DefaultPromise<T> implements Promise<T> {

    /** Shared promise created via {@code Promise.value(null)}. */
    public static final Promise<Void> NULL = Promise.value(null);

    private final Upstream<T> upstream;

    /**
     * Creates a promise backed by the given upstream.
     *
     * @param upstream the source that downstreams are connected to on subscription
     */
    public DefaultPromise(Upstream<T> upstream) {
        this.upstream = upstream;
    }

    /**
     * Subscribes to this promise, passing the produced value to {@code then}.
     *
     * <p>Anything thrown by {@code then}, and any error signalled by the upstream, is
     * routed through {@link #throwError(Throwable)}. A bare completion signal is ignored.
     *
     * @param then the consumer of the promised value
     */
    @Override
    public void then(final Action<? super T> then) {
        ExecThreadBinding.requireComputeThread("Promise.then() can only be called on a compute thread (use Blocking.on() to use a promise on a blocking thread)");
        doConnect(new Downstream<T>() {

            @Override
            public void success(T value) {
                try {
                    then.execute(value);
                } catch (Throwable e) {
                    // Deliberately Throwable: every failure of the callback goes to the execution.
                    throwError(e);
                }
            }

            @Override
            public void error(Throwable throwable) {
                throwError(throwable);
            }

            @Override
            public void complete() {
                // Nothing to do when the upstream completes without a value.
            }
        });
    }

    /**
     * Connects the given downstream to this promise's upstream.
     *
     * @param downstream the receiver of the success / error / complete signal
     */
    @Override
    public void connect(Downstream<? super T> downstream) {
        ExecThreadBinding.requireComputeThread("Promise.connect() can only be called on a compute thread (use Blocking.on() to use a promise on a blocking thread)");
        doConnect(downstream);
    }

    /**
     * Connects {@code downstream} to the upstream. An {@link ExecutionException} is
     * propagated to the caller unchanged; any other {@link Exception} thrown while
     * connecting is handed to {@link #throwError(Throwable)}.
     */
    private void doConnect(Downstream<? super T> downstream) {
        try {
            upstream.connect(downstream);
        } catch (ExecutionException e) {
            throw e;
        } catch (Exception e) {
            throwError(e);
        }
    }

    /**
     * Raises {@code throwable} within the current execution.
     *
     * <p>Requires a current {@link DefaultExecution}, opens a delimited segment on it whose
     * error handler is {@code Action.throwException()}, and resumes that segment with a
     * block that throws {@code throwable}.
     *
     * @param throwable the error to raise
     */
    public static void throwError(Throwable throwable) {
        DefaultExecution.require().delimit(Action.throwException(), h -> h.resume(Block.throwException(throwable)));
    }

    /**
     * Creates a new promise from the upstream produced by applying
     * {@code upstreamTransformer} to this promise's upstream.
     *
     * <p>The transformer is applied immediately; an exception it throws is rethrown via
     * {@link Exceptions#uncheck(Throwable)}.
     *
     * @param upstreamTransformer maps this promise's upstream to the new upstream
     * @param <O> the type of value produced by the resulting promise
     * @return a promise wrapping the transformed upstream
     */
    @Override
    public <O> Promise<O> transform(Function<? super Upstream<? extends T>, ? extends Upstream<O>> upstreamTransformer) {
        try {
            return new DefaultPromise<O>(upstreamTransformer.apply(upstream));
        } catch (Exception e) {
            throw Exceptions.uncheck(e);
        }
    }

    /**
     * Connects {@code down} to {@code up}, re-connecting after a failure while the attempt
     * budget allows.
     *
     * <p>When {@code up} signals an error:
     * <ul>
     *   <li>if {@code attemptNum > maxAttempts}, the error is forwarded to {@code down};</li>
     *   <li>otherwise {@code onError} is asked for a delay promise. If it throws, that
     *       throwable is forwarded to {@code down} with the original error added as
     *       suppressed (unless it is the same instance). If it returns, the delay promise
     *       is connected; on success the execution sleeps for the yielded duration and this
     *       method is invoked again with {@code attemptNum + 1}. An error or completion of
     *       the delay promise is forwarded to {@code down}.</li>
     * </ul>
     *
     * @param attemptNum the number of the current attempt
     * @param maxAttempts the threshold that {@code attemptNum} must not exceed for a retry to happen
     * @param up the upstream to (re-)connect
     * @param down the downstream that ultimately receives the outcome
     * @param onError given the attempt number and the failure, yields the delay before the next attempt
     * @param <T> the type of value produced by {@code up}
     * @throws Exception anything thrown by {@code up.connect(...)}
     */
    public static <T> void retryAttempt(final int attemptNum, final int maxAttempts, final Upstream<? extends T> up, final Downstream<? super T> down, final BiFunction<? super Integer, ? super Throwable, Promise<Duration>> onError) throws Exception {
        up.connect(down.onError(e -> {
            if (attemptNum > maxAttempts) {
                down.error(e);
                return;
            }

            Promise<Duration> delay;
            try {
                delay = onError.apply(attemptNum, e);
            } catch (Throwable errorHandlerError) {
                // Keep the original failure visible, but never add a throwable to itself.
                if (errorHandlerError != e) {
                    errorHandlerError.addSuppressed(e);
                }
                down.error(errorHandlerError);
                return;
            }

            delay.connect(sleepThenContinue(down, () -> retryAttempt(attemptNum + 1, maxAttempts, up, down, onError)));
        }));
    }

    /**
     * Connects {@code down} to {@code up}, re-connecting after a failure for as long as
     * {@code retryPolicy} is not exhausted.
     *
     * <p>When {@code up} signals an error:
     * <ul>
     *   <li>if {@code retryPolicy.isExhausted()}, the error is forwarded to {@code down};</li>
     *   <li>otherwise {@code onError} is notified with {@code retryPolicy.attempts()} and the
     *       failure, and then {@code retryPolicy.delay()} is obtained. If either throws, that
     *       throwable is forwarded to {@code down} with the original error added as
     *       suppressed (unless it is the same instance). Otherwise the delay promise is
     *       connected; on success the execution sleeps for the yielded duration and this
     *       method is invoked again with {@code retryPolicy.increaseAttempt()}. An error or
     *       completion of the delay promise is forwarded to {@code down}.</li>
     * </ul>
     *
     * @param retryPolicy decides whether to retry and how long to wait
     * @param up the upstream to (re-)connect
     * @param down the downstream that ultimately receives the outcome
     * @param onError callback invoked with the attempt count and the failure before each retry
     * @param <T> the type of value produced by {@code up}
     * @throws Exception anything thrown by {@code up.connect(...)}
     */
    public static <T> void retry(final RetryPolicy retryPolicy, final Upstream<? extends T> up, final Downstream<? super T> down, final BiAction<? super Integer, ? super Throwable> onError) throws Exception {
        up.connect(down.onError(e -> {
            if (retryPolicy.isExhausted()) {
                down.error(e);
                return;
            }

            Promise<Duration> delay;
            try {
                // Order matters: the callback is notified before the delay is computed.
                onError.execute(retryPolicy.attempts(), e);
                delay = retryPolicy.delay();
            } catch (Throwable errorHandlerError) {
                // Keep the original failure visible, but never add a throwable to itself.
                if (errorHandlerError != e) {
                    errorHandlerError.addSuppressed(e);
                }
                down.error(errorHandlerError);
                return;
            }

            // increaseAttempt() is evaluated inside the block, i.e. only once the sleep has elapsed.
            delay.connect(sleepThenContinue(down, () -> retry(retryPolicy.increaseAttempt(), up, down, onError)));
        }));
    }

    /**
     * Builds the downstream used for a retry delay promise: on success it sleeps for the
     * yielded duration and then runs {@code next}; error and complete go to {@code down}.
     */
    private static Downstream<Duration> sleepThenContinue(final Downstream<?> down, final Block next) {
        return new Downstream<Duration>() {

            @Override
            public void success(Duration value) {
                Execution.sleep(value, next);
            }

            @Override
            public void error(Throwable throwable) {
                down.error(throwable);
            }

            @Override
            public void complete() {
                down.complete();
            }
        };
    }
}

