/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import jakarta.inject.Inject;
import java.util.function.Consumer;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.el.CompiledExpression;
import org.mule.runtime.api.exception.ErrorTypeRepository;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.el.ExtendedExpressionManager;
import org.mule.runtime.core.api.error.Errors;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.AbstractMuleObjectOwner;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.mule.runtime.core.internal.el.ExpressionLanguageUtils;
import org.mule.runtime.core.internal.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.Scope;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
 * Base class for routers that fork an incoming event into several routing pairs and join the
 * results back into a single event by delegating to a {@link ForkJoinStrategy}.
 *
 * <p>Subclasses supply the routing pairs for each event ({@link #getRoutingPairs(CoreEvent)}) and
 * the defaults used when nothing has been configured explicitly
 * ({@link #getDefaultMaxConcurrency()}, {@link #isDelayErrors()},
 * {@link #getDefaultForkJoinStrategyFactory()}).
 */
public abstract class AbstractForkJoinRouter
extends AbstractMuleObjectOwner<MessageProcessorChain>
implements Scope {
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private ConfigurationComponentLocator componentLocator;
    @Inject
    private ExtendedExpressionManager expressionManager;
    @Inject
    private ErrorTypeRepository errorTypeRepository;
    @Inject
    private FeatureFlaggingService featureFlaggingService;
    private ForkJoinStrategyFactory forkJoinStrategyFactory;
    private ForkJoinStrategy forkJoinStrategy;
    // Long.MAX_VALUE is the value used until setTimeout is called.
    private long timeout = Long.MAX_VALUE;
    // null means "not configured"; replaced by getDefaultMaxConcurrency() during initialise().
    private Integer maxConcurrency;
    private Scheduler timeoutScheduler;
    private Scheduler timeoutBlockingScheduler;
    private ErrorType timeoutErrorType;
    private String target;
    private String targetValue = "#[payload]";
    private CompiledExpression targetValueExpression;

    /**
     * Processes a single event by delegating to {@link MessageProcessors#processToApply}, passing
     * this router as the processor.
     *
     * @param event the event to route
     * @return the event returned by {@code processToApply}
     * @throws MuleException if processing fails
     */
    @Override
    public CoreEvent process(CoreEvent event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    /**
     * Builds the reactive pipeline: for every incoming event the {@link #onEvent()} hook is
     * invoked, the event is fork-joined through the configured strategy, and each result is
     * passed to {@code Operators.outputToTarget} together with the original event, the target and
     * the compiled target value expression.
     *
     * <p>Any error that is not already a {@link MessagingException} is wrapped in one that carries
     * the originating event and this component.
     *
     * @param publisher the stream of incoming events
     * @return the stream of resulting events
     */
    @Override
    public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
        return Flux.from(publisher)
            .doOnNext(this.onEvent())
            .flatMap(event -> Flux.from(this.forkJoinStrategy.forkJoin(event, this.getRoutingPairs(event)))
                .map(result -> Operators.outputToTarget(event, result, this.target, this.targetValueExpression,
                                                        this.expressionManager))
                .onErrorMap(throwable -> !(throwable instanceof MessagingException),
                            throwable -> new MessagingException(event, throwable, this))
                // Applied to the inner (per-event) publisher, not to the outer stream.
                .onErrorStop());
    }

    /**
     * Hook invoked for every incoming event before it is forked. The default implementation does
     * nothing.
     *
     * @return the consumer to run on each incoming event
     */
    protected Consumer<CoreEvent> onEvent() {
        return event -> {};
    }

    /**
     * Provides the routing pairs the given event must be forked into.
     *
     * @param var1 the incoming event
     * @return a publisher of the routing pairs for that event
     */
    protected abstract Publisher<ForkJoinStrategy.RoutingPair> getRoutingPairs(CoreEvent var1);

    /**
     * Initialises the owned objects, compiles the target value expression (when one is set),
     * obtains the two timeout schedulers, resolves the timeout error type, applies the defaults
     * for any unset configuration and finally creates the {@link ForkJoinStrategy}.
     *
     * @throws InitialisationException if the parent initialisation fails
     */
    @Override
    public void initialise() throws InitialisationException {
        super.initialise();
        if (this.targetValue != null) {
            this.targetValueExpression = ExpressionLanguageUtils.compile(this.targetValue, this.expressionManager);
        }

        // Scheduler names include the concrete class and the component location.
        String schedulerNamePrefix = this.getClass().getName();
        String componentLocation = this.getLocation().getLocation();
        this.timeoutScheduler = this.schedulerService.cpuLightScheduler(
            SchedulerConfig.config().withName(schedulerNamePrefix + ".timeoutScheduler - " + componentLocation));
        this.timeoutBlockingScheduler = this.schedulerService.ioScheduler(
            SchedulerConfig.config().withName(schedulerNamePrefix + ".timeoutBlockingScheduler - " + componentLocation));

        // NOTE(review): unchecked Optional.get(); presumably the TIMEOUT error type is always
        // registered in the repository - confirm.
        this.timeoutErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.TIMEOUT).get();

        if (this.maxConcurrency == null) {
            this.maxConcurrency = this.getDefaultMaxConcurrency();
        }
        if (this.forkJoinStrategyFactory == null) {
            this.forkJoinStrategyFactory = this.getDefaultForkJoinStrategyFactory();
        }

        boolean isDetailedCompositeRoutingExceptionLogEnabled =
            this.featureFlaggingService.isEnabled(MuleRuntimeFeature.MULE_PRINT_DETAILED_COMPOSITE_EXCEPTION_LOG);
        this.forkJoinStrategy = this.forkJoinStrategyFactory.createForkJoinStrategy(
            this.resolveProcessingStrategy(), this.maxConcurrency, this.isDelayErrors(), this.timeout,
            this.timeoutScheduler, this.timeoutErrorType, this.timeoutBlockingScheduler,
            isDetailedCompositeRoutingExceptionLogEnabled);
    }

    /**
     * Resolves the processing strategy to hand to the fork-join strategy: the one of the object
     * returned by {@code ComponentUtils.getFromAnnotatedObject} for this component when present,
     * otherwise the direct processing strategy.
     *
     * @return the processing strategy to use
     */
    protected ProcessingStrategy resolveProcessingStrategy() {
        return ComponentUtils.getFromAnnotatedObject(this.componentLocator, this)
            .map(flow -> flow.getProcessingStrategy())
            .orElse(DirectProcessingStrategyFactory.DIRECT_PROCESSING_STRATEGY_INSTANCE);
    }

    /**
     * Stops both timeout schedulers (when they were created) and disposes the owned objects.
     *
     * <p>The steps are chained with {@code finally} so that a failure while stopping one
     * scheduler does not prevent the other from being stopped nor the parent from being disposed.
     */
    @Override
    public void dispose() {
        try {
            if (this.timeoutScheduler != null) {
                this.timeoutScheduler.stop();
            }
        } finally {
            try {
                if (this.timeoutBlockingScheduler != null) {
                    this.timeoutBlockingScheduler.stop();
                }
            } finally {
                super.dispose();
            }
        }
    }

    /**
     * Sets the factory used to create the {@link ForkJoinStrategy} during {@link #initialise()}.
     * When left unset, {@link #getDefaultForkJoinStrategyFactory()} is used.
     *
     * @param forkJoinStrategyFactory the factory to use
     */
    public void setForkJoinStrategyFactory(ForkJoinStrategyFactory forkJoinStrategyFactory) {
        this.forkJoinStrategyFactory = forkJoinStrategyFactory;
    }

    /**
     * Sets the timeout handed to the fork-join strategy.
     *
     * @param timeout the timeout; must be greater than zero (presumably milliseconds - confirm)
     * @throws IllegalArgumentException presumably thrown by {@code Preconditions.checkArgument}
     *         when {@code timeout} is not positive
     */
    public void setTimeout(long timeout) {
        Preconditions.checkArgument(timeout > 0L, "Timeout must be greater than zero");
        this.timeout = timeout;
    }

    /**
     * Sets the maximum concurrency handed to the fork-join strategy.
     *
     * @param maxConcurrency the maximum concurrency; must be one or more
     * @throws IllegalArgumentException presumably thrown by {@code Preconditions.checkArgument}
     *         when {@code maxConcurrency} is less than one
     */
    public void setMaxConcurrency(int maxConcurrency) {
        // Validate the argument itself; the previous check looked at the timeout field and so
        // never rejected an invalid concurrency value.
        Preconditions.checkArgument(maxConcurrency >= 1, "Maximum concurrency must be one or more.");
        this.maxConcurrency = maxConcurrency;
    }

    /**
     * Sets the target passed to {@code Operators.outputToTarget} for each result.
     *
     * @param target the target; may be {@code null}
     */
    public void setTarget(String target) {
        this.target = target;
    }

    /**
     * Sets the expression that is compiled during {@link #initialise()} and passed to
     * {@code Operators.outputToTarget}. Defaults to {@code #[payload]}.
     *
     * @param targetValue the target value expression
     */
    public void setTargetValue(String targetValue) {
        this.targetValue = targetValue;
    }

    /**
     * @return the maximum concurrency to use when none was set through
     *         {@link #setMaxConcurrency(int)}
     */
    protected abstract int getDefaultMaxConcurrency();

    /**
     * @return the delay-errors flag passed to the {@link ForkJoinStrategyFactory} when the
     *         strategy is created
     */
    protected abstract boolean isDelayErrors();

    /**
     * @return the factory to use when none was set through
     *         {@link #setForkJoinStrategyFactory(ForkJoinStrategyFactory)}
     */
    protected abstract ForkJoinStrategyFactory getDefaultForkJoinStrategyFactory();
}

