/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecControl;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.Fulfiller;
import ratpack.exec.Promise;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.func.Action;
import ratpack.func.Actions;
import ratpack.func.Factory;

/**
 * {@link ExecControl} implementation that finds the "current" execution through a
 * thread-local {@link ExecutionBacking} binding.
 *
 * <p>Operations that need the current execution ({@link #getExecution()},
 * {@link #addInterceptor}, {@link #blocking}, {@link #stream}) fail with an
 * {@link ExecutionException} when the calling thread has no backing bound.
 * {@link #fork} does not require a bound execution.
 */
public class DefaultExecControl implements ExecControl {
    private final ExecController execController;

    /** Holds the backing of the execution bound to the calling thread, if any. */
    private final ThreadLocal<ExecutionBacking> threadBinding = new ThreadLocal<ExecutionBacking>();

    /**
     * Resolves the backing lazily: the lookup happens on whichever thread calls
     * {@code create()}, at the time it is called, not when this factory is handed out.
     */
    private final Factory<ExecutionBacking> executionBackingFactory = new Factory<ExecutionBacking>() {
        @Override
        public ExecutionBacking create() {
            return DefaultExecControl.this.getBacking();
        }
    };

    /**
     * Creates a control that forks onto, and reports, the given controller.
     *
     * @param execController the controller returned by {@link #getController()} and used by {@link #fork}
     */
    public DefaultExecControl(ExecController execController) {
        this.execController = execController;
    }

    /**
     * Returns the backing bound to the calling thread.
     *
     * @return the bound backing, never {@code null}
     * @throws ExecutionException if the calling thread has no bound execution
     */
    private ExecutionBacking getBacking() {
        ExecutionBacking executionBacking = this.threadBinding.get();
        if (executionBacking == null) {
            throw new ExecutionException("Current thread has no bound execution");
        }
        return executionBacking;
    }

    /**
     * Returns the execution of the backing bound to the calling thread.
     *
     * @throws ExecutionException if the calling thread has no bound execution
     */
    @Override
    public Execution getExecution() {
        return this.getBacking().getExecution();
    }

    /** Returns the controller this control was constructed with. */
    @Override
    public ExecController getController() {
        return this.execController;
    }

    /**
     * Adds the interceptor to the current backing's interceptors, then runs
     * {@code continuation} through {@code backing.intercept} with only that new interceptor
     * and the {@code COMPUTE} exec type.
     *
     * @param execInterceptor the interceptor to register
     * @param continuation the action handed to {@code intercept} alongside the new interceptor
     * @throws ExecutionException if the calling thread has no bound execution
     * @throws Exception whatever {@code backing.intercept} propagates
     */
    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, Action<? super Execution> continuation) throws Exception {
        ExecutionBacking backing = this.getBacking();
        backing.getInterceptors().add(execInterceptor);
        // Only the newly added interceptor is passed here, not the backing's full list.
        backing.intercept(ExecInterceptor.ExecType.COMPUTE, Collections.singletonList(execInterceptor), continuation);
    }

    /**
     * Returns a promise for the result of {@code blockingOperation}, which is submitted to the
     * controller's blocking executor when the promise's action is executed.
     *
     * <p>The backing and its controller are captured when this method is called, so the calling
     * thread must have a bound execution. The operation itself is invoked through
     * {@code backing.intercept} with the {@code BLOCKING} exec type and the backing's interceptors.
     *
     * @param blockingOperation the operation to run on the blocking executor
     * @param <T> the type of value produced by the operation
     * @return a promise fulfilled with the operation's result or failure
     * @throws ExecutionException if the calling thread has no bound execution
     */
    @Override
    public <T> Promise<T> blocking(final Callable<T> blockingOperation) {
        final ExecutionBacking backing = this.getBacking();
        final ExecController controller = backing.getController();
        return this.promise(new Action<Fulfiller<? super T>>() {

            @Override
            public void execute(Fulfiller<? super T> fulfiller) throws Exception {
                ListenableFuture<T> future = controller.getBlockingExecutor().submit(new BlockingOperation());
                // The callback is registered with the controller's executor, so the fulfiller is
                // signalled from there rather than from the blocking executor.
                Futures.addCallback(future, new ComputeResume(fulfiller), controller.getExecutor());
            }

            /** Relays the outcome of the blocking future to the promise's fulfiller. */
            class ComputeResume implements FutureCallback<T> {
                private final Fulfiller<? super T> fulfiller;

                public ComputeResume(Fulfiller<? super T> fulfiller) {
                    this.fulfiller = fulfiller;
                }

                @Override
                public void onSuccess(T result) {
                    this.fulfiller.success(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    this.fulfiller.error(t);
                }
            }

            /** Runs the user's callable inside {@code backing.intercept} and reports its outcome. */
            class BlockingOperation implements Callable<T> {
                private Exception exception;
                private T result;

                BlockingOperation() {
                }

                @Override
                public T call() throws Exception {
                    backing.intercept(ExecInterceptor.ExecType.BLOCKING, backing.getInterceptors(), new Action<Execution>() {

                        @Override
                        public void execute(Execution execution) throws Exception {
                            // The failure is recorded instead of thrown from inside the intercepted
                            // action; it is rethrown below once intercept has returned.
                            // NOTE(review): presumably so interceptors do not see the operation's
                            // exception - confirm against ExecutionBacking.intercept.
                            try {
                                BlockingOperation.this.result = blockingOperation.call();
                            }
                            catch (Exception e) {
                                BlockingOperation.this.exception = e;
                            }
                        }
                    });
                    if (this.exception != null) {
                        throw this.exception;
                    }
                    return this.result;
                }
            }
        });
    }

    /**
     * Wraps {@code action} in a {@link DefaultPromise} that obtains its backing from this
     * control's thread-bound backing factory.
     *
     * @param action the action that receives the fulfiller
     * @param <T> the type of promised value
     * @return the new promise
     */
    @Override
    public <T> Promise<T> promise(Action<? super Fulfiller<T>> action) {
        return new DefaultPromise<T>(this.executionBackingFactory, action);
    }

    /**
     * Forks {@code action} using {@code Actions.throwException()} as the error handler and
     * {@code Actions.noop()} as the completion handler.
     *
     * @param action the action the new execution is created with
     */
    @Override
    public void fork(Action<? super Execution> action) {
        this.fork(action, Actions.throwException(), Actions.noop());
    }

    /**
     * Forks {@code action} with the given error handler and {@code Actions.noop()} as the
     * completion handler.
     *
     * @param action the action the new execution is created with
     * @param onError the error handler the new execution is created with
     */
    @Override
    public void fork(Action<? super Execution> action, Action<? super Throwable> onError) {
        this.fork(action, onError, Actions.noop());
    }

    /**
     * Creates a new {@link ExecutionBacking} for {@code action}.
     *
     * <p>When the calling thread is managed by the controller and has no execution bound, the
     * backing is constructed in place on the calling thread. Otherwise construction is submitted
     * to the controller's executor.
     *
     * @param action the action the new execution is created with
     * @param onError the error handler the new execution is created with
     * @param onComplete the completion handler the new execution is created with
     */
    @Override
    public void fork(final Action<? super Execution> action, final Action<? super Throwable> onError, final Action<? super Execution> onComplete) {
        // NOTE(review): the constructed instance is discarded in both branches, so construction
        // itself presumably starts the execution - confirm against ExecutionBacking.
        if (this.execController.isManagedThread() && this.threadBinding.get() == null) {
            new ExecutionBacking(this.execController, this.threadBinding, action, onError, onComplete);
        } else {
            this.execController.getExecutor().submit(new Runnable() {

                @Override
                public void run() {
                    new ExecutionBacking(DefaultExecControl.this.execController, DefaultExecControl.this.threadBinding, action, onError, onComplete);
                }
            });
        }
    }

    /**
     * Subscribes to {@code publisher} with an adapter that forwards every signal to
     * {@code subscriber} through the current backing.
     *
     * <p>{@code onSubscribe} and {@code onNext} are forwarded via
     * {@code streamExecution}; the terminal signals {@code onComplete} and {@code onError} are
     * forwarded via {@code completeStreamExecution}.
     *
     * @param publisher the source of signals
     * @param subscriber the recipient of the forwarded signals
     * @param <T> the element type
     * @throws ExecutionException if the calling thread has no bound execution
     */
    @Override
    public <T> void stream(Publisher<T> publisher, final Subscriber<? super T> subscriber) {
        final ExecutionBacking executionBacking = this.getBacking();
        publisher.subscribe(new Subscriber<T>() {

            @Override
            public void onSubscribe(final Subscription subscription) {
                executionBacking.streamExecution(new Action<Execution>() {

                    @Override
                    public void execute(Execution execution) throws Exception {
                        subscriber.onSubscribe(subscription);
                    }
                });
            }

            @Override
            public void onNext(final T element) {
                executionBacking.streamExecution(new Action<Execution>() {

                    @Override
                    public void execute(Execution execution) throws Exception {
                        subscriber.onNext(element);
                    }
                });
            }

            @Override
            public void onComplete() {
                executionBacking.completeStreamExecution(new Action<Execution>() {

                    @Override
                    public void execute(Execution execution) throws Exception {
                        subscriber.onComplete();
                    }
                });
            }

            @Override
            public void onError(final Throwable cause) {
                executionBacking.completeStreamExecution(new Action<Execution>() {

                    @Override
                    public void execute(Execution execution) throws Exception {
                        subscriber.onError(cause);
                    }
                });
            }
        });
    }
}

