/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.Lists;
import io.netty.channel.EventLoop;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.ExecutionSnapshot;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.InterceptedOperation;
import ratpack.func.Action;

public class ExecutionBacking {
    public static final boolean TRACE = Boolean.getBoolean("ratpack.execution.trace");
    static final Logger LOGGER = LoggerFactory.getLogger(Execution.class);
    private final long startedAt = System.currentTimeMillis();
    private final List<ExecInterceptor> interceptors = Lists.newLinkedList();
    private final List<AutoCloseable> closeables = Lists.newLinkedList();
    private final Deque<Stream> streams = new ConcurrentLinkedDeque<Stream>();
    private final Deque<Stream> suspendedStreams = new ConcurrentLinkedDeque<Stream>();
    private final EventLoop eventLoop;
    private final Action<? super Throwable> onError;
    private final Action<? super Execution> onComplete;
    private final ThreadLocal<ExecutionBacking> threadBinding;
    private final Set<ExecutionBacking> executions;
    private final AtomicInteger streaming = new AtomicInteger();
    private volatile boolean done;
    private final Execution execution;
    private Optional<StackTraceElement[]> startTrace = Optional.empty();

    public ExecutionBacking(ExecController controller, Set<ExecutionBacking> executions, EventLoop eventLoop, Optional<StackTraceElement[]> startTrace, ThreadLocal<ExecutionBacking> threadBinding, Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onComplete) {
        this.executions = executions;
        this.eventLoop = eventLoop;
        this.onError = onError;
        this.onComplete = onComplete;
        this.threadBinding = threadBinding;
        this.execution = new DefaultExecution(eventLoop, controller, this.closeables);
        this.startTrace = startTrace;
        executions.add(this);
        Event event = new Event();
        event.segments.add(new UserCodeSegment(action));
        this.queueStream().events.add(event);
        this.drain();
    }

    public ExecutionSnapshot getSnapshot() {
        return new Snapshot();
    }

    public Execution getExecution() {
        return this.execution;
    }

    public EventLoop getEventLoop() {
        return this.eventLoop;
    }

    public List<ExecInterceptor> getInterceptors() {
        return this.interceptors;
    }

    private Stream queueStream() {
        Stream stream = new Stream();
        this.streams.add(stream);
        return stream;
    }

    private Stream currentStream() {
        return this.streams.peek();
    }

    private Event currentEvent() {
        return this.currentStream().events.peek();
    }

    private ExecutionSegment nextSegment() {
        return this.currentEvent().segments.peek();
    }

    public void streamSubscribe(Consumer<? super StreamHandle> runnable) {
        StreamHandle handle = new StreamHandle(this.queueStream());
        this.currentEvent().segments.add(new StreamSubscribe(handle, runnable));
        this.drain();
    }

    private void drain() {
        block11: {
            if (this.equals(this.threadBinding.get())) {
                return;
            }
            if (this.eventLoop.inEventLoop() && this.threadBinding.get() == null) {
                if (!this.hasExecutableSegments()) {
                    return;
                }
                try {
                    this.threadBinding.set(this);
                    this.assertNotDone();
                    while (true) {
                        ExecutionSegment segment;
                        if ((segment = this.nextSegment()) == null) {
                            Stream stream = this.currentStream();
                            stream.events.remove();
                            if (!stream.events.isEmpty()) continue;
                            if (this.suspendedStreams.isEmpty()) {
                                this.done();
                                return;
                            }
                            if (this.streaming.get() < this.suspendedStreams.size()) {
                                this.streams.remove();
                                this.streams.addFirst(this.suspendedStreams.removeLast());
                                continue;
                            }
                            break block11;
                        }
                        Event event = this.currentEvent();
                        event.segments.poll();
                        segment.run();
                    }
                }
                finally {
                    this.threadBinding.remove();
                }
            }
            this.eventLoop.execute(this::drain);
        }
    }

    private boolean hasExecutableSegments() {
        return this.currentEvent() != null;
    }

    private void done() {
        this.done = true;
        try {
            this.onComplete.execute(this.getExecution());
        }
        catch (Throwable e) {
            LOGGER.warn("exception raised during onComplete action", e);
        }
        for (AutoCloseable closeable : this.closeables) {
            try {
                closeable.close();
            }
            catch (Throwable e) {
                LOGGER.warn(String.format("exception raised by closeable %s", closeable), e);
            }
        }
        this.executions.remove(this);
    }

    private void assertNotDone() {
        if (this.done) {
            throw new ExecutionException("execution is complete");
        }
    }

    public void intercept(ExecInterceptor.ExecType execType, List<ExecInterceptor> interceptors, final Action<? super Execution> action) throws Exception {
        new InterceptedOperation(execType, interceptors){

            @Override
            protected void performOperation() throws Exception {
                action.execute(ExecutionBacking.this.getExecution());
            }
        }.run();
    }

    private class StreamCompletion
    extends UserCodeSegment {
        private final StreamHandle handle;

        private StreamCompletion(StreamHandle handle, Action<? super Execution> action) {
            super(action);
            this.handle = handle;
        }

        @Override
        public void run() {
            super.run();
            this.handle.streamEvent(new ExecutionSegment(){

                @Override
                public void run() {
                    ExecutionBacking.this.streaming.decrementAndGet();
                }
            });
        }
    }

    private class StreamEvent
    extends UserCodeSegment {
        private StreamEvent(Action<? super Execution> action) {
            super(action);
        }
    }

    private class StreamSubscribe
    extends ExecutionSegment {
        private final StreamHandle handle;
        private final Consumer<? super StreamHandle> consumer;

        private StreamSubscribe(StreamHandle handle, Consumer<? super StreamHandle> consumer) {
            this.handle = handle;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            ExecutionBacking.this.suspendedStreams.add(ExecutionBacking.this.streams.remove());
            ExecutionBacking.this.streaming.incrementAndGet();
            this.consumer.accept(this.handle);
        }
    }

    private class UserCodeSegment
    extends ExecutionSegment {
        private final Action<? super Execution> action;

        public UserCodeSegment(Action<? super Execution> action) {
            this.action = action;
        }

        @Override
        public void run() {
            try {
                ExecutionBacking.this.intercept(ExecInterceptor.ExecType.COMPUTE, ExecutionBacking.this.interceptors, this.action);
            }
            catch (Throwable e) {
                Event event = ExecutionBacking.this.currentEvent();
                event.segments.clear();
                event.segments.addFirst(new ThrowSegment(e));
            }
        }
    }

    private class ThrowSegment
    extends ExecutionSegment {
        private final Throwable throwable;

        private ThrowSegment(Throwable throwable) {
            this.throwable = throwable;
        }

        @Override
        public void run() {
            try {
                ExecutionBacking.this.onError.execute(this.throwable);
            }
            catch (Throwable errorHandlerException) {
                ((ExecutionBacking)ExecutionBacking.this).currentEvent().segments.addFirst(new UserCodeSegment(Action.throwException(errorHandlerException)));
            }
        }
    }

    private abstract class ExecutionSegment
    implements Runnable {
        private final Optional<StackTraceElement[]> trace = TRACE ? Optional.of(Thread.currentThread().getStackTrace()) : Optional.empty();

        protected ExecutionSegment() {
        }

        public Optional<StackTraceElement[]> getTrace() {
            return this.trace;
        }
    }

    public class StreamHandle {
        private final Stream stream;

        private StreamHandle(Stream stream) {
            this.stream = stream;
        }

        public void event(Action<? super Execution> action) {
            this.streamEvent(new StreamEvent(action));
        }

        public void complete(Action<? super Execution> action) {
            this.streamEvent(new StreamCompletion(this, action));
        }

        private void streamEvent(ExecutionSegment s) {
            Event event = new Event();
            event.segments.add(s);
            this.stream.events.add(event);
            ExecutionBacking.this.drain();
        }
    }

    private class Snapshot
    implements ExecutionSnapshot {
        private final boolean waiting;

        private Snapshot() {
            this.waiting = !ExecutionBacking.this.hasExecutableSegments();
        }

        @Override
        public String getId() {
            return Integer.toString(System.identityHashCode(ExecutionBacking.this));
        }

        @Override
        public boolean getWaiting() {
            return this.waiting;
        }

        @Override
        public Long getStartedAt() {
            return ExecutionBacking.this.startedAt;
        }

        @Override
        public Optional<StackTraceElement[]> getStartedTrace() {
            return ExecutionBacking.this.startTrace;
        }
    }

    private static class Stream {
        Queue<Event> events = new ConcurrentLinkedQueue<Event>();

        private Stream() {
            this.events.add(new Event());
        }
    }

    private static class Event {
        Deque<ExecutionSegment> segments = new ConcurrentLinkedDeque<ExecutionSegment>();

        private Event() {
        }
    }
}

