/*
 * Decompiled with CFR 0.152.
 */
package com.simplj.flows.steps;

import com.simplj.flows.core.AbstractStep;
import com.simplj.flows.core.ExecutionContext;
import com.simplj.flows.core.ExecutionResult;
import com.simplj.flows.core.Input;
import com.simplj.flows.core.Session;
import com.simplj.flows.exceptions.AsyncTimeoutException;
import com.simplj.flows.exceptions.SjfException;
import com.simplj.flows.steps.Step;
import com.simplj.lambda.function.Condition;
import com.simplj.lambda.function.Consumer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public final class AsyncExecutorStep<I, O>
extends Step<I, O> {
    private final AbstractStep<I, O> step;
    private final ExecutorService es;
    private final long maxWaitMillis;
    private final long intervalMillis;
    private Consumer<AsyncTimeoutException> timeoutConsumer;

    AsyncExecutorStep(AbstractStep<I, O> step, ExecutorService es) {
        this(step, es, -1L, 0L);
    }

    AsyncExecutorStep(AbstractStep<I, O> step, ExecutorService es, long maxWaitMillis, long intervalMillis) {
        super(AsyncExecutorStep.name(step));
        this.step = step;
        this.es = es;
        this.maxWaitMillis = maxWaitMillis;
        this.intervalMillis = intervalMillis;
    }

    public final Step<I, O> onTimeout(Consumer<AsyncTimeoutException> timeoutHandler) {
        this.timeoutConsumer = timeoutHandler;
        return this;
    }

    @Override
    protected ExecutionResult<O> execute(ExecutionContext ctx, I input) {
        ExecutionResult res;
        Input inp = Input.of(input);
        long ss = System.currentTimeMillis();
        try {
            Future<ExecutionResult> future;
            Session sess = AsyncExecutorStep.session();
            Callable<ExecutionResult> callable = () -> AsyncExecutorStep.execute(this.step, input, AsyncExecutorStep.copy(ctx, sess));
            if (this.es == null) {
                FutureTask<ExecutionResult> task = new FutureTask<ExecutionResult>(callable);
                new Thread(task).start();
                future = task;
            } else {
                future = this.es.submit(callable);
            }
            long s = System.currentTimeMillis();
            AsyncExecutorStep.conditionalPause(Condition.negate(Future::isDone), future, this.maxWaitMillis, this.intervalMillis);
            if (future.isDone()) {
                res = future.get();
            } else {
                future.cancel(true);
                long d = System.currentTimeMillis() - s;
                AsyncTimeoutException e = new AsyncTimeoutException(this.name(), d);
                AsyncExecutorStep.addFrame(ctx, AsyncExecutorStep.stepFrame(this.name(), "Asynchronous execution could not complete after " + d + " millis!", d));
                if (this.timeoutConsumer != null) {
                    s = System.currentTimeMillis();
                    this.timeoutConsumer.consume((Object)e);
                    AsyncExecutorStep.addFrame(ctx, AsyncExecutorStep.stepFrame(this.name() + "-timeout-handler", "TimeOut handler for step " + this.name() + " executed!", System.currentTimeMillis() - s));
                }
                res = AsyncExecutorStep.error(e, ctx);
            }
        }
        catch (Exception ex) {
            res = AsyncExecutorStep.error(ex, AsyncExecutorStep.addFrame(ctx, AsyncExecutorStep.errStepFrame(this.name(), inp, new SjfException("Failed to execute async step: " + this.name(), ex), System.currentTimeMillis() - ss)));
        }
        return res;
    }
}

