/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.el.BindingContextUtils;
import org.mule.runtime.api.exception.ExceptionHelper;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.error.matcher.ErrorTypeMatcher;
import org.mule.runtime.api.message.error.matcher.ErrorTypeMatcherUtils;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.collection.SmallMap;
import org.mule.runtime.core.api.el.ExpressionManagerSession;
import org.mule.runtime.core.api.el.ExtendedExpressionManager;
import org.mule.runtime.core.api.error.Errors;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.event.EventInternalContextResolver;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

class UntilSuccessfulRouter {
    private static final Logger LOGGER = LoggerFactory.getLogger(UntilSuccessfulRouter.class);
    /** Key under which the per-event retry contexts are stored through {@link #retryContextResolver}. */
    static final String RETRY_CTX_INTERNAL_PARAM_KEY = "untilSuccessful.router.retryContext";
    /** Message of the {@link RetryPolicyExhaustedException} raised when no retries are left. */
    private static final String UNTIL_SUCCESSFUL_MSG = "'until-successful' retries exhausted";
    /** Reads/writes the map of event-context id to {@link RetryContext} carried by each event. */
    private final EventInternalContextResolver<Map<String, RetryContext>> retryContextResolver;
    /** Component this router belongs to; provides the location and is reported as the failing component. */
    private final Component owner;
    /** When {@code true}, {@link MuleException}s are suppressed from the cause of the exhaustion error. */
    private final boolean suppressErrors;
    /** When {@code true}, the critical-error check is skipped and such errors are retried like any other. */
    private final boolean retryOnCriticalErrors;
    /** Tested against the failed event to decide whether the error is handled by the retry logic at all. */
    private final Predicate<CoreEvent> shouldRetry;
    /** Scheduler used to run the retries, wrapped with a transaction-active condition. */
    private final ConditionalExecutorServiceDecorator delayScheduler;
    /** Matches errors whose type is {@code Errors.CRITICAL_ERROR_TYPE}. */
    private final ErrorTypeMatcher criticalMatcher = ErrorTypeMatcherUtils.createErrorTypeMatcher(Errors.CRITICAL_ERROR_TYPE);
    /** Consumes the incoming publisher and feeds {@link #innerRecorder}. */
    private final Flux<CoreEvent> upstreamFlux;
    /** Runs the nested chain over the events of {@link #innerRecorder} and feeds {@link #downstreamRecorder}. */
    private final Flux<CoreEvent> innerFlux;
    /** Emits the final result (event or propagated error) of each incoming event. */
    private final Flux<CoreEvent> downstreamFlux;
    /** Sink of events (first attempts and retries) to be executed by the nested chain. */
    private final FluxSinkRecorder<CoreEvent> innerRecorder = new FluxSinkRecorder();
    /** Sink of final outcomes: left is the error to propagate, right is the successful event. */
    private final FluxSinkRecorder<Either<Throwable, CoreEvent>> downstreamRecorder = new FluxSinkRecorder();
    /** Reactor context captured when the downstream publisher is subscribed; written into the upstream chains. */
    private final AtomicReference<ContextView> downstreamCtxReference = new AtomicReference<Context>(Context.empty());
    // The three suppliers below are assigned at the end of the constructor and read lazily, once per incoming event.
    /** Resolves the maximum number of retries, either from a literal or from an expression. */
    private Function<ExpressionManagerSession, Integer> maxRetriesSupplier;
    /** Resolves the delay between retries in milliseconds, either from a literal or from an expression. */
    private Function<ExpressionManagerSession, Integer> delaySupplier;
    /** Opens the expression session for an event, or yields {@code null} when no expression has to be evaluated. */
    private Function<CoreEvent, ExpressionManagerSession> sessionSupplier;
    /** Incremented for every incoming event and decremented for every item emitted downstream. */
    private final AtomicInteger inflightEvents = new AtomicInteger(0);
    /** Set when the upstream completed while events were still in flight, so completion is propagated later. */
    private final AtomicBoolean completeDeferred = new AtomicBoolean(false);
    /** Retries that have been scheduled but have not started yet, kept so they can be cancelled. */
    private final Map<CoreEvent, ScheduledFuture<?>> pendingRetries = new ConcurrentHashMap();

    /**
     * Builds the router and wires its three chains: the upstream chain that receives the incoming events, the inner
     * chain that executes {@code nestedChain} (and re-executes it on retry), and the downstream chain that emits the
     * final outcome of each event. Nothing is subscribed here; the upstream chains are subscribed when the downstream
     * flux is.
     *
     * @param owner                    component this router belongs to
     * @param publisher                source of the events to process
     * @param nestedChain              processor executed (and retried) for each event
     * @param processingStrategy       used to configure the publisher that feeds the nested chain
     * @param expressionManager        used to detect and evaluate expressions in {@code maxRetries} and
     *                                 {@code millisBetweenRetries}
     * @param shouldRetry              decides, from the failed event, whether the error goes through the retry logic
     * @param delayScheduler           scheduler on which the retries are scheduled
     * @param maxRetries               literal integer or expression with the maximum number of retries
     * @param millisBetweenRetries     literal integer or expression with the delay between retries, in milliseconds
     * @param suppressErrors           whether Mule exceptions are suppressed from the cause of the exhaustion error
     * @param retryOnCriticalErrors    whether errors matching the critical error type are retried as well
     * @param cancelOnForcefulShutdown whether pending retries may be cancelled when the upstream completes
     */
    UntilSuccessfulRouter(Component owner, Publisher<CoreEvent> publisher, Processor nestedChain, ProcessingStrategy processingStrategy, ExtendedExpressionManager expressionManager, Predicate<CoreEvent> shouldRetry, Scheduler delayScheduler, String maxRetries, String millisBetweenRetries, boolean suppressErrors, boolean retryOnCriticalErrors, boolean cancelOnForcefulShutdown) {
        this.owner = owner;
        this.suppressErrors = suppressErrors;
        this.retryOnCriticalErrors = retryOnCriticalErrors;
        this.shouldRetry = shouldRetry;
        // NOTE(review): presumably the decorator runs tasks on the calling thread while a transaction is active -
        // confirm against ConditionalExecutorServiceDecorator.
        this.delayScheduler = new ConditionalExecutorServiceDecorator(delayScheduler, s -> TransactionCoordination.isTransactionActive());
        this.retryContextResolver = new EventInternalContextResolver<Map>(RETRY_CTX_INTERNAL_PARAM_KEY, HashMap::new);
        // Upstream chain: every incoming event gets its own RetryContext, is counted as in flight and is pushed to
        // the inner chain for its first attempt.
        this.upstreamFlux = Flux.deferContextual(subCtx -> Flux.from((Publisher)publisher).doOnNext(event -> {
            RetryContext ctx = new RetryContext((CoreEvent)event, this.sessionSupplier, this.maxRetriesSupplier, this.delaySupplier);
            this.inflightEvents.getAndIncrement();
            this.innerRecorder.next(this.eventWithCurrentContext((CoreEvent)event, ctx));
        }).doOnComplete(() -> {
            // Snapshot of the in-flight count; cancelled pending retries are discounted from it below.
            int i = this.inflightEvents.get();
            if (cancelOnForcefulShutdown && publisher instanceof Flux && !subCtx.getOrEmpty((Object)"messageProcessors.withinProcessToApply").isPresent()) {
                // Stop accepting retries, cancel the ones not started yet and report each as a cancellation error.
                this.delayScheduler.shutdown();
                this.pendingRetries.entrySet().forEach(e -> {
                    CoreEvent event = (CoreEvent)e.getKey();
                    LOGGER.trace("Cancelling event '{}'...", (Object)event.getContext().getId());
                    ((ScheduledFuture)e.getValue()).cancel(true);
                    InterruptedException exception = new InterruptedException("UntilSuccessfulRouter.upstreamFlux @ " + this.owner.getLocation().getLocation() + " complete");
                    this.downstreamRecorder.next(Either.left(this.toMessagingException(event, exception), CoreEvent.class));
                });
                i -= this.pendingRetries.size();
                this.pendingRetries.clear();
            }
            if (i == 0) {
                LOGGER.debug("No inflight events, propagating completion immediately.");
                this.completeRouter();
            } else {
                // Completion is propagated later by completeRouterIfNecessary, once the last event is resolved.
                LOGGER.debug("{} inflight events, propagating completion eventually.", (Object)i);
                this.completeDeferred.set(true);
            }
        }));
        // Inner chain: runs the nested chain; successes are forwarded downstream with their retry context removed,
        // errors accepted by the retry predicate are routed to the retry handler instead of terminating the flux.
        this.innerFlux = Flux.from(processingStrategy.configureInternalPublisher((Publisher<CoreEvent>)this.innerRecorder.flux())).transform(innerPublisher -> MessageProcessors.applyWithChildContext((Publisher<CoreEvent>)innerPublisher, nestedChain, Optional.of(owner.getLocation()))).doOnNext(successfulEvent -> {
            this.downstreamRecorder.next(Either.right(Throwable.class, this.eventWithCurrentContextDeleted((CoreEvent)successfulEvent)));
            this.completeRouterIfNecessary();
        }).onErrorContinue(this.getRetryPredicate(), this.getRetryHandler());
        // Downstream chain: once its sink exists, bind it to the recorder and subscribe the inner and upstream
        // chains with the captured downstream context. Every emitted outcome resolves one in-flight event.
        this.downstreamFlux = Flux.create(sink -> {
            this.downstreamRecorder.accept((FluxSink<Either<Throwable, CoreEvent>>)sink);
            this.subscribeUpstreamChains(this.downstreamCtxReference.get());
        }).doOnNext(event -> this.inflightEvents.decrementAndGet()).map(this.getScopeResultMapper());
        // Literal values are parsed on use; expressions are evaluated against the session opened for each event.
        this.maxRetriesSupplier = expressionManager.isExpression(maxRetries) ? this.expressionToIntegerSupplierFor(maxRetries) : session -> Integer.parseInt(maxRetries);
        this.delaySupplier = expressionManager.isExpression(millisBetweenRetries) ? this.expressionToIntegerSupplierFor(millisBetweenRetries) : session -> Integer.parseInt(millisBetweenRetries);
        // No session is opened (null is supplied) when neither parameter is an expression.
        this.sessionSupplier = !expressionManager.isExpression(maxRetries) && !expressionManager.isExpression(millisBetweenRetries) ? event -> null : event -> expressionManager.openSession(owner.getLocation(), (CoreEvent)event, BindingContextUtils.NULL_BINDING_CONTEXT);
    }

    /**
     * Creates the mapper applied to every outcome emitted downstream.
     *
     * @return a function that unwraps the successful event of a right value, or logs and rethrows (through
     *         {@link Exceptions#propagate(Throwable)}) the error of a left value
     */
    private Function<Either<Throwable, CoreEvent>, CoreEvent> getScopeResultMapper() {
        return outcome -> {
            // Happy path first: a right value carries the resulting event.
            if (!outcome.isLeft()) {
                return (CoreEvent)outcome.getRight();
            }
            Throwable failure = (Throwable)outcome.getLeft();
            LOGGER.debug("getScopeResultMapper - propagating error: {}", failure);
            throw Exceptions.propagate(failure);
        };
    }

    /**
     * Creates the handler invoked for every error accepted by the retry predicate.
     * <p>
     * The handler consumes one retry from the event's {@link RetryContext} and either schedules a new attempt or,
     * when no retries are left, the error is critical, or the delay scheduler is shut down, reports the failure
     * downstream. A failure while scheduling is reported downstream as well.
     *
     * @return the error handler; its second argument (the offending element) is not used
     */
    private BiConsumer<Throwable, Object> getRetryHandler() {
        return (failure, unusedOffendingEvent) -> {
            MessagingException messagingError = (MessagingException)failure;
            CoreEvent failedEvent = messagingError.getEvent();
            RetryContext retryContext = this.getRetryContextForEvent(failedEvent);

            int retriesLeft;
            if (retryContext == null) {
                LOGGER.error("The RetryContext was not found. This is probably a race condition. No further attempts for the until successful will be done.");
                retriesLeft = 0;
            } else {
                retriesLeft = retryContext.retryCount.getAndDecrement();
            }

            // The critical check is skipped altogether when critical errors are configured to be retried.
            boolean isCriticalError = !this.retryOnCriticalErrors && failedEvent.getError().map(Error::getErrorType).map(this.criticalMatcher::match).orElse(false);

            boolean canRetry = retriesLeft > 0 && !isCriticalError && !this.delayScheduler.isShutdown();
            if (!canRetry) {
                this.retriesExhaustedOrCriticalHandler(failure, messagingError, retryContext, isCriticalError);
                return;
            }

            try {
                this.scheduleRetryHandler(retryContext);
            }
            catch (Exception schedulingFailure) {
                this.downstreamRecorder.next(Either.left(this.toMessagingException(retryContext.event, schedulingFailure), CoreEvent.class));
            }
        };
    }

    /**
     * Schedules a new attempt for the event of the given retry context after {@code ctx.delayInMillis} milliseconds.
     * <p>
     * When a transaction is active or the delay is zero, the retry is scheduled without being tracked. Otherwise the
     * resulting future is registered in {@link #pendingRetries} so that it can be cancelled, and the scheduled task
     * only re-runs the event if its entry is still registered when the task starts.
     *
     * @param ctx retry state of the event to execute again
     * @throws IllegalStateException if a retry for the same event was already pending
     */
    private void scheduleRetryHandler(RetryContext ctx) {
        // Scheduling a retry is the normal path of this scope, not a failure: log it at debug, like the matching
        // "Retrying execution" messages below, instead of flooding the error log on every attempt.
        LOGGER.debug("Scheduling retry execution of event, attempt {} of {}.", (Object)ctx.getAttemptNumber(), this.maxRetriesLabel(ctx));
        if (TransactionCoordination.isTransactionActive() || ctx.delayInMillis == 0) {
            // Untracked retry: nothing is registered in pendingRetries for it.
            this.delayScheduler.schedule(() -> {
                LOGGER.debug("Retrying execution of event, attempt {} of {}.", (Object)ctx.getAttemptNumber(), this.maxRetriesLabel(ctx));
                this.innerRecorder.next(this.eventWithCurrentContext(ctx.event, ctx));
            }, (long)ctx.delayInMillis.intValue(), TimeUnit.MILLISECONDS);
        } else {
            ScheduledFuture<?> scheduledRetry = this.delayScheduler.schedule(() -> {
                // Only retry if the entry is still registered, i.e. it has not been cleared in the meantime.
                ScheduledFuture<?> removed = this.pendingRetries.remove(ctx.event);
                if (removed != null) {
                    LOGGER.debug("Retrying execution of event, attempt {} of {}.", (Object)ctx.getAttemptNumber(), this.maxRetriesLabel(ctx));
                    this.innerRecorder.next(this.eventWithCurrentContext(ctx.event, ctx));
                }
            }, (long)ctx.delayInMillis.intValue(), TimeUnit.MILLISECONDS);
            ScheduledFuture<?> preexistingRetry = this.pendingRetries.put(ctx.event, scheduledRetry);
            if (preexistingRetry != null) {
                // An event must never have two pending retries: drop the one just scheduled and fail.
                scheduledRetry.cancel(true);
                throw new IllegalStateException("Pending retry for " + ctx.event.getContext().getId() + " was already present.");
            }
        }
    }

    /**
     * Produces the value shown in log messages for the maximum number of retries.
     *
     * @param ctx retry state to describe
     * @return the literal {@code "unlimited"} when {@code ctx.maxRetries} is {@code -1}, the configured number otherwise
     */
    private Object maxRetriesLabel(RetryContext ctx) {
        if (ctx.maxRetries == -1) {
            return "unlimited";
        }
        return ctx.maxRetries;
    }

    /**
     * Wraps an exception into a {@link MessagingException} whose event is a copy of {@code event} carrying an error
     * of type {@code Errors.CANCELLED_ERROR_TYPE} built from that exception.
     *
     * @param event     event the failure belongs to
     * @param exception failure to wrap; its message and string form become the error descriptions
     * @return the messaging exception, with this router's owner as the failing component
     */
    private MessagingException toMessagingException(CoreEvent event, Exception exception) {
        CoreEvent.Builder eventBuilder = CoreEvent.builder(event);
        Error cancellationError = ErrorBuilder.builder()
            .exception(exception)
            .description(exception.getMessage())
            .detailedDescription(exception.toString())
            .errorType(Errors.CANCELLED_ERROR_TYPE)
            .build();
        CoreEvent eventWithError = eventBuilder.error(cancellationError).build();
        return new MessagingException(eventWithError, (Throwable)exception, this.owner);
    }

    /**
     * Reports the final failure of an event downstream, then completes the router if that was pending.
     * <p>
     * A critical error is propagated untouched; otherwise the error is wrapped as a retries-exhausted error, using
     * the event of the retry context when there is one and the event of the messaging error otherwise.
     *
     * @param error           error raised by the last attempt
     * @param messagingError  the same error, as a {@link MessagingException}
     * @param ctx             retry state of the event, may be {@code null}
     * @param isCriticalError whether the error was classified as critical
     */
    private void retriesExhaustedOrCriticalHandler(Throwable error, MessagingException messagingError, RetryContext ctx, boolean isCriticalError) {
        Throwable errorToPropagate;
        if (!isCriticalError) {
            LOGGER.error("Retry attempts exhausted. Failing...");
            CoreEvent originatingEvent = ctx == null ? messagingError.getEvent() : ctx.event;
            errorToPropagate = this.getThrowableFunction(originatingEvent).apply(error);
        } else {
            LOGGER.debug("Critical error thrown. Failing...");
            errorToPropagate = error;
        }
        // Called for its side effect only: it removes the retry context entry of the failed event.
        this.eventWithCurrentContextDeleted(messagingError.getEvent());
        this.downstreamRecorder.next(Either.left(errorToPropagate, CoreEvent.class));
        this.completeRouterIfNecessary();
    }

    /**
     * Completes the router when the upstream completion was deferred and no events remain in flight.
     */
    private void completeRouterIfNecessary() {
        if (!this.completeDeferred.get()) {
            // Upstream has not completed (or completed with nothing in flight): nothing to do here.
            return;
        }
        if (this.inflightEvents.get() != 0) {
            return;
        }
        this.completeRouter();
    }

    /**
     * Propagates completion: first to the recorder feeding the nested chain, then to the recorder feeding the
     * downstream flux.
     */
    private void completeRouter() {
        this.innerRecorder.complete();
        this.downstreamRecorder.complete();
    }

    /**
     * Exposes the publisher that emits the outcome of every processed event.
     * <p>
     * On subscription, the subscriber's Reactor context is stored in {@link #downstreamCtxReference}.
     *
     * @return the downstream publisher of this router
     */
    Publisher<CoreEvent> getDownstreamPublisher() {
        return this.downstreamFlux.transformDeferredContextual(
            (flux, subscriberContext) -> flux.doOnSubscribe(
                subscription -> this.downstreamCtxReference.set((ContextView)subscriberContext)));
    }

    /**
     * Subscribes the inner chain and then the upstream chain, both with the given context written into them.
     * <p>
     * If an error has been signalled to either subscription by the time {@code subscribe} returns, it is rethrown
     * through {@code Exceptions.propagateWrappingFatal}.
     *
     * @param downstreamContext Reactor context to write into both chains
     */
    private void subscribeUpstreamChains(ContextView downstreamContext) {
        AtomicReference<Throwable> subscriptionFailure = new AtomicReference<>();

        // The inner chain goes first so it is already listening when the upstream starts pushing events into it.
        this.innerFlux.contextWrite(downstreamContext).subscribe(ignored -> {}, subscriptionFailure::set);
        Throwable innerFailure = subscriptionFailure.get();
        if (innerFailure != null) {
            throw org.mule.runtime.core.api.rx.Exceptions.propagateWrappingFatal(innerFailure);
        }

        this.upstreamFlux.contextWrite(downstreamContext).subscribe(ignored -> {}, subscriptionFailure::set);
        Throwable upstreamFailure = subscriptionFailure.get();
        if (upstreamFailure != null) {
            throw org.mule.runtime.core.api.rx.Exceptions.propagateWrappingFatal(upstreamFailure);
        }
    }

    /**
     * Looks up the retry state registered for the given event.
     *
     * @param event event whose retry contexts are inspected
     * @return the {@link RetryContext} stored under the id of the event's context, or {@code null} if there is none
     */
    private RetryContext getRetryContextForEvent(CoreEvent event) {
        Map<String, RetryContext> contextsById = this.retryContextResolver.getCurrentContextFromEvent(event);
        String eventContextId = event.getContext().getId();
        return contextsById.get(eventContextId);
    }

    /**
     * Returns the event with the given retry state registered under the id of the event's context.
     * <p>
     * The registration is done on a copy of the retry contexts currently carried by the event.
     *
     * @param event event to attach the retry state to
     * @param ctx   retry state to register
     * @return the event carrying the updated retry contexts
     */
    private CoreEvent eventWithCurrentContext(CoreEvent event, RetryContext ctx) {
        Map<String, RetryContext> currentContexts = this.retryContextResolver.getCurrentContextFromEvent(event);
        Map<String, RetryContext> updatedContexts = SmallMap.copy(currentContexts);
        String eventContextId = event.getContext().getId();
        updatedContexts.put(eventContextId, ctx);
        return this.retryContextResolver.eventWithContext(event, updatedContexts);
    }

    /**
     * Returns the event without the retry state registered under the id of the event's context.
     * <p>
     * The entry is removed directly from the map obtained from the resolver (no copy is made).
     *
     * @param event event whose retry state is discarded
     * @return the event carrying the retry contexts without that entry
     */
    private CoreEvent eventWithCurrentContextDeleted(CoreEvent event) {
        Map<String, RetryContext> contextsById = this.retryContextResolver.getCurrentContextFromEvent(event);
        String eventContextId = event.getContext().getId();
        contextsById.remove(eventContextId);
        return this.retryContextResolver.eventWithContext(event, contextsById);
    }

    /**
     * Creates the predicate selecting the errors handled by the retry logic.
     *
     * @return a predicate that accepts only {@link MessagingException}s whose event passes {@link #shouldRetry}
     */
    private Predicate<Throwable> getRetryPredicate() {
        return failure -> {
            if (!(failure instanceof MessagingException)) {
                return false;
            }
            CoreEvent failedEvent = ((MessagingException)failure).getEvent();
            return this.shouldRetry.test(failedEvent);
        };
    }

    /**
     * Creates the function that converts the last error of an event into the retries-exhausted error.
     *
     * @param event event reported in the resulting exception when the error itself is not a {@link MessagingException}
     * @return a function wrapping its argument into a {@link MessagingException} caused by a
     *         {@link RetryPolicyExhaustedException}
     */
    private Function<Throwable, Throwable> getThrowableFunction(CoreEvent event) {
        return failure -> {
            Throwable exhaustionCause = this.suppressMuleException(failure);
            RetryPolicyExhaustedException exhausted = new RetryPolicyExhaustedException(I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG), exhaustionCause, this.owner);
            // Prefer the event carried by the error itself; fall back to the one supplied by the caller.
            CoreEvent exceptionEvent = failure instanceof MessagingException
                ? ((MessagingException)failure).getEvent()
                : event;
            return new MessagingException(exceptionEvent, (Throwable)exhausted, this.owner);
        };
    }

    /**
     * Applies the {@link #suppressErrors} setting to an error.
     *
     * @param throwable error to process
     * @return the result of {@code ExceptionHelper.suppressIfPresent} for {@link MuleException} when suppression is
     *         enabled, the same {@code throwable} otherwise
     */
    private Throwable suppressMuleException(Throwable throwable) {
        return this.suppressErrors
            ? ExceptionHelper.suppressIfPresent(throwable, MuleException.class)
            : throwable;
    }

    /**
     * Creates a function that evaluates an expression as a number and returns it as an {@link Integer}.
     *
     * @param anExpression expression to evaluate against the session handed to the function
     * @return the evaluating function; any exception raised while evaluating or casting the result is rethrown
     *         wrapped in a {@link RetryContextInitializationException}
     */
    private Function<ExpressionManagerSession, Integer> expressionToIntegerSupplierFor(String anExpression) {
        return session -> {
            try {
                Object evaluated = session.evaluate(anExpression, DataType.NUMBER).getValue();
                // The cast stays inside the try so that a non-Integer result is wrapped like any other failure.
                return (Integer)evaluated;
            }
            catch (Exception evaluationFailure) {
                throw new RetryContextInitializationException(evaluationFailure);
            }
        };
    }

    /**
     * Retry state of a single incoming event: the event itself, the resolved retry settings and the number of
     * retries still available.
     */
    private static class RetryContext {
        /** Event as it was received, re-executed on every retry. */
        CoreEvent event;
        /** Retries still available; starts at {@link #maxRetries}. */
        AtomicInteger retryCount = new AtomicInteger();
        /** Delay between attempts, in milliseconds. */
        Integer delayInMillis;
        /** Maximum number of retries resolved for this event. */
        Integer maxRetries;

        /**
         * Resolves the retry settings for the event using a single session for both suppliers.
         *
         * @param event              event to track
         * @param sessionSupplier    provides the session handed to the other two suppliers
         * @param maxRetriesSupplier resolves the maximum number of retries
         * @param delayTimeSupplier  resolves the delay between retries, in milliseconds
         */
        RetryContext(CoreEvent event, Function<CoreEvent, ExpressionManagerSession> sessionSupplier, Function<ExpressionManagerSession, Integer> maxRetriesSupplier, Function<ExpressionManagerSession, Integer> delayTimeSupplier) {
            // NOTE(review): the session obtained here is never closed - confirm whether it holds resources.
            ExpressionManagerSession expressionSession = sessionSupplier.apply(event);
            Integer resolvedMaxRetries = maxRetriesSupplier.apply(expressionSession);
            Integer resolvedDelay = delayTimeSupplier.apply(expressionSession);

            this.event = event;
            this.maxRetries = resolvedMaxRetries;
            this.delayInMillis = resolvedDelay;
            this.retryCount.set(resolvedMaxRetries);
        }

        /**
         * @return how many retries have been consumed so far, i.e. {@code maxRetries} minus the retries left
         */
        int getAttemptNumber() {
            int retriesLeft = this.retryCount.get();
            return this.maxRetries - retriesLeft;
        }
    }

    /**
     * Unchecked wrapper for a failure raised while evaluating the retry settings expressions.
     */
    static class RetryContextInitializationException
    extends RuntimeException {
        private static final long serialVersionUID = -399718600886069735L;

        /**
         * @param cause the exception raised by the evaluation
         */
        public RetryContextInitializationException(Throwable cause) {
            super(cause);
        }
    }
}

