/*
 * Decompiled with CFR 0.152.
 */
package com.simplj.flows.steps;

import com.simplj.flows.core.AbstractStep;
import com.simplj.flows.core.ExecutionContext;
import com.simplj.flows.core.ExecutionResult;
import com.simplj.flows.core.Session;
import com.simplj.flows.steps.ForkStep;
import com.simplj.flows.steps.Step;
import com.simplj.flows.steps.ThreadPool;
import com.simplj.flows.steps.ThreadPoolFactory;
import com.simplj.lambda.executable.Executable;
import com.simplj.lambda.function.Consumer;
import com.simplj.lambda.function.Function;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;

public final class ParallelStep<I, O>
extends ForkStep<I, O, ParallelStep<I, O>> {
    private final ThreadPoolFactory factory;

    ParallelStep(String name, Executable<I, O> executable, ThreadPoolFactory factory) {
        this(name, Step.lift(name + "-part#1", executable), factory);
    }

    ParallelStep(String name, AbstractStep<I, O> step, ThreadPoolFactory factory) {
        super(name);
        this.factory = factory;
        this.add(step);
    }

    ParallelStep(String name, int counter, List<AbstractStep<I, O>> steps, ThreadPoolFactory factory) {
        super(name, counter, steps);
        this.factory = factory;
    }

    @Override
    final ParallelStep<I, O> instance() {
        return this;
    }

    @Override
    final List<ExecutionResult<O>> executeSteps(ExecutionContext ctx, I inp, List<AbstractStep<I, O>> steps) {
        LinkedList fs = new LinkedList();
        Session sess = ParallelStep.session();
        ThreadPool threadPool = this.factory.newThreadPool();
        ExecutorService es = threadPool.getExecutorService();
        for (AbstractStep step : steps) {
            fs.add(es.submit(() -> ParallelStep.execute(step, inp, ParallelStep.copy(ctx, sess))));
        }
        Function ctxMerger = r -> ParallelStep.executionResult(r.result(), r.error(), ParallelStep.merge(ctx, r.executionContext(), true));
        Consumer logger = s -> ParallelStep.log(ctx, s);
        List res = ThreadPool.waitForCompletion(fs, ctxMerger, (Consumer<String>)logger);
        threadPool.dispose((Consumer<String>)logger);
        return res;
    }

    @Override
    ForkStep<I, O, ParallelStep<I, O>> construct(String name, int counter, List<AbstractStep<I, O>> steps) {
        return new ParallelStep<I, O>(name, counter, steps, this.factory);
    }
}

