/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.internal.routing.AbstractForkJoinRouter;
import org.mule.runtime.core.internal.routing.ExpressionSplittingStrategy;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.internal.routing.SplittingStrategy;
import org.mule.runtime.core.internal.routing.forkjoin.CollectListForkJoinStrategyFactory;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class ParallelForEach
extends AbstractForkJoinRouter {
    private String collectionExpression = "#[payload]";
    private SplittingStrategy<CoreEvent, Iterator<TypedValue<?>>> splittingStrategy;
    private List<Processor> messageProcessors;
    private MessageProcessorChain nestedChain;

    @Override
    public void initialise() throws InitialisationException {
        this.nestedChain = MessageProcessors.newChain(Optional.of(this.resolveProcessingStrategy()), this.messageProcessors);
        this.nestedChain.setMuleContext(this.muleContext);
        this.splittingStrategy = new ExpressionSplittingStrategy(this.muleContext.getExpressionManager(), this.collectionExpression);
        super.initialise();
    }

    @Override
    protected Publisher<ForkJoinStrategy.RoutingPair> getRoutingPairs(CoreEvent event) {
        return Flux.fromIterable(() -> this.splittingStrategy.split(event)).map(partTypedValue -> CoreEvent.builder(event).message(Message.builder().payload((TypedValue<?>)partTypedValue).build()).build()).map(partEvent -> ForkJoinStrategy.RoutingPair.of(partEvent, this.nestedChain));
    }

    @Override
    protected List<MessageProcessorChain> getOwnedObjects() {
        return Collections.singletonList(this.nestedChain);
    }

    public void setMessageProcessors(List<Processor> messageProcessors) {
        this.messageProcessors = messageProcessors;
    }

    @Override
    protected boolean isDelayErrors() {
        return true;
    }

    @Override
    protected int getDefaultMaxConcurrency() {
        return AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY;
    }

    @Override
    protected ForkJoinStrategyFactory getDefaultForkJoinStrategyFactory() {
        return new CollectListForkJoinStrategyFactory(false);
    }

    public void setCollectionExpression(String collectionExpression) {
        this.collectionExpression = collectionExpression;
    }
}

