/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.exec.Downstream;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInitializer;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.OverlappingExecutionException;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.Upstream;
import ratpack.exec.internal.Continuation;
import ratpack.exec.internal.ContinuationStream;
import ratpack.exec.internal.ExecControllerInternal;
import ratpack.func.Action;
import ratpack.func.Block;
import ratpack.registry.MutableRegistry;
import ratpack.registry.NotInRegistryException;
import ratpack.registry.RegistrySpec;
import ratpack.registry.internal.SimpleMutableRegistry;
import ratpack.stream.TransformablePublisher;

public class DefaultExecution
implements Execution {
    /** Logger for execution internals; note it is registered under the public {@link Execution} type, not this class. */
    public static final Logger LOGGER = LoggerFactory.getLogger(Execution.class);
    /** Thread-local slot holding the execution that is draining on the calling thread; {@code drain()} sets and removes it. */
    public static final FastThreadLocal<DefaultExecution> THREAD_BINDING = new FastThreadLocal();
    // Segment stream currently being executed; reassigned as nested streams are pushed by delimit()/delimitStream() and popped by the streams themselves.
    private ExecStream execStream;
    // Controller that supplies the controller-level interceptors and initializers used by the constructor.
    private final ExecControllerInternal controller;
    // Event loop on which drain() runs (work is re-submitted here when called from elsewhere).
    private final EventLoop eventLoop;
    // Top-level error handler, invoked by InitialExecStream.error().
    private final Action<? super Throwable> onError;
    // Callback invoked by exec() once the terminal stream has been reached.
    private final Action<? super Execution> onComplete;
    // Resources closed by exec() on completion; created lazily by onComplete(AutoCloseable).
    private List<AutoCloseable> closeables;
    // Execution-scoped registry backing addLazy/remove/maybeGet/getAll.
    private final MutableRegistry registry = new SimpleMutableRegistry();
    // Interceptors registered through addInterceptor(); created lazily.
    private List<ExecInterceptor> adhocInterceptors;
    // Every interceptor applied by drain(): controller-level, then registry-provided, then ad hoc ones.
    private Iterable<? extends ExecInterceptor> interceptors;

    public DefaultExecution(ExecControllerInternal controller, EventLoop eventLoop, Action<? super RegistrySpec> registryInit, Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onStart, Action<? super Execution> onComplete) throws Exception {
        this.controller = controller;
        this.eventLoop = eventLoop;
        this.onError = onError;
        this.onComplete = onComplete;
        registryInit.execute(this.registry);
        onStart.execute(this);
        this.execStream = new InitialExecStream(action);
        this.interceptors = Iterables.concat(controller.getInterceptors(), (Iterable)ImmutableList.copyOf(this.registry.getAll(ExecInterceptor.class)));
        for (ExecInitializer initializer : controller.getInitializers()) {
            initializer.init(this);
        }
        for (ExecInitializer initializer : this.registry.getAll(ExecInitializer.class)) {
            initializer.init(this);
        }
        this.drain();
    }

    /**
     * Returns the execution bound to the calling thread via {@link #THREAD_BINDING}.
     *
     * @return the bound execution; {@link #require()} treats a {@code null} result as "no execution bound"
     * @throws UnmanagedThreadException declared in the signature, although this method contains no throw statement
     */
    public static DefaultExecution get() throws UnmanagedThreadException {
        DefaultExecution bound = THREAD_BINDING.get();
        return bound;
    }

    /**
     * Returns the execution bound to the calling thread, failing when there is none.
     *
     * @return the bound execution, never {@code null}
     * @throws UnmanagedThreadException if {@link #get()} yields {@code null}
     */
    public static DefaultExecution require() throws UnmanagedThreadException {
        DefaultExecution current = DefaultExecution.get();
        if (current != null) {
            return current;
        }
        throw new UnmanagedThreadException();
    }

    /**
     * Wraps a publisher so that its signals are delivered through the current execution.
     *
     * <p>A publisher that is already execution-bound is returned unchanged (and {@code disposer} is
     * ignored in that case) to avoid double wrapping.
     *
     * @param publisher the publisher to bind
     * @param disposer invoked for items that arrive but cannot be delivered to the subscriber
     * @param <T> the item type
     * @return an execution-bound view of {@code publisher}
     */
    public static <T> TransformablePublisher<T> stream(Publisher<T> publisher, Action<? super T> disposer) {
        if (publisher instanceof ExecutionBoundPublisher) {
            // Parameterized cast: the type argument is fixed by Publisher<T>, so no raw type is needed.
            return (TransformablePublisher<T>) publisher;
        }
        return new ExecutionBoundPublisher<>(publisher, disposer);
    }

    /**
     * Decorates an upstream so that whatever it signals is delivered to the downstream as a resumed
     * segment of the current execution.
     *
     * <p>Only the first signal (error, success or complete) is forwarded. Any later signal, including
     * an exception thrown by {@code upstream.connect} after a signal was already sent, is logged as an
     * {@link OverlappingExecutionException} and otherwise dropped.
     *
     * @param upstream the upstream to bind to the execution
     * @param <T> the value type
     * @return the execution-bound upstream
     */
    public static <T> Upstream<T> upstream(Upstream<T> upstream) {
        return downstream -> {
            // Flipped by whichever signal arrives first; all later signals are rejected.
            final AtomicBoolean fired = new AtomicBoolean();
            DefaultExecution.require().delimit(downstream::error, continuation -> {
                try {
                    upstream.connect(new Downstream<T>() {
                        @Override
                        public void error(Throwable throwable) {
                            if (!fired.compareAndSet(false, true)) {
                                LOGGER.error("", new OverlappingExecutionException("promise already fulfilled", throwable));
                                return;
                            }
                            continuation.resume(() -> downstream.error(throwable));
                        }

                        @Override
                        public void success(T value) {
                            if (!fired.compareAndSet(false, true)) {
                                LOGGER.error("", new OverlappingExecutionException("promise already fulfilled"));
                                return;
                            }
                            continuation.resume(() -> downstream.success(value));
                        }

                        @Override
                        public void complete() {
                            if (!fired.compareAndSet(false, true)) {
                                LOGGER.error("", new OverlappingExecutionException("promise already fulfilled"));
                                return;
                            }
                            continuation.resume(downstream::complete);
                        }
                    });
                } catch (Throwable throwable) {
                    // A failure while connecting counts as the upstream's signal, unless one was already sent.
                    if (!fired.compareAndSet(false, true)) {
                        LOGGER.error("", new OverlappingExecutionException("promise already fulfilled", throwable));
                        return;
                    }
                    continuation.resume(() -> downstream.error(throwable));
                }
            });
        };
    }

    /**
     * Returns the event loop supplied at construction.
     *
     * @return the event loop this execution drains on
     */
    @Override
    public EventLoop getEventLoop() {
        return eventLoop;
    }

    /**
     * Queues a nested single-event stream on the current stream and then tries to drain.
     *
     * <p>The nested stream is created only when the queued block runs, so its parent is whichever
     * stream is current at that moment rather than at the time of this call.
     *
     * @param onError handler for errors raised within the nested stream
     * @param segment the work to run, given the continuation with which to resume
     */
    public void delimit(Action<? super Throwable> onError, Action<? super Continuation> segment) {
        Block pushSingleEventStream = () -> {
            ExecStream parent = execStream;
            execStream = new SingleEventExecStream(parent, onError, segment);
        };
        execStream.enqueue(pushSingleEventStream);
        drain();
    }

    /**
     * Queues a nested multi-event stream on the current stream and then tries to drain.
     *
     * <p>As with {@code delimit}, the nested stream is created when the queued block runs and takes
     * the then-current stream as its parent.
     *
     * @param onError handler for errors raised within the nested stream
     * @param segment the work to run, given the continuation stream that accepts events
     */
    private void delimitStream(Action<? super Throwable> onError, Action<? super ContinuationStream> segment) {
        Block pushMultiEventStream = () -> {
            ExecStream parent = execStream;
            execStream = new MultiEventExecStream(parent, onError, segment);
        };
        execStream.enqueue(pushMultiEventStream);
        drain();
    }

    /**
     * Submits a drain of this execution to its event loop instead of draining on the calling thread.
     */
    public void eventLoopDrain() {
        Runnable drainTask = this::drain;
        eventLoop.execute(drainTask);
    }

    /**
     * Runs as much of this execution as is currently runnable, provided the calling thread is allowed
     * to do so.
     *
     * <ul>
     *   <li>Does nothing once the execution has reached the terminal stream.</li>
     *   <li>Does nothing if this execution is already bound to the calling thread (a drain is in
     *       progress further up the stack and will pick up the new work).</li>
     *   <li>Re-submits itself to the event loop if called off the event loop, or while a different
     *       execution is bound to this thread.</li>
     *   <li>Otherwise binds this execution to the thread, runs the interceptor chain (which ends in
     *       {@code exec()}), and always unbinds afterwards.</li>
     * </ul>
     *
     * Anything thrown out of the interceptor chain is logged via {@link #interceptorError(Throwable)}
     * and not rethrown.
     */
    private void drain() {
        if (this.execStream == TerminalExecStream.INSTANCE) {
            return;
        }
        DefaultExecution currentExecution = THREAD_BINDING.get();
        if (this == currentExecution) {
            return;
        }
        if (!this.eventLoop.inEventLoop() || currentExecution != null) {
            this.eventLoopDrain();
            return;
        }
        try {
            // Typed set: the thread-local is FastThreadLocal<DefaultExecution>, so no Object cast.
            THREAD_BINDING.set(this);
            this.intercept(this.interceptors.iterator());
        } catch (Throwable e) {
            DefaultExecution.interceptorError(e);
        } finally {
            THREAD_BINDING.remove();
        }
    }

    /**
     * Logs, at warn level, a throwable that escaped an execution interceptor. The throwable is not rethrown.
     *
     * @param e the throwable raised by the interceptor
     */
    public static void interceptorError(Throwable e) {
        final String message = "exception was thrown by an execution interceptor (which will be ignored):";
        LOGGER.warn(message, e);
    }

    /**
     * Returns the interceptor chain currently applied when draining, including any interceptors
     * added through {@code addInterceptor}.
     *
     * @return the live interceptor iterable (not a copy)
     */
    public Iterable<? extends ExecInterceptor> getAllInterceptors() {
        return interceptors;
    }

    /**
     * Applies the remaining interceptors one at a time; each receives a continuation that applies the
     * rest. When the iterator is exhausted the execution segments themselves are run.
     *
     * @param interceptors the interceptors still to be applied
     * @throws Exception anything raised by an interceptor
     */
    private void intercept(Iterator<? extends ExecInterceptor> interceptors) throws Exception {
        if (!interceptors.hasNext()) {
            exec();
            return;
        }
        ExecInterceptor next = interceptors.next();
        next.intercept(this, ExecInterceptor.ExecType.COMPUTE, () -> intercept(interceptors));
    }

    /**
     * Steps the current stream until it reports that nothing more can run, routing any throwable
     * raised by a step to the stream's error handler and then carrying on.
     *
     * <p>If the execution has reached the terminal stream afterwards, the completion callback is
     * invoked and every registered closeable is closed; failures in either are logged and ignored.
     */
    private void exec() {
        boolean settled = false;
        while (!settled) {
            try {
                while (execStream.exec()) {
                    // Each call runs one step of whichever stream is current.
                }
                settled = true;
            } catch (Throwable segmentError) {
                // Hand the failure to the current stream, then resume stepping.
                execStream.error(segmentError);
            }
        }

        if (execStream != TerminalExecStream.INSTANCE) {
            // Not finished: waiting for a continuation to be resumed.
            return;
        }

        try {
            onComplete.execute(this);
        } catch (Throwable e) {
            LOGGER.warn("exception raised during onComplete action", e);
        }

        if (closeables == null) {
            return;
        }
        for (AutoCloseable resource : closeables) {
            try {
                resource.close();
            } catch (Throwable e) {
                LOGGER.warn("exception raised by execution closeable " + resource, e);
            }
        }
    }

    /**
     * Returns the controller supplied at construction.
     *
     * @return the controller this execution belongs to
     */
    @Override
    public ExecController getController() {
        return controller;
    }

    /**
     * Registers a resource to be closed when this execution completes.
     *
     * @param closeable the resource to close on completion
     */
    @Override
    public void onComplete(AutoCloseable closeable) {
        List<AutoCloseable> registered = closeables;
        if (registered == null) {
            // First registration: create the backing list lazily.
            registered = Lists.newArrayList();
            closeables = registered;
        }
        registered.add(closeable);
    }

    /**
     * Adds a lazily supplied entry to the execution registry.
     *
     * @param type the type the entry is registered under
     * @param supplier provides the object
     * @param <O> the entry type
     * @return this execution
     */
    @Override
    public <O> Execution addLazy(TypeToken<O> type, Supplier<? extends O> supplier) {
        registry.addLazy(type, supplier);
        return this;
    }

    /**
     * Adds an interceptor to this execution and immediately applies it around {@code continuation}.
     *
     * <p>The first call creates the ad hoc interceptor list and appends it to the interceptor chain;
     * later calls only add to that list.
     *
     * @param execInterceptor the interceptor to add
     * @param continuation the work to run under the new interceptor
     * @throws Exception anything raised by the interceptor
     */
    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, Block continuation) throws Exception {
        if (adhocInterceptors == null) {
            List<ExecInterceptor> adhoc = Lists.newArrayList();
            adhocInterceptors = adhoc;
            interceptors = Iterables.concat(interceptors, adhoc);
        }
        adhocInterceptors.add(execInterceptor);
        execInterceptor.intercept(this, ExecInterceptor.ExecType.COMPUTE, continuation);
    }

    /**
     * Removes the entry of the given type from the execution registry.
     *
     * @param type the type to remove
     * @param <T> the entry type
     * @throws NotInRegistryException propagated from the underlying registry
     */
    @Override
    public <T> void remove(TypeToken<T> type) throws NotInRegistryException {
        registry.remove(type);
    }

    /**
     * Looks up an entry of the given type in the execution registry.
     *
     * @param type the type to look up
     * @param <O> the entry type
     * @return the lookup result as returned by the underlying registry
     */
    @Override
    public <O> Optional<O> maybeGet(TypeToken<O> type) {
        Optional<O> found = registry.maybeGet(type);
        return found;
    }

    /**
     * Returns all entries of the given type from the execution registry.
     *
     * @param type the type to look up
     * @param <O> the entry type
     * @return the matching entries as returned by the underlying registry
     */
    @Override
    public <O> Iterable<? extends O> getAll(TypeToken<O> type) {
        Iterable<? extends O> matches = registry.getAll(type);
        return matches;
    }

    /**
     * A nested stream that accepts any number of events followed by at most one completion.
     *
     * <p>Each event owns its own queue of segments. The head queue of {@link #events} is the event
     * being processed; segments enqueued while it runs are appended to that same queue. The head is
     * discarded only once it is empty and a later event is waiting. The stream returns control to
     * its parent when the last event is exhausted and a completion action has been set.
     */
    private class MultiEventExecStream
    extends ExecStream
    implements ContinuationStream {
        final ExecStream parent;
        private final Action<? super Throwable> onError;
        // One segment queue per event; always holds at least one queue (see exec()).
        final Queue<Queue<Block>> events;
        // Set at most once; a non-null value also closes the stream to further events.
        private final AtomicReference<Block> complete;

        MultiEventExecStream(ExecStream parent, Action<? super Throwable> onError, Action<? super ContinuationStream> initial) {
            this.events = PlatformDependent.newMpscQueue();
            this.complete = new AtomicReference<>();
            this.parent = parent;
            this.onError = onError;
            // The initial action is simply the first event.
            this.event(() -> initial.execute(this));
        }

        /**
         * Queues a new event unless completion has already been signalled.
         *
         * @return {@code true} if the event was accepted, {@code false} if the stream is complete
         */
        @Override
        public boolean event(Block action) {
            if (this.complete.get() == null) {
                ArrayDeque<Block> event = new ArrayDeque<Block>();
                event.add(action);
                this.events.add(event);
                DefaultExecution.this.drain();
                return true;
            }
            return false;
        }

        /**
         * Records the completion action; only the first call wins.
         *
         * @return {@code true} if this call set the completion action
         */
        @Override
        public boolean complete(Block action) {
            if (this.complete.compareAndSet(null, action)) {
                DefaultExecution.this.drain();
                return true;
            }
            return false;
        }

        @Override
        boolean exec() throws Exception {
            Block nextSegment = this.events.peek().poll();
            if (nextSegment != null) {
                nextSegment.execute();
                return true;
            }
            if (this.events.size() != 1) {
                // Current event is exhausted and another is waiting: move on to it.
                this.events.poll();
                return true;
            }
            // Last event exhausted: either finish, or wait for more events / completion.
            Block completion = this.complete.get();
            if (completion == null) {
                return false;
            }
            // Pop this stream before running the completion so that it runs in the parent's context.
            DefaultExecution.this.execStream = this.parent;
            completion.execute();
            return true;
        }

        @Override
        void enqueue(Block segment) {
            this.events.peek().add(segment);
        }

        /**
         * Pops this stream and hands the error to its handler. Anything the handler itself throws is
         * forwarded to the parent stream. This catches {@code Throwable}, matching the other stream
         * types, so that an {@code Error} from the handler cannot escape the execution loop.
         */
        @Override
        void error(Throwable throwable) {
            DefaultExecution.this.execStream = this.parent;
            try {
                this.onError.execute(throwable);
            } catch (Throwable e) {
                DefaultExecution.this.execStream.error(e);
            }
        }
    }

    /**
     * A nested stream that runs an initial action and then waits to be resumed exactly once.
     *
     * <p>Steps are taken in this priority order: the initial action; any queued segments; the pending
     * resume action; and finally, once resumed with nothing left to run, control is handed back to
     * the parent stream.
     */
    private class SingleEventExecStream
    extends ExecStream
    implements Continuation {
        final ExecStream parent;
        private final Action<? super Throwable> onError;
        // Cleared after it has run.
        Action<? super Continuation> initial;
        // Action passed to resume(); cleared after it has run.
        Block resume;
        // Set by resume(); never reset.
        boolean resumed;
        // Created lazily on first enqueue.
        Queue<Block> segments;

        SingleEventExecStream(ExecStream parent, Action<? super Throwable> onError, Action<? super Continuation> initial) {
            this.parent = parent;
            this.onError = onError;
            this.initial = initial;
        }

        @Override
        boolean exec() throws Exception {
            if (initial != null) {
                initial.execute(this);
                initial = null;
                return true;
            }

            boolean hasQueuedSegments = segments != null && !segments.isEmpty();
            if (hasQueuedSegments) {
                Block next = segments.poll();
                if (next == null) {
                    DefaultExecution.this.execStream = parent;
                    return true;
                }
                next.execute();
                return true;
            }

            if (resume != null) {
                // Cleared only after a successful run; error() relies on it still being set on failure.
                resume.execute();
                resume = null;
                return true;
            }

            if (!resumed) {
                // Nothing runnable until resume() is called.
                return false;
            }
            DefaultExecution.this.execStream = parent;
            return true;
        }

        @Override
        void enqueue(Block segment) {
            Queue<Block> queue = segments;
            if (queue == null) {
                queue = new ArrayDeque<Block>(1);
                segments = queue;
            }
            queue.add(segment);
        }

        /**
         * Stores the action to run on resumption, marks the stream as resumed, and tries to drain.
         */
        @Override
        public void resume(Block action) {
            resumed = true;
            resume = action;
            DefaultExecution.this.drain();
        }

        /**
         * Pops this stream and reports the error. When the stream has been resumed and no resume
         * action is pending, the error goes straight to the parent; otherwise it goes to this stream's
         * handler, and anything the handler throws is forwarded to the parent.
         */
        @Override
        void error(Throwable throwable) {
            DefaultExecution.this.execStream = parent;
            boolean noResumePending = resumed && resume == null;
            if (noResumePending) {
                parent.error(throwable);
                return;
            }
            try {
                onError.execute(throwable);
            } catch (Throwable e) {
                DefaultExecution.this.execStream.error(e);
            }
        }
    }

    /**
     * The root stream of an execution: runs the initial action, then any queued segments, and
     * switches the execution to the terminal stream once nothing is left.
     */
    private class InitialExecStream
    extends ExecStream {
        // Cleared after it has run, or by error().
        Action<? super Execution> initial;
        // Created lazily on first enqueue.
        Queue<Block> segments;

        InitialExecStream(Action<? super Execution> initial) {
            this.initial = initial;
        }

        @Override
        boolean exec() throws Exception {
            if (initial != null) {
                initial.execute(DefaultExecution.this);
                initial = null;
                return true;
            }

            Block next = segments == null ? null : segments.poll();
            if (next == null) {
                // Nothing left at the root: the execution is finished.
                DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
                return false;
            }
            next.execute();
            return true;
        }

        @Override
        void enqueue(Block segment) {
            Queue<Block> queue = segments;
            if (queue == null) {
                queue = new ArrayDeque<Block>(1);
                segments = queue;
            }
            queue.add(segment);
        }

        /**
         * Discards all pending root work and invokes the execution's error handler. If the handler
         * itself throws, that is logged and the execution is terminated.
         */
        @Override
        void error(Throwable throwable) {
            initial = null;
            if (segments != null) {
                segments.clear();
            }
            try {
                DefaultExecution.this.onError.execute(throwable);
            } catch (Throwable errorHandlerError) {
                LOGGER.error("error handler " + DefaultExecution.this.onError + " threw error (this execution will terminate):", errorHandlerError);
                DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
            }
        }
    }

    private static class ExecutionBoundPublisher<T>
    implements TransformablePublisher<T> {
        private final Publisher<T> publisher;
        private final Action<? super T> disposer;

        private ExecutionBoundPublisher(Publisher<T> publisher, Action<? super T> disposer) {
            this.publisher = publisher;
            this.disposer = disposer;
        }

        public void subscribe(Subscriber<? super T> subscriber) {
            DefaultExecution.require().delimitStream(arg_0 -> subscriber.onError(arg_0), continuation -> this.publisher.subscribe(new Subscriber<T>((ContinuationStream)continuation, (Subscriber)subscriber){
                private final AtomicBoolean cancelled = new AtomicBoolean();
                final /* synthetic */ ContinuationStream val$continuation;
                final /* synthetic */ Subscriber val$subscriber;
                {
                    this.val$continuation = continuationStream;
                    this.val$subscriber = subscriber;
                }

                public void onSubscribe(final Subscription subscription) {
                    this.val$continuation.event(() -> this.val$subscriber.onSubscribe(new Subscription(){

                        public void request(long n) {
                            subscription.request(n);
                        }

                        public void cancel() {
                            cancelled.set(true);
                            subscription.cancel();
                            val$continuation.complete(Block.noop());
                        }
                    }));
                }

                public void onNext(T element) {
                    boolean added = this.val$continuation.event(() -> {
                        if (this.cancelled.get()) {
                            this.dispose(element);
                        } else {
                            this.val$subscriber.onNext(element);
                        }
                    });
                    if (!added) {
                        this.dispose(element);
                    }
                }

                private void dispose(T element) {
                    try {
                        disposer.execute(element);
                    }
                    catch (Exception e) {
                        LOGGER.warn("Exception raised disposing stream item will be ignored - ", (Throwable)e);
                    }
                }

                public void onComplete() {
                    this.val$continuation.complete(() -> {
                        if (!this.cancelled.get()) {
                            this.val$subscriber.onComplete();
                        }
                    });
                }

                public void onError(Throwable cause) {
                    if (!this.cancelled.get()) {
                        this.val$continuation.complete(() -> this.val$subscriber.onError(cause));
                    }
                }
            }));
        }
    }

    /**
     * Sentinel stream marking a finished execution: it never has work, and rejects both new segments
     * and errors with an {@link ExecutionException}.
     */
    private static class TerminalExecStream
    extends ExecStream {
        static final ExecStream INSTANCE = new TerminalExecStream();

        private static final String COMPLETED_MESSAGE = "this execution has completed (you may be trying to use a promise in a cleanup method)";

        private TerminalExecStream() {
        }

        /** Always reports that nothing is runnable. */
        @Override
        boolean exec() {
            return false;
        }

        /** Always fails: a completed execution accepts no further segments. */
        @Override
        void enqueue(Block segment) {
            throw new ExecutionException(COMPLETED_MESSAGE);
        }

        /** Always fails; the supplied throwable is not attached to the raised exception. */
        @Override
        void error(Throwable throwable) {
            throw new ExecutionException(COMPLETED_MESSAGE);
        }
    }

    /**
     * A unit of the execution's segment stack: something that can be stepped, can accept further
     * segments, and can be told about an error raised by one of its steps.
     */
    private abstract static class ExecStream {
        private ExecStream() {
        }

        /**
         * Performs one step of work.
         *
         * @return {@code true} if the caller should step again, {@code false} if nothing can run now
         * @throws Exception anything raised by the step
         */
        abstract boolean exec() throws Exception;

        /**
         * Adds a segment to be run by a later step.
         *
         * @param segment the segment to queue
         */
        abstract void enqueue(Block segment);

        /**
         * Handles an error raised while stepping this stream.
         *
         * @param throwable the error that was raised
         */
        abstract void error(Throwable throwable);
    }
}

