/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import java.util.function.Consumer;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.el.BindingContextUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.meta.AnnotatedObject;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.el.ExtendedExpressionManager;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.processor.AbstractMuleObjectOwner;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Router;
import org.mule.runtime.core.api.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.routing.ForkJoinStrategy;
import org.mule.runtime.core.api.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
 * Base class for routers that hand each incoming event, together with a set of
 * {@link ForkJoinStrategy.RoutingPair}s supplied by the subclass, to a
 * {@link ForkJoinStrategy} and emit the strategy's result.
 *
 * <p>Subclasses provide the routing pairs, the default maximum concurrency, the
 * delay-errors flag and the default {@link ForkJoinStrategyFactory}. The strategy
 * itself is created once in {@link #initialise()}.
 */
public abstract class AbstractForkJoinRouter
extends AbstractMuleObjectOwner<MessageProcessorChain>
implements Router {
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private ConfigurationComponentLocator componentLocator;
    private ProcessingStrategy processingStrategy;
    private ForkJoinStrategyFactory forkJoinStrategyFactory;
    private ForkJoinStrategy forkJoinStrategy;
    // No timeout configured is represented by the largest possible value.
    private long timeout = Long.MAX_VALUE;
    // null until configured; replaced by the subclass default in initialise().
    private Integer maxConcurrency;
    private Scheduler timeoutScheduler;
    private ErrorType timeoutErrorType;
    // Assigned in initialise(); not read anywhere else in this class.
    private ExtendedExpressionManager expressionManager;
    // Name of the variable that receives the result; null means "return the result event as-is".
    private String target;
    private String targetValue = "#[payload]";

    /**
     * Processes a single event by delegating to {@link #apply(Publisher)} through
     * {@link MessageProcessors#processToApply}.
     *
     * @param event the event to route
     * @return the event produced by the reactive pipeline
     * @throws MuleException as declared by the processor contract
     */
    @Override
    public InternalEvent process(InternalEvent event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    /**
     * Builds the reactive pipeline for this router.
     *
     * <p>For every incoming event the {@link #onEvent()} hook is invoked, then the event and
     * its routing pairs are passed to the fork-join strategy. When a {@code target} is
     * configured, the {@code targetValue} expression is evaluated against the strategy's
     * result and stored as a variable on a copy of the <em>original</em> event; otherwise the
     * strategy's result is emitted unchanged. Any error that is not already a
     * {@link MessagingException} is wrapped in one that references the original event and
     * this router.
     *
     * @param publisher the stream of incoming events
     * @return the stream of resulting events
     */
    @Override
    public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
        return Flux.from(publisher)
            .doOnNext(this.onEvent())
            .flatMap(event -> Flux.from(this.forkJoinStrategy.forkJoin((InternalEvent)event, this.getRoutingPairs((InternalEvent)event)))
                .map(result -> {
                    if (this.target != null) {
                        // Named differently from the targetValue field (the expression) to avoid shadowing it.
                        TypedValue resolvedTarget = this.getTargetValue((InternalEvent)result);
                        return InternalEvent.builder(event).addVariable(this.target, resolvedTarget.getValue(), resolvedTarget.getDataType()).build();
                    }
                    return result;
                })
                // Applied per event (inside flatMap) so the wrapping exception can carry that event.
                .onErrorMap(throwable -> !(throwable instanceof MessagingException), throwable -> new MessagingException((InternalEvent)event, (Throwable)throwable, (AnnotatedObject)this)));
    }

    /**
     * Hook invoked for every incoming event before it is forked. The default does nothing.
     *
     * @return the consumer to run on each incoming event
     */
    protected Consumer<InternalEvent> onEvent() {
        return event -> {};
    }

    /**
     * Supplies the routing pairs that the fork-join strategy should execute for an event.
     *
     * @param var1 the event being routed
     * @return a publisher of routing pairs for that event
     */
    protected abstract Publisher<ForkJoinStrategy.RoutingPair> getRoutingPairs(InternalEvent var1);

    /**
     * Resolves collaborators and creates the {@link ForkJoinStrategy}.
     *
     * <p>The processing strategy is taken from the flow located for this component, falling
     * back to the direct processing strategy when none is found. Unset maximum concurrency
     * and strategy factory are replaced by the subclass-provided defaults.
     *
     * @throws InitialisationException if the parent initialisation fails
     */
    @Override
    public void initialise() throws InitialisationException {
        super.initialise();
        this.processingStrategy = ComponentUtils.getFromAnnotatedObject(this.componentLocator, this).map(flow -> flow.getProcessingStrategy()).orElse(DirectProcessingStrategyFactory.DIRECT_PROCESSING_STRATEGY_INSTANCE);
        this.expressionManager = this.muleContext.getExpressionManager();
        this.timeoutScheduler = this.schedulerService.cpuLightScheduler();
        this.timeoutErrorType = this.muleContext.getErrorTypeRepository().getErrorType(Errors.ComponentIdentifiers.Handleable.TIMEOUT).get();
        this.maxConcurrency = this.maxConcurrency != null ? this.maxConcurrency.intValue() : this.getDefaultMaxConcurrency();
        this.forkJoinStrategyFactory = this.forkJoinStrategyFactory != null ? this.forkJoinStrategyFactory : this.getDefaultForkJoinStrategyFactory();
        this.forkJoinStrategy = this.forkJoinStrategyFactory.createForkJoinStrategy(this.processingStrategy, this.maxConcurrency, this.isDelayErrors(), this.timeout, this.timeoutScheduler, this.timeoutErrorType);
    }

    /**
     * Stops the timeout scheduler obtained in {@link #initialise()}, if any, and then
     * disposes the parent.
     */
    @Override
    public void dispose() {
        if (this.timeoutScheduler != null) {
            this.timeoutScheduler.stop();
        }
        super.dispose();
    }

    /**
     * Sets the factory used in {@link #initialise()} to create the fork-join strategy.
     *
     * @param forkJoinStrategyFactory the factory to use; when left {@code null} the subclass
     *        default is used instead
     */
    public void setForkJoinStrategyFactory(ForkJoinStrategyFactory forkJoinStrategyFactory) {
        this.forkJoinStrategyFactory = forkJoinStrategyFactory;
    }

    /**
     * Sets the timeout value passed to the fork-join strategy.
     *
     * @param timeout the timeout; must be greater than zero, otherwise it is rejected by
     *        {@link Preconditions#checkArgument}
     */
    public void setTimeout(long timeout) {
        Preconditions.checkArgument(timeout > 0L, "Timeout must be greater than zero");
        this.timeout = timeout;
    }

    /**
     * Sets the maximum concurrency passed to the fork-join strategy.
     *
     * @param maxConcurrency the maximum concurrency; must be one or more, otherwise it is
     *        rejected by {@link Preconditions#checkArgument}
     */
    public void setMaxConcurrency(int maxConcurrency) {
        // Validate the argument itself; the previous check looked at the timeout field and so never failed.
        Preconditions.checkArgument(maxConcurrency >= 1, "Maximum concurrency must be one or more.");
        this.maxConcurrency = maxConcurrency;
    }

    /**
     * Sets the name of the variable that receives the routing result.
     *
     * @param target the variable name, or {@code null} to emit the result event directly
     */
    public void setTarget(String target) {
        this.target = target;
    }

    /**
     * Sets the expression evaluated against the routing result to obtain the value stored
     * in the target variable. Defaults to {@code #[payload]}.
     *
     * @param targetValue the expression to evaluate
     */
    public void setTargetValue(String targetValue) {
        this.targetValue = targetValue;
    }

    /**
     * @return the maximum concurrency to use when none has been configured
     */
    protected abstract int getDefaultMaxConcurrency();

    /**
     * @return the delay-errors flag passed to the fork-join strategy factory
     */
    protected abstract boolean isDelayErrors();

    /**
     * @return the strategy factory to use when none has been configured
     */
    protected abstract ForkJoinStrategyFactory getDefaultForkJoinStrategyFactory();

    /**
     * Evaluates the {@code targetValue} expression using a binding context built from the
     * given event's message.
     *
     * @param event the event produced by the fork-join strategy
     * @return the evaluated value
     */
    private TypedValue getTargetValue(InternalEvent event) {
        return this.muleContext.getExpressionManager().evaluate(this.targetValue, BindingContextUtils.getTargetBindingContext(event.getMessage()));
    }
}

