/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessage;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.processor.AbstractMuleObjectOwner;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Router;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.retry.policy.SimpleRetryPolicyTemplate;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UntilSuccessful
extends AbstractMuleObjectOwner
implements Router {
    private static final Logger LOGGER = LoggerFactory.getLogger(UntilSuccessful.class);
    private static final String UNTIL_SUCCESSFUL_MSG_PREFIX = "'until-successful' retries exhausted. Last exception message was: %s";
    private static final String EXPRESSION_FAILED_MSG = "Failure expression positive when processing event: ";
    private static final long DEFAULT_MILLIS_BETWEEN_RETRIES = 60000L;
    private static final int DEFAULT_RETRIES = 5;
    private int maxRetries = 5;
    private Long millisBetweenRetries = 60000L;
    private String failureExpression = "error != null";
    private MessageProcessorChain nestedChain;
    private Predicate<Event> shouldRetry;
    private SimpleRetryPolicyTemplate policyTemplate;
    private Scheduler timer;

    @Override
    public void initialise() throws InitialisationException {
        if (this.nestedChain == null) {
            throw new InitialisationException(I18nMessageFactory.createStaticMessage("One message processor must be configured within 'until-successful'."), (Initialisable)this);
        }
        super.initialise();
        this.timer = this.muleContext.getSchedulerService().cpuLightScheduler();
        this.policyTemplate = new SimpleRetryPolicyTemplate(this.millisBetweenRetries, this.maxRetries, this.timer);
        this.shouldRetry = event -> this.muleContext.getExpressionManager().evaluateBoolean(this.failureExpression, (Event)event, this.getLocation(), false, true);
    }

    @Override
    public void dispose() {
        super.dispose();
        this.timer.stop();
    }

    @Override
    public Event process(Event event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    @Override
    public Publisher<Event> apply(Publisher<Event> publisher) {
        return Flux.from(publisher).flatMap(event -> Mono.from(MessageProcessors.processWithChildContext(event, this.scheduleRoute(p -> Mono.from((Publisher)p).transform((Function)this.nestedChain).doOnNext(result -> {
            if (this.shouldRetry.test((Event)result)) {
                throw new FailureExpressionAssertionException(I18nMessageFactory.createStaticMessage(EXPRESSION_FAILED_MSG + event));
            }
        })), Optional.ofNullable(this.getLocation()))).transform(p -> this.policyTemplate.applyPolicy(p, this.getRetryPredicate(), e -> {}, this.getThrowableFunction((Event)event))));
    }

    private Predicate<Throwable> getRetryPredicate() {
        return e -> e instanceof FailureExpressionAssertionException || e instanceof MessagingException && this.shouldRetry.test(((MessagingException)e).getEvent());
    }

    private Function<Throwable, Throwable> getThrowableFunction(Event event) {
        return throwable -> {
            Throwable cause = ExceptionUtils.getMessagingExceptionCause(throwable);
            return new MessagingException(event, new RetryPolicyExhaustedException(I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, cause.getMessage()), cause, this), (Processor)this);
        };
    }

    private ReactiveProcessor scheduleRoute(ReactiveProcessor route) {
        if (this.flowConstruct instanceof Pipeline) {
            return publisher -> Flux.from((Publisher)publisher).transform((Function)((Pipeline)this.flowConstruct).getProcessingStrategy().onPipeline(route));
        }
        return publisher -> publisher;
    }

    public int getMaxRetries() {
        return this.maxRetries;
    }

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public long getMillisBetweenRetries() {
        return this.millisBetweenRetries;
    }

    public void setMillisBetweenRetries(long millisBetweenRetries) {
        this.millisBetweenRetries = millisBetweenRetries;
    }

    public String getFailureExpression() {
        return this.failureExpression;
    }

    public void setFailureExpression(String failureExpression) {
        this.failureExpression = failureExpression;
    }

    public void setMessageProcessors(List<Processor> processors) {
        this.nestedChain = MessageProcessors.newChain(processors);
    }

    protected List<Object> getOwnedObjects() {
        return Collections.singletonList(this.nestedChain);
    }

    private static class FailureExpressionAssertionException
    extends MuleRuntimeException {
        public FailureExpressionAssertionException(I18nMessage message) {
            super(message);
        }
    }
}

