/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.netty.channel.EventLoop;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import ratpack.exec.Downstream;
import ratpack.exec.ExecBuilder;
import ratpack.exec.ExecControl;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.Fulfiller;
import ratpack.exec.OverlappingExecutionException;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.Upstream;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.internal.ExecControlInternal;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.func.Action;
import ratpack.func.BiAction;
import ratpack.func.Block;
import ratpack.registry.RegistrySpec;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;

public class DefaultExecControl
implements ExecControl,
ExecControlInternal {
    private static final Logger LOGGER = ExecutionBacking.LOGGER;
    private static final BiAction<Execution, Throwable> LOG_UNCAUGHT = (o, t) -> LOGGER.error("Uncaught execution exception", t);
    private static final int MAX_ERRORS_THRESHOLD = 5;
    private final ExecController execController;
    private ImmutableList<? extends ExecInterceptor> interceptors = ImmutableList.of();

    public DefaultExecControl(ExecController execController) {
        this.execController = execController;
    }

    @Override
    public void setDefaultInterceptors(ImmutableList<? extends ExecInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    @Override
    public Execution getExecution() throws UnmanagedThreadException {
        return ExecutionBacking.require().getExecution();
    }

    @Override
    public ExecController getController() {
        return this.execController;
    }

    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, Block continuation) throws Exception {
        ExecutionBacking backing = ExecutionBacking.require();
        backing.addInterceptor(execInterceptor);
        backing.intercept(ExecInterceptor.ExecType.COMPUTE, Collections.singletonList(execInterceptor).iterator(), continuation);
    }

    @Override
    public ExecBuilder fork() {
        return new ExecBuilder(){
            private BiAction<? super Execution, ? super Throwable> onError = DefaultExecControl.access$000();
            private Action<? super Execution> onComplete = Action.noop();
            private Action<? super RegistrySpec> registry = Action.noop();
            private EventLoop eventLoop = DefaultExecControl.access$100(DefaultExecControl.this).getEventLoopGroup().next();

            @Override
            public ExecBuilder eventLoop(EventLoop eventLoop) {
                this.eventLoop = eventLoop;
                return this;
            }

            @Override
            public ExecBuilder onError(BiAction<? super Execution, ? super Throwable> onError) {
                LinkedList seen = Lists.newLinkedList();
                this.onError = (e, t) -> {
                    if (seen.size() < 5) {
                        seen.add(t);
                        onError.execute((Execution)e, (Throwable)t);
                    } else {
                        seen.forEach(t::addSuppressed);
                        LOGGER.error("Error handler " + onError + "reached maximum error threshold (might be caught in an error loop)", t);
                    }
                };
                return this;
            }

            @Override
            public ExecBuilder onError(Action<? super Throwable> onError) {
                return this.onError((? super Execution e, ? super Throwable t) -> onError.execute((Throwable)t));
            }

            @Override
            public ExecBuilder onComplete(Action<? super Execution> onComplete) {
                this.onComplete = onComplete;
                return this;
            }

            @Override
            public ExecBuilder register(Action<? super RegistrySpec> action) {
                this.registry = action;
                return this;
            }

            @Override
            public void start(Action<? super Execution> action) {
                if (this.eventLoop.inEventLoop() && ExecutionBacking.get() == null) {
                    Exceptions.uncheck(() -> new ExecutionBacking(DefaultExecControl.this.execController, this.eventLoop, (ImmutableList<? extends ExecInterceptor>)DefaultExecControl.this.interceptors, this.registry, action, this.onError, this.onComplete));
                } else {
                    this.eventLoop.submit(() -> new ExecutionBacking(DefaultExecControl.this.execController, this.eventLoop, (ImmutableList<? extends ExecInterceptor>)DefaultExecControl.this.interceptors, this.registry, action, this.onError, this.onComplete));
                }
            }
        };
    }

    @Override
    public <T> Promise<T> promise(Action<? super Fulfiller<T>> action) {
        return this.directPromise(DefaultExecControl.upstream(action));
    }

    @Override
    public <T> Promise<T> blocking(final Callable<T> blockingOperation) {
        return this.directPromise(downstream -> {
            final ExecutionBacking backing = ExecutionBacking.require();
            backing.streamSubscribe(streamHandle -> CompletableFuture.supplyAsync(new Supplier<Result<T>>(){
                Result result;

                @Override
                public Result<T> get() {
                    try {
                        ExecutionBacking.THREAD_BINDING.set(backing);
                        backing.intercept(ExecInterceptor.ExecType.BLOCKING, backing.getAllInterceptors().iterator(), () -> {
                            Object value = blockingOperation.call();
                            this.result = Result.success(value);
                        });
                        Result result = this.result;
                        return result;
                    }
                    catch (Exception e) {
                        Result result = Result.error(e);
                        return result;
                    }
                    finally {
                        ExecutionBacking.THREAD_BINDING.remove();
                    }
                }
            }, this.execController.getBlockingExecutor()).thenAcceptAsync(v -> streamHandle.complete(() -> downstream.accept(v)), (Executor)backing.getEventLoop()));
        });
    }

    private <T> Promise<T> directPromise(Upstream<T> upstream) {
        return new DefaultPromise<T>(upstream);
    }

    @Override
    public <T> TransformablePublisher<T> stream(Publisher<T> publisher) {
        return Streams.transformable(subscriber -> ExecutionBacking.require().streamSubscribe(handle -> publisher.subscribe(new Subscriber<T>((ExecutionBacking.StreamHandle)handle, subscriber){
            final /* synthetic */ ExecutionBacking.StreamHandle val$handle;
            final /* synthetic */ Subscriber val$cap$1;
            {
                this.val$handle = streamHandle;
                this.val$cap$1 = subscriber;
            }

            public void onSubscribe(Subscription subscription) {
                this.val$handle.event(() -> this.val$cap$1.onSubscribe(subscription));
            }

            public void onNext(T element) {
                this.val$handle.event(() -> this.val$cap$1.onNext(element));
            }

            public void onComplete() {
                this.val$handle.complete(() -> ((Subscriber)this.val$cap$1).onComplete());
            }

            public void onError(Throwable cause) {
                this.val$handle.complete(() -> this.val$cap$1.onError(cause));
            }
        })));
    }

    public static <T> Upstream<T> upstream(Action<? super Fulfiller<T>> action) {
        return downstream -> ExecutionBacking.require().streamSubscribe(streamHandle -> {
            final AtomicBoolean fulfilled = new AtomicBoolean();
            try {
                action.execute(new Fulfiller<T>((ExecutionBacking.StreamHandle)streamHandle, downstream){
                    final /* synthetic */ ExecutionBacking.StreamHandle val$streamHandle;
                    final /* synthetic */ Downstream val$cap$1;
                    {
                        this.val$streamHandle = streamHandle;
                        this.val$cap$1 = downstream;
                    }

                    @Override
                    public void error(Throwable throwable) {
                        if (!fulfilled.compareAndSet(false, true)) {
                            LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled", throwable));
                            return;
                        }
                        this.val$streamHandle.complete(() -> this.val$cap$1.error(throwable));
                    }

                    @Override
                    public void success(T value) {
                        if (!fulfilled.compareAndSet(false, true)) {
                            LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled"));
                            return;
                        }
                        this.val$streamHandle.complete(() -> this.val$cap$1.success(value));
                    }
                });
            }
            catch (Throwable throwable) {
                if (!fulfilled.compareAndSet(false, true)) {
                    LOGGER.error("", (Throwable)new OverlappingExecutionException("exception thrown after promise was fulfilled", throwable));
                }
                streamHandle.complete(() -> downstream.error(throwable));
            }
        });
    }

    static /* synthetic */ BiAction access$000() {
        return LOG_UNCAUGHT;
    }
}

