/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.util.Objects;
import java.util.function.Consumer;
import ratpack.exec.ExecControl;
import ratpack.exec.ExecutionException;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Throttle;
import ratpack.exec.internal.CachingUpstream;
import ratpack.exec.internal.Downstream;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.exec.internal.Upstream;
import ratpack.func.Action;
import ratpack.func.Function;
import ratpack.func.NoArgAction;
import ratpack.func.Predicate;
import ratpack.util.Exceptions;
import ratpack.util.internal.InternalRatpackError;

public class DefaultPromise<T>
implements Promise<T> {
    private final Upstream<T> upstream;

    public DefaultPromise(Upstream<T> upstream) {
        this.upstream = upstream;
    }

    @Override
    public void then(final Action<? super T> then) {
        try {
            this.upstream.connect(new Downstream<T>(){

                @Override
                public void success(T value) {
                    try {
                        then.execute(value);
                    }
                    catch (Throwable e) {
                        DefaultPromise.this.throwError(e);
                    }
                }

                @Override
                public void error(Throwable throwable) {
                    DefaultPromise.this.throwError(throwable);
                }

                @Override
                public void complete() {
                }
            });
        }
        catch (ExecutionException e) {
            throw e;
        }
        catch (Exception e) {
            throw new InternalRatpackError("failed to add promise resume action", e);
        }
    }

    private void throwError(Throwable throwable) {
        ExecutionBacking.require().streamSubscribe(h -> h.complete(() -> {
            throw Exceptions.toException(throwable);
        }));
    }

    private <O> Promise<O> connect(Upstream<O> upstream) {
        return new DefaultPromise<O>(upstream);
    }

    @Override
    public Promise<T> onError(final Action<? super Throwable> errorHandler) {
        return this.connect(downstream -> this.upstream.connect(new ValuePassThru(downstream){

            @Override
            public void error(Throwable throwable) {
                try {
                    errorHandler.execute(throwable);
                    super.complete();
                }
                catch (Throwable e) {
                    e.addSuppressed(throwable);
                    super.error(e);
                }
            }
        }));
    }

    @Override
    public <O> Promise<O> map(Function<? super T, ? extends O> transformer) {
        return this.connect(downstream -> this.upstream.connect(new Transform(downstream, transformer, downstream::success)));
    }

    @Override
    public Promise<T> mapError(final Function<? super Throwable, ? extends T> transformer) {
        return this.connect(downstream -> this.upstream.connect(new Downstream<T>(){

            @Override
            public void success(T value) {
                downstream.success(value);
            }

            @Override
            public void error(Throwable throwable) {
                try {
                    downstream.success(transformer.apply(throwable));
                }
                catch (Throwable t) {
                    downstream.error(t);
                }
            }

            @Override
            public void complete() {
                downstream.complete();
            }
        }));
    }

    @Override
    public <O> Promise<O> apply(Function<? super Promise<T>, ? extends Promise<O>> function) {
        try {
            return function.apply(this);
        }
        catch (Exception e) {
            return ExecControl.execControl().failedPromise(e);
        }
    }

    @Override
    public <O> O to(Function<? super Promise<T>, ? extends O> function) throws Exception {
        return function.apply(this);
    }

    @Override
    public <O> Promise<O> flatMap(Function<? super T, ? extends Promise<O>> transformer) {
        return this.connect(downstream -> this.upstream.connect(new Transform(downstream, transformer, promise -> promise.onError(downstream::error).then(downstream::success))));
    }

    @Override
    public Promise<T> route(final Predicate<? super T> predicate, final Action<? super T> fulfillment) {
        return this.connect(downstream -> this.upstream.connect(new Operation<T>(downstream){

            @Override
            public void success(T value) {
                boolean apply;
                try {
                    apply = predicate.apply(value);
                }
                catch (Throwable e) {
                    this.error(e);
                    return;
                }
                if (apply) {
                    try {
                        fulfillment.execute(value);
                        this.complete();
                    }
                    catch (Throwable e) {
                        this.error(e);
                    }
                } else {
                    this.downstream.success(value);
                }
            }
        }));
    }

    @Override
    public Promise<T> onNull(NoArgAction onNull) {
        return this.route(Objects::isNull, Action.ignoreArg(onNull));
    }

    @Override
    public <O> Promise<O> blockingMap(Function<? super T, ? extends O> transformer) {
        return this.flatMap(t -> ExecControl.execControl().blocking(() -> transformer.apply((Object)t)));
    }

    @Override
    public Promise<T> cache() {
        return this.connect(new CachingUpstream<T>(this.upstream));
    }

    @Override
    public Promise<T> onYield(Runnable onYield) {
        return this.connect(downstream -> {
            try {
                onYield.run();
            }
            catch (Throwable e) {
                downstream.error(e);
                return;
            }
            this.upstream.connect(downstream);
        });
    }

    @Override
    public Promise<T> defer(Action<? super Runnable> releaser) {
        return this.connect(downstream -> ExecutionBacking.require().streamSubscribe(streamHandle -> {
            try {
                releaser.execute(() -> streamHandle.complete(() -> this.upstream.connect(downstream)));
            }
            catch (Throwable t) {
                downstream.error(t);
            }
        }));
    }

    @Override
    public Promise<T> wiretap(final Action<? super Result<T>> listener) {
        return this.connect(downstream -> this.upstream.connect(new ValuePassThru(downstream){

            @Override
            public void success(T value) {
                try {
                    listener.execute(Result.success(value));
                }
                catch (Throwable t) {
                    this.error(t);
                    return;
                }
                super.success(value);
            }

            @Override
            public void error(Throwable throwable) {
                try {
                    listener.execute(Result.failure(throwable));
                }
                catch (Throwable t) {
                    t.addSuppressed(throwable);
                    super.error(t);
                    return;
                }
                super.error(throwable);
            }
        }));
    }

    @Override
    public Promise<T> throttled(Throttle throttle) {
        return throttle.throttle(this);
    }

    private abstract class ValuePassThru
    extends Operation<T> {
        public ValuePassThru(Downstream<? super T> downstream) {
            super(downstream);
        }

        @Override
        public void success(T value) {
            this.downstream.success(value);
        }
    }

    private class Transform<I, O>
    extends Operation<O> {
        private final Function<? super T, ? extends I> function;
        private final Consumer<? super I> onSuccess;

        public Transform(Downstream<? super O> downstream, Function<? super T, ? extends I> function, Consumer<? super I> onSuccess) {
            super(downstream);
            this.function = function;
            this.onSuccess = onSuccess;
        }

        @Override
        public void success(T value) {
            I transformed;
            try {
                transformed = this.function.apply(value);
            }
            catch (Throwable e) {
                this.downstream.error(e);
                return;
            }
            this.onSuccess(transformed);
        }

        public void onSuccess(I transformed) {
            this.onSuccess.accept(transformed);
        }
    }

    private abstract class Operation<O>
    implements Downstream<T> {
        protected final Downstream<? super O> downstream;

        public Operation(Downstream<? super O> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void error(Throwable throwable) {
            this.downstream.error(throwable);
        }

        @Override
        public void complete() {
            this.downstream.complete();
        }
    }
}

