/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Router;
import org.mule.runtime.core.api.routing.AggregationContext;
import org.mule.runtime.core.api.routing.AggregationStrategy;
import org.mule.runtime.core.api.routing.RoutePathNotFoundException;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.internal.routing.CollectAllAggregationStrategy;
import org.mule.runtime.core.internal.routing.FirstSuccessfulRoutingStrategy;
import org.mule.runtime.core.internal.util.ProcessingStrategyUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Router that passes every incoming {@link Event} through each configured route and then hands the
 * collected per-route results to an {@link AggregationStrategy} to build the single resulting event.
 *
 * <p>Routes are registered before initialisation through {@link #addRoute(Processor)} or
 * {@link #setRoutes(List)}; at least two are required. During {@link #initialise()} each route is
 * wrapped into a chain, and those chains are what the router actually executes and owns.
 *
 * <p>NOTE(review): the {@code parallel} flag and the {@code timeout} value are stored by their
 * setters, but nothing in this class reads them when building the reactive pipeline.
 */
public class ScatterGatherRouter
extends AbstractMessageProcessorOwner
implements Router {
    @Inject
    private SchedulerService schedulerService;
    // Stored by setParallel(boolean); not read anywhere else in this class.
    private boolean parallel = true;
    // Normalised in initialise(); not read anywhere else in this class.
    private long timeout = 0L;
    // Routes as configured by the user, before they are wrapped into chains.
    private List<Processor> routes = new ArrayList<Processor>();
    private boolean initialised = false;
    // Chains built from `routes` by buildRouteChains(); these are what apply() executes.
    private List<Processor> routeChains = Collections.emptyList();
    private AggregationStrategy aggregationStrategy;
    // IO scheduler obtained in start() and released in stop().
    private org.mule.runtime.api.scheduler.Scheduler scheduler;
    // Reactor view over `scheduler`, used by scheduleRoute() for subscribeOn.
    private Scheduler reactorScheduler;

    /**
     * Processes a single event by delegating to the reactive pipeline defined in
     * {@link #apply(Publisher)}.
     *
     * @param event the event to route
     * @return the result of {@link MessageProcessors#processToApply}
     * @throws MuleException if processing fails
     */
    @Override
    public Event process(Event event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    /**
     * Fails when no routes are configured.
     *
     * <p>Despite its name this only rejects a {@code null} or empty route list; the
     * "at least two routes" rule is enforced separately by {@link #buildRouteChains()}.
     *
     * @throws RoutePathNotFoundException if {@code routes} is {@code null} or empty
     */
    private void assertMorethanOneRoute() throws RoutePathNotFoundException {
        if (CollectionUtils.isEmpty(this.routes)) {
            throw new RoutePathNotFoundException(CoreMessages.noEndpointsForRouter(), null);
        }
    }

    /**
     * Builds the reactive pipeline: each event is validated, sent through every route chain, and the
     * list of route results is aggregated into one event.
     *
     * @param publisher the stream of incoming events
     * @return a stream emitting one aggregated event per incoming event
     */
    @Override
    public Publisher<Event> apply(Publisher<Event> publisher) {
        return Flux.from(publisher)
            .doOnNext(Exceptions.checkedConsumer(event -> {
                // Validate before any route sees the event.
                this.assertMorethanOneRoute();
                FirstSuccessfulRoutingStrategy.validateMessageIsNotConsumable(event.getMessage());
            }))
            .concatMap(event -> Flux.fromIterable(this.routeChains)
                // Every route receives the same original event.
                .concatMap(route -> Flux.just(event).transform(this.scheduleRoute(route)))
                .collectList()
                .map(Exceptions.checkedFunction(results -> this.aggregationStrategy.aggregate(new AggregationContext(event, results)))));
    }

    /**
     * Decides how a route is executed.
     *
     * @param route the route chain to execute
     * @return a processor that runs {@code route} through the flow's processing strategy when the flow
     *         is a {@link Pipeline} that is not processing synchronously, and otherwise runs it
     *         subscribed on this router's own scheduler
     */
    private ReactiveProcessor scheduleRoute(Processor route) {
        if (!ProcessingStrategyUtils.isSynchronousProcessing(this.flowConstruct) && this.flowConstruct instanceof Pipeline) {
            return publisher -> Flux.from(publisher).transform(((Pipeline)this.flowConstruct).getProcessingStrategy().onPipeline(route));
        }
        return publisher -> Flux.from(publisher).transform(route).subscribeOn(this.reactorScheduler);
    }

    /**
     * Builds the route chains, applies defaults and marks the router as initialised.
     *
     * <p>If no aggregation strategy was set, a {@link CollectAllAggregationStrategy} is used. A
     * non-positive timeout is replaced with {@link Long#MAX_VALUE}.
     *
     * @throws InitialisationException if building the chains or applying defaults fails (including
     *         when fewer than two routes are configured), or if the parent initialisation fails
     */
    @Override
    public void initialise() throws InitialisationException {
        try {
            this.buildRouteChains();
            if (this.aggregationStrategy == null) {
                this.aggregationStrategy = new CollectAllAggregationStrategy();
            }
            if (this.timeout <= 0L) {
                this.timeout = Long.MAX_VALUE;
            }
        }
        catch (Exception e) {
            throw new InitialisationException(e, this);
        }
        super.initialise();
        this.initialised = true;
    }

    /**
     * Obtains an IO scheduler (named after this component's location when one is available), wraps it
     * for Reactor and then starts the parent.
     *
     * <p>If the parent fails to start, the schedulers created here are released before the failure is
     * propagated so they are not left running.
     *
     * @throws MuleException if the parent fails to start
     */
    @Override
    public void start() throws MuleException {
        this.scheduler = this.schedulerService.ioScheduler(this.getLocation() != null ? this.muleContext.getSchedulerBaseConfig().withName(this.getLocation().getLocation()) : this.muleContext.getSchedulerBaseConfig());
        this.reactorScheduler = Schedulers.fromExecutorService(this.scheduler);
        try {
            super.start();
        }
        catch (MuleException | RuntimeException e) {
            try {
                this.releaseSchedulers();
            }
            catch (RuntimeException releaseFailure) {
                // Keep the start failure as the primary error.
                e.addSuppressed(releaseFailure);
            }
            throw e;
        }
    }

    /**
     * Stops the parent and then releases the schedulers obtained in {@link #start()}.
     *
     * <p>The schedulers are released even when stopping the parent throws.
     *
     * @throws MuleException if the parent fails to stop
     */
    @Override
    public void stop() throws MuleException {
        try {
            super.stop();
        }
        finally {
            this.releaseSchedulers();
        }
    }

    /**
     * Stops the IO scheduler and disposes its Reactor wrapper, clearing both fields. Safe to call
     * when either field is already {@code null}.
     */
    private void releaseSchedulers() {
        org.mule.runtime.api.scheduler.Scheduler ioScheduler = this.scheduler;
        Scheduler reactorWrapper = this.reactorScheduler;
        // Clear the fields first so a failure below cannot leave stale references behind.
        this.scheduler = null;
        this.reactorScheduler = null;
        try {
            if (ioScheduler != null) {
                ioScheduler.stop();
            }
        }
        finally {
            if (reactorWrapper != null) {
                reactorWrapper.dispose();
            }
        }
    }

    /**
     * Appends a route.
     *
     * @param processor the route to add
     * @throws MuleException declared by the signature; not thrown by this implementation
     * @throws IllegalStateException if the router has already been initialised
     */
    public void addRoute(Processor processor) throws MuleException {
        this.checkNotInitialised();
        this.routes.add(processor);
    }

    /**
     * Wraps every configured route into a chain and stores the result in {@code routeChains}.
     *
     * @throws IllegalStateException if fewer than two routes are configured
     */
    private void buildRouteChains() {
        Preconditions.checkState(this.routes.size() > 1, "At least 2 routes are required for ScatterGather");
        this.routeChains = this.routes.stream().map(route -> MessageProcessors.newChain(MessageProcessors.newExplicitChain(route))).collect(Collectors.toList());
    }

    /**
     * Guards route mutation.
     *
     * @throws IllegalStateException if the router has already been initialised
     */
    private void checkNotInitialised() {
        Preconditions.checkState(!this.initialised, "<scatter-gather> router is not dynamic. Cannot modify routes after initialisation");
    }

    /**
     * @return the route chains built during initialisation (an empty list before that)
     */
    @Override
    protected List<Processor> getOwnedMessageProcessors() {
        return this.routeChains;
    }

    /**
     * @param aggregationStrategy strategy used to combine the route results; when left {@code null},
     *        {@link #initialise()} installs a {@link CollectAllAggregationStrategy}
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * @param parallel value to store; NOTE(review): not read anywhere in this class
     */
    public void setParallel(boolean parallel) {
        this.parallel = parallel;
    }

    /**
     * @param timeout value to store; a non-positive value is replaced with {@link Long#MAX_VALUE}
     *        during {@link #initialise()}
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Replaces the configured routes. The given list is stored as-is (no copy is made).
     *
     * <p>Like {@link #addRoute(Processor)}, this is rejected once the router is initialised: the
     * route chains are only built during initialisation, so a later replacement would not be
     * reflected in what the router executes.
     *
     * @param routes the routes to use
     * @throws IllegalStateException if the router has already been initialised
     */
    public void setRoutes(List<Processor> routes) {
        this.checkNotInitialised();
        this.routes = routes;
    }
}

