/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.channel.EventLoop;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.exec.Downstream;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.OverlappingExecutionException;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.Upstream;
import ratpack.exec.internal.DefaultExecution;
import ratpack.func.Action;
import ratpack.func.BiAction;
import ratpack.func.Block;
import ratpack.registry.RegistrySpec;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;

public class ExecutionBacking {
    static final Logger LOGGER = LoggerFactory.getLogger(Execution.class);
    public static final ThreadLocal<ExecutionBacking> THREAD_BINDING = new ThreadLocal();
    private final ImmutableList<? extends ExecInterceptor> globalInterceptors;
    private final ImmutableList<? extends ExecInterceptor> registryInterceptors;
    private List<ExecInterceptor> adhocInterceptors;
    StreamHandle streamHandle;
    private final EventLoop eventLoop;
    private final List<AutoCloseable> closeables = Lists.newArrayList();
    private final BiAction<? super Execution, ? super Throwable> onError;
    private final Action<? super Execution> onComplete;
    private volatile boolean done;
    private final Execution execution;

    public ExecutionBacking(ExecController controller, EventLoop eventLoop, ImmutableList<? extends ExecInterceptor> globalInterceptors, Action<? super RegistrySpec> registry, Action<? super Execution> action, BiAction<? super Execution, ? super Throwable> onError, Action<? super Execution> onStart, Action<? super Execution> onComplete) throws Exception {
        this.eventLoop = eventLoop;
        this.onError = onError;
        this.onComplete = onComplete;
        this.execution = new DefaultExecution(this, eventLoop, controller, this.closeables);
        registry.execute(this.execution);
        onStart.execute(this.execution);
        this.registryInterceptors = ImmutableList.copyOf(this.execution.getAll(ExecInterceptor.class));
        this.globalInterceptors = globalInterceptors;
        this.streamHandle = new InitialStreamHandle();
        ArrayDeque<UserCode> event = new ArrayDeque<UserCode>();
        event.add(() -> action.execute(this.execution));
        this.streamHandle.stream.add(event);
        this.drain();
    }

    public static ExecutionBacking get() throws UnmanagedThreadException {
        return THREAD_BINDING.get();
    }

    public static ExecutionBacking require() throws UnmanagedThreadException {
        ExecutionBacking executionBacking = ExecutionBacking.get();
        if (executionBacking == null) {
            throw new UnmanagedThreadException();
        }
        return executionBacking;
    }

    public static <T> TransformablePublisher<T> stream(Publisher<T> publisher) {
        return Streams.transformable(subscriber -> ExecutionBacking.require().streamSubscribe(handle -> publisher.subscribe(new Subscriber<T>((StreamHandle)handle, subscriber){
            final /* synthetic */ StreamHandle val$handle;
            final /* synthetic */ Subscriber val$cap$1;
            {
                this.val$handle = streamHandle;
                this.val$cap$1 = subscriber;
            }

            public void onSubscribe(Subscription subscription) {
                this.val$handle.event(() -> this.val$cap$1.onSubscribe(subscription));
            }

            public void onNext(T element) {
                this.val$handle.event(() -> this.val$cap$1.onNext(element));
            }

            public void onComplete() {
                this.val$handle.complete(() -> ((Subscriber)this.val$cap$1).onComplete());
            }

            public void onError(Throwable cause) {
                this.val$handle.complete(() -> this.val$cap$1.onError(cause));
            }
        })));
    }

    public static <T> Upstream<T> upstream(Upstream<T> upstream) {
        return downstream -> {
            final AtomicBoolean fired = new AtomicBoolean();
            ExecutionBacking.require().streamSubscribe(handle -> {
                try {
                    upstream.connect(new Downstream<T>((StreamHandle)handle, downstream){
                        final /* synthetic */ StreamHandle val$handle;
                        final /* synthetic */ Downstream val$cap$2;
                        {
                            this.val$handle = streamHandle;
                            this.val$cap$2 = downstream;
                        }

                        @Override
                        public void error(Throwable throwable) {
                            if (!fired.compareAndSet(false, true)) {
                                LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled", throwable));
                                return;
                            }
                            this.val$handle.complete(() -> this.val$cap$2.error(throwable));
                        }

                        @Override
                        public void success(T value) {
                            if (!fired.compareAndSet(false, true)) {
                                LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled"));
                                return;
                            }
                            this.val$handle.complete(() -> this.val$cap$2.success(value));
                        }

                        @Override
                        public void complete() {
                            if (!fired.compareAndSet(false, true)) {
                                LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled"));
                                return;
                            }
                            this.val$handle.complete(this.val$cap$2::complete);
                        }
                    });
                }
                catch (Throwable throwable) {
                    if (!fired.compareAndSet(false, true)) {
                        LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled", throwable));
                        return;
                    }
                    handle.complete(() -> downstream.error(throwable));
                }
            });
        };
    }

    public Execution getExecution() {
        return this.execution;
    }

    public EventLoop getEventLoop() {
        return this.eventLoop;
    }

    public void addInterceptor(ExecInterceptor interceptor) {
        if (this.adhocInterceptors == null) {
            this.adhocInterceptors = Lists.newArrayList();
        }
        this.adhocInterceptors.add(interceptor);
    }

    public void streamSubscribe(Action<? super StreamHandle> consumer) {
        if (this.done) {
            throw new ExecutionException("this execution has completed (you may be trying to use a promise in a cleanup method)");
        }
        if (this.streamHandle.stream.isEmpty()) {
            this.streamHandle.stream.add(new ArrayDeque());
        }
        this.streamHandle.stream.element().add(() -> {
            StreamHandle parent = this.streamHandle;
            this.streamHandle = new StreamHandle(parent);
            consumer.execute(this.streamHandle);
        });
        this.drain();
    }

    public void eventLoopDrain() {
        this.eventLoop.execute(this::drain);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drain() {
        if (this.done) {
            return;
        }
        ExecutionBacking threadBoundExecutionBacking = THREAD_BINDING.get();
        if (this.equals(threadBoundExecutionBacking)) {
            return;
        }
        if (!this.eventLoop.inEventLoop() || threadBoundExecutionBacking != null) {
            if (!this.done) {
                this.eventLoop.execute(this::drain);
            }
            return;
        }
        try {
            THREAD_BINDING.set(this);
            while (true) {
                if (this.streamHandle.stream.isEmpty()) {
                    return;
                }
                Block segment = this.streamHandle.stream.element().poll();
                if (segment == null) {
                    this.streamHandle.stream.remove();
                    if (!this.streamHandle.stream.isEmpty()) continue;
                    if (this.streamHandle.getClass().equals(InitialStreamHandle.class)) {
                        this.done();
                        return;
                    }
                    break;
                }
                if (segment instanceof UserCode) {
                    try {
                        this.intercept(ExecInterceptor.ExecType.COMPUTE, segment);
                    }
                    catch (Throwable e) {
                        Deque<Block> event = this.streamHandle.stream.element();
                        event.clear();
                        event.addFirst(() -> {
                            try {
                                this.onError.execute(this.execution, e);
                            }
                            catch (Throwable errorHandlerException) {
                                this.streamHandle.stream.element().addFirst(() -> {
                                    throw errorHandlerException;
                                });
                            }
                        });
                    }
                    continue;
                }
                try {
                    segment.execute();
                }
                catch (Exception e) {
                    LOGGER.error("Internal Ratpack Error - please raise an issue", (Throwable)e);
                }
            }
        }
        finally {
            THREAD_BINDING.remove();
        }
    }

    private void intercept(ExecInterceptor.ExecType execType, Block segment) throws Exception {
        Iterator<? extends ExecInterceptor> iterator = this.getAllInterceptors().iterator();
        this.intercept(execType, iterator, segment);
    }

    public Iterable<? extends ExecInterceptor> getAllInterceptors() {
        Iterable interceptors = this.adhocInterceptors == null ? Iterables.concat(this.globalInterceptors, this.registryInterceptors) : Iterables.concat(this.globalInterceptors, this.registryInterceptors, this.adhocInterceptors);
        return interceptors;
    }

    private void done() {
        this.done = true;
        try {
            this.onComplete.execute(this.getExecution());
        }
        catch (Throwable e) {
            LOGGER.warn("exception raised during onComplete action", e);
        }
        for (AutoCloseable closeable : this.closeables) {
            try {
                closeable.close();
            }
            catch (Throwable e) {
                LOGGER.warn(String.format("exception raised by closeable %s", closeable), e);
            }
        }
    }

    public void intercept(ExecInterceptor.ExecType execType, Iterator<? extends ExecInterceptor> interceptors, Block action) throws Exception {
        if (interceptors.hasNext()) {
            interceptors.next().intercept(this.execution, execType, () -> this.intercept(execType, interceptors, action));
        } else {
            action.execute();
        }
    }

    public class StreamHandle {
        final StreamHandle parent;
        final Queue<Deque<Block>> stream = new ConcurrentLinkedQueue<Deque<Block>>();

        private StreamHandle(StreamHandle parent) {
            this.parent = parent;
            this.stream.add(new ArrayDeque());
        }

        public void event(UserCode action) {
            this.streamEvent(action);
        }

        public void complete(UserCode action) {
            this.streamEvent(() -> {
                ExecutionBacking.this.streamHandle = this.parent;
                action.execute();
            });
        }

        public void complete() {
            this.streamEvent(() -> {
                ExecutionBacking.this.streamHandle = this.parent;
            });
        }

        private void streamEvent(Block s) {
            ArrayDeque<Block> event = new ArrayDeque<Block>();
            event.add(s);
            this.stream.add(event);
            ExecutionBacking.this.drain();
        }
    }

    public static interface UserCode
    extends Block {
    }

    private class InitialStreamHandle
    extends StreamHandle {
        public InitialStreamHandle() {
            super(null);
        }
    }
}

