/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing.forkjoin;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.message.ErrorBuilder;
import org.mule.runtime.core.api.message.GroupCorrelation;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.routing.ForkJoinStrategy;
import org.mule.runtime.core.api.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.internal.routing.CompositeRoutingException;
import org.mule.runtime.core.internal.routing.RoutingResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Base {@link ForkJoinStrategyFactory} that builds a {@link ForkJoinStrategy} which:
 * <ol>
 * <li>tags every incoming routing pair with a zero-based sequence number,</li>
 * <li>processes each pair's route, with a per-route timeout, using at most {@code maxConcurrency}
 * routes at a time,</li>
 * <li>collects all route results and fails with a {@link CompositeRoutingException} if any result
 * carries an error,</li>
 * <li>copies the variables of every result into a builder derived from the original event, and</li>
 * <li>delegates to {@link #createResultEvent(InternalEvent, InternalEvent.Builder)} to produce the
 * final event.</li>
 * </ol>
 * Subclasses only decide how the list of route results is turned into the single result event.
 */
public abstract class AbstractForkJoinStrategyFactory
implements ForkJoinStrategyFactory {
    public static final String TIMEOUT_EXCEPTION_DESCRIPTION = "Route Timeout";
    public static final String TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX = "Timeout while processing route/part:";

    /**
     * Creates the fork-join strategy described in the class documentation.
     *
     * @param processingStrategy strategy applied to each route when {@code maxConcurrency > 1}
     * @param maxConcurrency     maximum number of routes processed concurrently; also passed as the
     *                           {@code flatMap} concurrency
     * @param delayErrors        when {@code true}, route failures and timeouts are turned into error
     *                           events so the remaining routes still run; when {@code false} they are
     *                           propagated as error signals
     * @param timeout            per-route timeout, in milliseconds
     * @param timeoutScheduler   scheduler on which the timeouts are run
     * @param timeoutErrorType   error type set on the error event produced for a timed-out route
     * @return the strategy
     */
    @Override
    public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStrategy, int maxConcurrency, boolean delayErrors, long timeout, org.mule.runtime.api.scheduler.Scheduler timeoutScheduler, ErrorType timeoutErrorType) {
        // Adapt the Mule scheduler once; the adapter is shared by every invocation of the strategy.
        Scheduler reactorTimeoutScheduler = Schedulers.fromExecutorService(timeoutScheduler);
        return (original, routingPairs) -> {
            AtomicInteger count = new AtomicInteger();
            InternalEvent.Builder resultBuilder = InternalEvent.builder(original);
            return Flux.from(routingPairs)
                .map(this.addSequence(count))
                .flatMap(this.processRoutePair(processingStrategy, maxConcurrency, delayErrors, timeout, reactorTimeoutScheduler, timeoutErrorType), maxConcurrency)
                .collectList()
                // CompositeRoutingException is thrown from inside the consumer, hence the checked wrapper.
                .doOnNext(Exceptions.checkedConsumer(list -> {
                    if (list.stream().anyMatch(event -> event.getError().isPresent())) {
                        throw this.createCompositeRoutingException(list);
                    }
                }))
                .doOnNext(this.copyVars(resultBuilder))
                .map(this.createResultEvent(original, resultBuilder));
        };
    }

    /**
     * Supplies the function that turns the collected route results into the final result event.
     *
     * @param original      the event the fork-join was invoked with
     * @param resultBuilder builder created from {@code original}; by the time the returned function
     *                      runs, the variables of all route results have been added to it
     * @return function mapping the list of route results to the result event
     */
    protected abstract Function<List<InternalEvent>, InternalEvent> createResultEvent(InternalEvent original, InternalEvent.Builder resultBuilder);

    /**
     * Returns a function that rebuilds a routing pair so its event carries a group correlation whose
     * sequence is the next value of {@code count}.
     */
    private Function<ForkJoinStrategy.RoutingPair, ForkJoinStrategy.RoutingPair> addSequence(AtomicInteger count) {
        return pair -> {
            InternalEvent sequenced = InternalEvent.builder(pair.getEvent())
                .groupCorrelation(Optional.of(GroupCorrelation.of(count.getAndIncrement())))
                .build();
            return ForkJoinStrategy.RoutingPair.of(sequenced, pair.getRoute());
        };
    }

    /**
     * Returns a function that processes one routing pair in a child context, applying the route
     * timeout and, when {@code delayErrors} is set, converting failures into error events.
     */
    private Function<ForkJoinStrategy.RoutingPair, Publisher<? extends InternalEvent>> processRoutePair(ProcessingStrategy processingStrategy, int maxConcurrency, boolean delayErrors, long timeout, Scheduler timeoutScheduler, ErrorType timeoutErrorType) {
        return pair -> {
            // The fallback is deferred so the error event / exception is only created if the timeout fires.
            ReactiveProcessor route = publisher -> Flux.from(publisher)
                .transform(pair.getRoute())
                .timeout(Duration.ofMillis(timeout), Mono.defer(() -> this.timeoutFallback(delayErrors, timeoutErrorType, pair)), timeoutScheduler);
            return Flux.from(MessageProcessors.processWithChildContext(pair.getEvent(), this.applyProcessingStrategy(processingStrategy, route, maxConcurrency), Optional.empty()))
                .onErrorResume(MessagingException.class, me -> delayErrors ? Mono.just(me.getEvent()) : Mono.error(me));
        };
    }

    /**
     * Builds what a route emits when its timeout fires: an error event when errors are delayed,
     * otherwise an error signal carrying a {@link TimeoutException}.
     */
    private Mono<InternalEvent> timeoutFallback(boolean delayErrors, ErrorType timeoutErrorType, ForkJoinStrategy.RoutingPair pair) {
        if (delayErrors) {
            return Mono.just(this.createTimeoutErrorEvent(timeoutErrorType, pair));
        }
        return Mono.error(new TimeoutException(AbstractForkJoinStrategyFactory.timeoutMessage(pair)));
    }

    /**
     * Builds the detailed timeout message for a routing pair, e.g.
     * {@code Timeout while processing route/part: '0'}.
     */
    private static String timeoutMessage(ForkJoinStrategy.RoutingPair pair) {
        // The group correlation is set by addSequence before any route is processed.
        return TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX + " '" + pair.getEvent().getGroupCorrelation().get().getSequence() + "'";
    }

    /**
     * Wraps the route with the processing strategy's pipeline handling only when more than one route
     * may run concurrently; otherwise the route is returned untouched.
     */
    private ReactiveProcessor applyProcessingStrategy(ProcessingStrategy processingStrategy, ReactiveProcessor processor, int maxConcurrency) {
        return maxConcurrency > 1 ? processingStrategy.onPipeline(processor) : processor;
    }

    /**
     * Creates the event that stands in for a timed-out route when errors are delayed: the pair's
     * event with a {@code null}-payload message and an error of {@code timeoutErrorType}.
     */
    private InternalEvent createTimeoutErrorEvent(ErrorType timeoutErrorType, ForkJoinStrategy.RoutingPair pair) {
        String detail = AbstractForkJoinStrategyFactory.timeoutMessage(pair);
        // The exception carries the same message as in the fail-fast path so it identifies the route.
        Error timeoutError = ErrorBuilder.builder()
            .errorType(timeoutErrorType)
            .exception(new TimeoutException(detail))
            .description(TIMEOUT_EXCEPTION_DESCRIPTION)
            .detailedDescription(detail)
            .build();
        return InternalEvent.builder(pair.getEvent())
            .message(Message.of(null))
            .error(timeoutError)
            .build();
    }

    /**
     * Splits the route results into successes and failures, each keyed by the route's sequence number
     * rendered as a string, and wraps them in a {@link CompositeRoutingException}.
     */
    private CompositeRoutingException createCompositeRoutingException(List<InternalEvent> results) {
        // LinkedHashMap keeps entries in the order the results were collected.
        LinkedHashMap<String, Message> successMap = new LinkedHashMap<>();
        LinkedHashMap<String, Error> errorMap = new LinkedHashMap<>();
        for (InternalEvent event : results) {
            // NOTE(review): assumes every result still carries the group correlation set by addSequence.
            String key = Integer.toString(event.getGroupCorrelation().get().getSequence());
            if (event.getError().isPresent()) {
                errorMap.put(key, event.getError().get());
            } else {
                successMap.put(key, event.getMessage());
            }
        }
        return new CompositeRoutingException(new RoutingResult(successMap, errorMap));
    }

    /**
     * Returns a consumer that adds every variable of every route result to {@code result}, visiting
     * the results in list order.
     */
    private Consumer<List<InternalEvent>> copyVars(InternalEvent.Builder result) {
        return list -> list.forEach(event -> event.getVariables().forEach((name, value) -> result.addVariable(name, value)));
    }
}

