/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.Lists;
import io.netty.channel.EventLoop;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import ratpack.exec.ExecControl;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.Fulfiller;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.exec.internal.SafeFulfiller;
import ratpack.func.Action;
import ratpack.func.NoArgAction;
import ratpack.registry.RegistrySpec;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;

public class DefaultExecControl
implements ExecControl {
    private static final Logger LOGGER = ExecutionBacking.LOGGER;
    private static final Action<Throwable> LOG_UNCAUGHT = t -> LOGGER.error("Uncaught execution exception", t);
    private static final int MAX_ERRORS_THRESHOLD = 5;
    private final ExecController execController;
    private final ThreadLocal<ExecutionBacking> threadBinding = new ThreadLocal();

    public DefaultExecControl(ExecController execController) {
        this.execController = execController;
    }

    private ExecutionBacking getBacking() {
        ExecutionBacking executionBacking = this.threadBinding.get();
        if (executionBacking == null) {
            throw new ExecutionException("Current thread (" + Thread.currentThread().getName() + ") has no bound execution.");
        }
        return executionBacking;
    }

    @Override
    public Execution getExecution() {
        return this.getBacking().getExecution();
    }

    @Override
    public ExecController getController() {
        return this.execController;
    }

    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, NoArgAction continuation) throws Exception {
        ExecutionBacking backing = this.getBacking();
        backing.getInterceptors().add(execInterceptor);
        backing.intercept(ExecInterceptor.ExecType.COMPUTE, Collections.singletonList(execInterceptor), continuation);
    }

    @Override
    public ExecStarter exec() {
        return new ExecStarter(){
            private Action<? super Throwable> onError = DefaultExecControl.access$000();
            private Action<? super Execution> onComplete = Action.noop();
            private Action<? super RegistrySpec> registry;
            private EventLoop eventLoop = DefaultExecControl.access$100(DefaultExecControl.this).getEventLoopGroup().next();

            @Override
            public ExecStarter eventLoop(EventLoop eventLoop) {
                this.eventLoop = eventLoop;
                return this;
            }

            @Override
            public ExecStarter onError(Action<? super Throwable> onError) {
                LinkedList seen = Lists.newLinkedList();
                this.onError = t -> {
                    if (seen.size() < 5) {
                        seen.add(t);
                        onError.execute((Throwable)t);
                    } else {
                        seen.forEach(t::addSuppressed);
                        LOGGER.error("Error handler " + onError + "reached maximum error threshold (might be caught in an error loop)", t);
                    }
                };
                return this;
            }

            @Override
            public ExecStarter onComplete(Action<? super Execution> onComplete) {
                this.onComplete = onComplete;
                return this;
            }

            @Override
            public ExecStarter register(Action<? super RegistrySpec> registry) {
                this.registry = registry;
                return this;
            }

            @Override
            public void start(Action<? super Execution> action) {
                Action<? super Execution> effectiveAction;
                Optional<StackTraceElement[]> startTrace = ExecutionBacking.TRACE ? Optional.of(Thread.currentThread().getStackTrace()) : Optional.empty();
                Action<Execution> action2 = effectiveAction = this.registry == null ? action : Action.join(this.registry, action);
                if (this.eventLoop.inEventLoop() && DefaultExecControl.this.threadBinding.get() == null) {
                    new ExecutionBacking(DefaultExecControl.this.execController, this.eventLoop, startTrace, DefaultExecControl.this.threadBinding, effectiveAction, this.onError, this.onComplete);
                } else {
                    this.eventLoop.submit(() -> new ExecutionBacking(DefaultExecControl.this.execController, this.eventLoop, startTrace, DefaultExecControl.this.threadBinding, effectiveAction, this.onError, this.onComplete));
                }
            }
        };
    }

    @Override
    public <T> Promise<T> promise(Action<? super Fulfiller<T>> action) {
        return this.directPromise(SafeFulfiller.wrapping(this::getBacking, action));
    }

    @Override
    public <T> Promise<T> blocking(Callable<T> blockingOperation) {
        ExecutionBacking backing = this.getBacking();
        return this.directPromise(f -> backing.streamSubscribe(streamHandle -> CompletableFuture.supplyAsync(() -> {
            ArrayList holder = Lists.newArrayListWithCapacity((int)1);
            try {
                backing.intercept(ExecInterceptor.ExecType.BLOCKING, backing.getInterceptors(), () -> holder.add(0, Result.success(blockingOperation.call())));
                return (Result)holder.get(0);
            }
            catch (Exception e) {
                return Result.failure(e);
            }
        }, this.execController.getBlockingExecutor()).thenAcceptAsync(v -> streamHandle.complete(() -> f.accept(v)), (Executor)backing.getEventLoop())));
    }

    private <T> Promise<T> directPromise(Consumer<? super Fulfiller<? super T>> action) {
        return new DefaultPromise(this::getBacking, action);
    }

    @Override
    public <T> TransformablePublisher<T> stream(Publisher<T> publisher) {
        return Streams.transformable(subscriber -> {
            ExecutionBacking executionBacking = this.getBacking();
            executionBacking.streamSubscribe(handle -> publisher.subscribe(new Subscriber<T>((ExecutionBacking.StreamHandle)handle, subscriber){
                final /* synthetic */ ExecutionBacking.StreamHandle val$handle;
                final /* synthetic */ Subscriber val$cap$1;
                {
                    this.val$handle = streamHandle;
                    this.val$cap$1 = subscriber;
                }

                public void onSubscribe(Subscription subscription) {
                    this.val$handle.event(() -> this.val$cap$1.onSubscribe(subscription));
                }

                public void onNext(T element) {
                    this.val$handle.event(() -> this.val$cap$1.onNext(element));
                }

                public void onComplete() {
                    this.val$handle.complete(() -> ((Subscriber)this.val$cap$1).onComplete());
                }

                public void onError(Throwable cause) {
                    this.val$handle.complete(() -> this.val$cap$1.onError(cause));
                }
            }));
        });
    }

    static /* synthetic */ Action access$000() {
        return LOG_UNCAUGHT;
    }
}

