/*
 * Decompiled with CFR 0.152.
 */
package com.simplj.flows.steps;

import com.simplj.flows.core.AbstractStep;
import com.simplj.flows.core.ExecutionContext;
import com.simplj.flows.core.ExecutionResult;
import com.simplj.flows.core.Session;
import com.simplj.flows.steps.IterableStep;
import com.simplj.flows.steps.ThreadPool;
import com.simplj.flows.steps.ThreadPoolFactory;
import com.simplj.lambda.function.Consumer;
import com.simplj.lambda.function.Function;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;

public final class ParallelIteratorStep<I, O>
extends IterableStep<I, O> {
    private final ThreadPoolFactory factory;

    ParallelIteratorStep(String name, AbstractStep<I, O> step, ThreadPoolFactory factory) {
        super(name, step);
        this.factory = factory;
    }

    @Override
    List<ExecutionResult<O>> iterateSteps(ExecutionContext ctx, Iterable<I> inputs, AbstractStep<I, O> step) {
        LinkedList fs = new LinkedList();
        Session sess = ParallelIteratorStep.session();
        ThreadPool threadPool = this.factory.newThreadPool();
        ExecutorService es = threadPool.getExecutorService();
        for (Object input : inputs) {
            fs.add(es.submit(() -> ParallelIteratorStep.execute(step, input, ParallelIteratorStep.copy(ctx, sess))));
        }
        Function ctxMerger = r -> ParallelIteratorStep.executionResult(r.result(), r.error(), ParallelIteratorStep.merge(ctx, r.executionContext(), true));
        Consumer logger = s -> ParallelIteratorStep.log(ctx, s);
        List res = ThreadPool.waitForCompletion(fs, ctxMerger, (Consumer<String>)logger);
        threadPool.dispose((Consumer<String>)logger);
        return res;
    }

    @Override
    IterableStep<I, O> construct(String name, AbstractStep<I, O> step) {
        return new ParallelIteratorStep<I, O>(name, step, this.factory);
    }
}

