/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.Lists;
import io.netty.channel.EventLoop;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import ratpack.exec.ExecControl;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.Fulfiller;
import ratpack.exec.OverlappingExecutionException;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.internal.Downstream;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.exec.internal.Upstream;
import ratpack.func.Action;
import ratpack.func.NoArgAction;
import ratpack.registry.RegistrySpec;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;

public class DefaultExecControl
implements ExecControl {
    /** Logger instance taken from {@link ExecutionBacking} rather than created for this class. */
    private static final Logger LOGGER = ExecutionBacking.LOGGER;
    /** Error handler that only logs the throwable at error level; exposed via {@code access$000()}. */
    private static final Action<Throwable> LOG_UNCAUGHT = t -> LOGGER.error("Uncaught execution exception", t);
    /**
     * NOTE(review): presumably the cap on how many errors a user-supplied error handler
     * (see {@code ExecStarter.onError} in {@link #exec()}) receives before further errors
     * are only logged - confirm against the original sources.
     */
    private static final int MAX_ERRORS_THRESHOLD = 5;
    /** Controller supplied at construction; returned by {@link #getController()}. */
    private final ExecController execController;

    /**
     * Creates a control that operates on the given controller.
     *
     * @param execController the controller stored in this instance and handed out by
     *                       {@link #getController()}; it is not validated here
     */
    public DefaultExecControl(ExecController execController) {
        this.execController = execController;
    }

    /**
     * Returns the execution of the backing obtained from {@link ExecutionBacking#require()}.
     *
     * @return the execution reported by the required backing
     * @throws UnmanagedThreadException as declared by the interface; presumably raised by
     *                                  {@code require()} when no backing is available
     */
    @Override
    public Execution getExecution() throws UnmanagedThreadException {
        final ExecutionBacking current = ExecutionBacking.require();
        return current.getExecution();
    }

    /**
     * Returns the controller this control was constructed with.
     *
     * @return the {@link ExecController} passed to the constructor
     */
    @Override
    public ExecController getController() {
        return execController;
    }

    /**
     * Adds an interceptor to the required backing and then runs the continuation through it.
     *
     * <p>The interceptor is first appended to the backing's interceptor collection. The
     * continuation is then passed to {@code intercept} with execution type
     * {@code COMPUTE} and a single-element list holding only the new interceptor.
     *
     * @param execInterceptor the interceptor to add
     * @param continuation    the work handed to {@code intercept}
     * @throws Exception whatever {@code intercept} propagates
     */
    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, NoArgAction continuation) throws Exception {
        final ExecutionBacking current = ExecutionBacking.require();
        // Register first, so the interceptor is part of the backing before the continuation runs.
        current.getInterceptors().add(execInterceptor);
        current.intercept(
            ExecInterceptor.ExecType.COMPUTE,
            Collections.singletonList(execInterceptor),
            continuation
        );
    }

    /**
     * Creates a starter used to configure and launch a new execution.
     *
     * <p>The returned starter defaults to logging uncaught errors, a no-op completion
     * callback, no registry action, and the next event loop of the controller's event
     * loop group.
     *
     * @return a new, independently configurable {@link ExecStarter}
     */
    @Override
    public ExecStarter exec() {
        return new ExecStarter() {
            // Until onError(...) is called, uncaught errors are only logged.
            private Action<? super Throwable> onError = LOG_UNCAUGHT;
            private Action<? super Execution> onComplete = Action.noop();
            // Optional: when null, the start action is used as-is.
            private Action<? super RegistrySpec> registry;
            private EventLoop eventLoop = DefaultExecControl.this.execController.getEventLoopGroup().next();

            @Override
            public ExecStarter eventLoop(EventLoop eventLoop) {
                this.eventLoop = eventLoop;
                return this;
            }

            /**
             * Installs an error handler guarded against error loops: the handler receives
             * at most {@code MAX_ERRORS_THRESHOLD} errors, after which further errors are
             * only logged, with the previously seen errors attached as suppressed.
             */
            @Override
            public ExecStarter onError(Action<? super Throwable> onError) {
                final LinkedList<Throwable> seen = Lists.newLinkedList();
                this.onError = t -> {
                    if (seen.size() < MAX_ERRORS_THRESHOLD) {
                        seen.add(t);
                        onError.execute(t);
                    } else {
                        for (Throwable earlier : seen) {
                            // Throwable.addSuppressed rejects self-suppression with an
                            // IllegalArgumentException, which would defeat this guard when
                            // the same exception instance keeps coming back.
                            if (earlier != t) {
                                t.addSuppressed(earlier);
                            }
                        }
                        LOGGER.error("Error handler " + onError + " reached maximum error threshold (might be caught in an error loop)", t);
                    }
                };
                return this;
            }

            @Override
            public ExecStarter onComplete(Action<? super Execution> onComplete) {
                this.onComplete = onComplete;
                return this;
            }

            @Override
            public ExecStarter register(Action<? super RegistrySpec> registry) {
                this.registry = registry;
                return this;
            }

            /**
             * Starts the execution by constructing an {@link ExecutionBacking}: inline when
             * already on the target event loop with no current backing, otherwise by
             * submitting the construction to the event loop.
             */
            @Override
            public void start(Action<? super Execution> action) {
                // The caller's stack trace is captured only when tracing is enabled.
                final Optional<StackTraceElement[]> startTrace = ExecutionBacking.TRACE
                    ? Optional.of(Thread.currentThread().getStackTrace())
                    : Optional.empty();
                final Action<? super Execution> effectiveAction =
                    this.registry == null ? action : Action.join(this.registry, action);
                if (this.eventLoop.inEventLoop() && ExecutionBacking.get() == null) {
                    new ExecutionBacking(DefaultExecControl.this.execController, this.eventLoop, startTrace, effectiveAction, this.onError, this.onComplete);
                } else {
                    this.eventLoop.submit(() -> new ExecutionBacking(DefaultExecControl.this.execController, this.eventLoop, startTrace, effectiveAction, this.onError, this.onComplete));
                }
            }
        };
    }

    /**
     * Creates a promise whose value is supplied through a {@link Fulfiller}.
     *
     * @param action receives the fulfiller; it is wrapped by {@link #upstream(Action)}
     * @param <T>    the promised value type
     * @return a promise built directly on the wrapped upstream
     */
    @Override
    public <T> Promise<T> promise(Action<? super Fulfiller<T>> action) {
        final Upstream<T> source = upstream(action);
        return directPromise(source);
    }

    /**
     * Creates a promise for the result of a blocking operation.
     *
     * <p>When the promise is subscribed, the operation is run on the controller's blocking
     * executor, wrapped by the backing's interceptors with execution type {@code BLOCKING}.
     * The outcome (success, or failure for any {@link Exception}) is then delivered
     * downstream on the backing's event loop via the stream handle.
     *
     * <p>The backing is obtained with {@link ExecutionBacking#require()}, consistent with
     * the other entry points of this class, because it is dereferenced unconditionally
     * once the promise is subscribed. Presumably {@code require()} fails fast when no
     * backing is available, rather than yielding {@code null}.
     *
     * @param blockingOperation the operation to run off the event loop
     * @param <T>               the type of value produced by the operation
     * @return a promise for the operation's result
     */
    @Override
    public <T> Promise<T> blocking(final Callable<T> blockingOperation) {
        final ExecutionBacking backing = ExecutionBacking.require();
        return this.directPromise(downstream -> backing.streamSubscribe(streamHandle ->
            CompletableFuture.supplyAsync(new Supplier<Result<T>>() {
                // Written by the intercepted continuation below, read right after it returns.
                private Result<T> result;

                @Override
                public Result<T> get() {
                    try {
                        backing.intercept(ExecInterceptor.ExecType.BLOCKING, backing.getInterceptors(), () -> {
                            this.result = Result.success(blockingOperation.call());
                        });
                        return this.result;
                    } catch (Exception e) {
                        return Result.failure(e);
                    }
                }
            }, this.execController.getBlockingExecutor())
            .thenAcceptAsync(v -> streamHandle.complete(() -> downstream.accept(v)), backing.getEventLoop())));
    }

    /**
     * Wraps an upstream in a {@link DefaultPromise} without any further adaptation.
     *
     * @param upstream the source the promise is built on
     * @param <T>      the promised value type
     * @return a new promise backed by {@code upstream}
     */
    private <T> Promise<T> directPromise(Upstream<T> upstream) {
        final Promise<T> wrapped = new DefaultPromise<>(upstream);
        return wrapped;
    }

    /**
     * Adapts a publisher so that its signals are routed through the required backing's
     * stream handle.
     *
     * <p>On each subscription, a stream subscription is opened on the backing returned by
     * {@link ExecutionBacking#require()}. {@code onSubscribe} and {@code onNext} are
     * forwarded with {@code handle.event(...)}, while the terminal signals
     * {@code onComplete} and {@code onError} are forwarded with
     * {@code handle.complete(...)}.
     *
     * @param publisher the publisher to adapt
     * @param <T>       the element type
     * @return a transformable publisher that relays {@code publisher}'s signals
     */
    @Override
    public <T> TransformablePublisher<T> stream(Publisher<T> publisher) {
        return Streams.transformable(subscriber -> ExecutionBacking.require().streamSubscribe(handle ->
            publisher.subscribe(new Subscriber<T>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    handle.event(() -> subscriber.onSubscribe(subscription));
                }

                @Override
                public void onNext(T element) {
                    handle.event(() -> subscriber.onNext(element));
                }

                @Override
                public void onComplete() {
                    handle.complete(() -> subscriber.onComplete());
                }

                @Override
                public void onError(Throwable cause) {
                    handle.complete(() -> subscriber.onError(cause));
                }
            })));
    }

    /**
     * Builds an upstream that hands a {@link Fulfiller} to the given action and relays the
     * first fulfillment downstream.
     *
     * <p>Only the first outcome wins. A later call to {@code success} or {@code error} is
     * logged as an {@link OverlappingExecutionException} and dropped. An exception thrown
     * by the action itself is sent downstream as an error, unless the promise was already
     * fulfilled, in which case it is only logged.
     *
     * @param action receives the fulfiller when the upstream is connected
     * @param <T>    the promised value type
     * @return an upstream that opens a stream subscription on the required backing
     */
    public static <T> Upstream<T> upstream(Action<? super Fulfiller<T>> action) {
        return downstream -> ExecutionBacking.require().streamSubscribe(streamHandle -> {
            // Flipped exactly once by whichever outcome arrives first.
            final AtomicBoolean fulfilled = new AtomicBoolean();
            try {
                action.execute(new Fulfiller<T>() {
                    @Override
                    public void error(Throwable throwable) {
                        if (!fulfilled.compareAndSet(false, true)) {
                            LOGGER.error("", new OverlappingExecutionException("promise already fulfilled", throwable));
                            return;
                        }
                        streamHandle.complete(() -> downstream.error(throwable));
                    }

                    @Override
                    public void success(T value) {
                        if (!fulfilled.compareAndSet(false, true)) {
                            LOGGER.error("", new OverlappingExecutionException("promise already fulfilled"));
                            return;
                        }
                        streamHandle.complete(() -> downstream.success(value));
                    }
                });
            } catch (Throwable throwable) {
                if (fulfilled.compareAndSet(false, true)) {
                    // NOTE(review): unlike the fulfiller paths, this signal bypasses
                    // streamHandle.complete(...), as in the original code - confirm intended.
                    downstream.error(throwable);
                } else {
                    // Already fulfilled: signalling downstream again would deliver a second
                    // outcome, so the late exception is only logged.
                    LOGGER.error("", new OverlappingExecutionException("exception thrown after promise was fulfilled", throwable));
                }
            }
        });
    }

    /**
     * Accessor for {@code LOG_UNCAUGHT}. It appears to be a compiler-generated bridge
     * (surfaced by decompilation) that gives nested classes access to the private
     * constant. It is kept so that any code still calling it keeps compiling.
     *
     * @return the shared log-only error handler
     */
    static /* synthetic */ Action<Throwable> access$000() {
        return LOG_UNCAUGHT;
    }
}

