/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.execution;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.ErrorTypeRepository;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.interception.SourceInterceptorFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.notification.Notification;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.exception.FlowExceptionHandler;
import org.mule.runtime.core.api.exception.NullExceptionHandler;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.execution.MessageProcessPhase;
import org.mule.runtime.core.internal.execution.ModuleFlowProcessingPhaseTemplate;
import org.mule.runtime.core.internal.execution.NotificationFiringProcessingPhase;
import org.mule.runtime.core.internal.execution.PhaseResultNotifier;
import org.mule.runtime.core.internal.execution.SourceErrorException;
import org.mule.runtime.core.internal.execution.SourceResultAdapter;
import org.mule.runtime.core.internal.execution.ValidationPhase;
import org.mule.runtime.core.internal.interception.InterceptorManager;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.policy.PolicyManager;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.processor.interceptor.ReactiveInterceptorSourceCallbackAdapter;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.internal.util.InternalExceptionUtils;
import org.mule.runtime.core.internal.util.MessagingExceptionResolver;
import org.mule.runtime.core.internal.util.message.MessageUtils;
import org.mule.runtime.core.privileged.PrivilegedMuleContext;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.execution.MessageProcessContext;
import org.mule.runtime.core.privileged.execution.MessageProcessTemplate;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

public class ModuleFlowProcessingPhase
extends NotificationFiringProcessingPhase<ModuleFlowProcessingPhaseTemplate>
implements Initialisable {
    private ErrorType sourceResponseGenerateErrorType;
    private ErrorType sourceResponseSendErrorType;
    private ErrorType sourceErrorResponseGenerateErrorType;
    private ErrorType sourceErrorResponseSendErrorType;
    private ConfigurationComponentLocator componentLocator;
    private final PolicyManager policyManager;
    private List<ReactiveInterceptorSourceCallbackAdapter> additionalInterceptors = new LinkedList<ReactiveInterceptorSourceCallbackAdapter>();
    @Inject
    private InterceptorManager processorInterceptorManager;

    public ModuleFlowProcessingPhase(PolicyManager policyManager) {
        this.policyManager = policyManager;
    }

    @Override
    public void initialise() throws InitialisationException {
        ErrorTypeRepository errorTypeRepository = this.muleContext.getErrorTypeRepository();
        this.componentLocator = this.muleContext.getConfigurationComponentLocator();
        this.sourceResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_GENERATE).get();
        this.sourceResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_SEND).get();
        this.sourceErrorResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_GENERATE).get();
        this.sourceErrorResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_SEND).get();
        if (this.processorInterceptorManager != null) {
            this.processorInterceptorManager.getSourceInterceptorFactories().stream().forEach(interceptorFactory -> {
                ReactiveInterceptorSourceCallbackAdapter reactiveInterceptorAdapter = new ReactiveInterceptorSourceCallbackAdapter((SourceInterceptorFactory)interceptorFactory);
                try {
                    this.muleContext.getInjector().inject(reactiveInterceptorAdapter);
                }
                catch (MuleException e) {
                    throw new MuleRuntimeException(e);
                }
                this.additionalInterceptors.add(0, reactiveInterceptorAdapter);
            });
        }
    }

    @Override
    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate) {
        return messageProcessTemplate instanceof ModuleFlowProcessingPhaseTemplate;
    }

    @Override
    public void runPhase(ModuleFlowProcessingPhaseTemplate template, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier) {
        try {
            MessageSource messageSource = messageProcessContext.getMessageSource();
            FlowConstruct flowConstruct = (FlowConstruct)this.componentLocator.find(messageSource.getRootContainerLocation()).get();
            ComponentLocation sourceLocation = messageSource.getLocation();
            Consumer<Either<MessagingException, CoreEvent>> terminateConsumer = this.getTerminateConsumer(messageSource, template);
            CompletableFuture responseCompletion = new CompletableFuture();
            CoreEvent templateEvent = this.createEvent(template, sourceLocation, responseCompletion, flowConstruct);
            try {
                FlowProcessor flowExecutionProcessor = new FlowProcessor(template, flowConstruct.getExceptionListener(), templateEvent);
                flowExecutionProcessor.setAnnotations(flowConstruct.getAnnotations());
                SourcePolicy policy = this.policyManager.createSourcePolicyInstance(messageSource, templateEvent, flowExecutionProcessor, template);
                PhaseContext phaseContext = new PhaseContext(template, messageProcessContext, phaseResultNotifier, terminateConsumer);
                Mono.just(templateEvent).doOnNext(this.onMessageReceived(template, messageProcessContext, flowConstruct)).flatMap(request -> Mono.from(policy.process((CoreEvent)request))).flatMap(policyResult -> policyResult.reduce(this.policyFailure(phaseContext, flowConstruct, messageSource), this.policySuccess(phaseContext, flowConstruct, messageSource))).doOnSuccess(aVoid -> phaseResultNotifier.phaseSuccessfully()).doOnError(this.onFailure(flowConstruct, messageSource, phaseResultNotifier, terminateConsumer)).doAfterTerminate(() -> responseCompletion.complete(null)).subscribe();
            }
            catch (Exception e) {
                Mono.from(template.sendFailureResponseToClient(new MessagingExceptionResolver(messageProcessContext.getMessageSource()).resolve(new MessagingException(templateEvent, (Throwable)e), this.muleContext), template.getFailedExecutionResponseParametersFunction().apply(templateEvent))).doOnTerminate(() -> phaseResultNotifier.phaseFailure(e)).subscribe();
            }
        }
        catch (Exception t) {
            phaseResultNotifier.phaseFailure(t);
        }
    }

    private Consumer<CoreEvent> onMessageReceived(ModuleFlowProcessingPhaseTemplate template, MessageProcessContext messageProcessContext, FlowConstruct flowConstruct) {
        return request -> {
            this.fireNotification(messageProcessContext.getMessageSource(), (CoreEvent)request, flowConstruct, 801);
            template.getNotificationFunctions().forEach(notificationFunction -> this.muleContext.getNotificationManager().fireNotification((Notification)notificationFunction.apply(request, messageProcessContext.getMessageSource())));
        };
    }

    private Function<SourcePolicySuccessResult, Mono<Void>> policySuccess(PhaseContext ctx, FlowConstruct flowConstruct, MessageSource messageSource) {
        return successResult -> {
            this.fireNotification(ctx.messageProcessContext.getMessageSource(), successResult.getResult(), flowConstruct, 805);
            try {
                Function<SourcePolicySuccessResult, Publisher<Void>> sendResponseToClient = result -> ctx.template.sendResponseToClient(result.getResult(), result.getResponseParameters().get());
                for (ReactiveInterceptorSourceCallbackAdapter interceptor : this.additionalInterceptors) {
                    sendResponseToClient = interceptor.apply(messageSource, sendResponseToClient);
                }
                return Mono.from(sendResponseToClient.apply((SourcePolicySuccessResult)successResult)).doOnSuccess(v -> this.onTerminate(flowConstruct, messageSource, ctx.terminateConsumer, Either.right(successResult.getResult()))).onErrorResume(e -> this.policySuccessError(new SourceErrorException(successResult.getResult(), this.sourceResponseSendErrorType, (Throwable)e), (SourcePolicySuccessResult)successResult, ctx, flowConstruct, messageSource));
            }
            catch (Exception e2) {
                return this.policySuccessError(new SourceErrorException(successResult.getResult(), this.sourceResponseGenerateErrorType, e2), (SourcePolicySuccessResult)successResult, ctx, flowConstruct, messageSource);
            }
        };
    }

    private Function<SourcePolicyFailureResult, Mono<Void>> policyFailure(PhaseContext ctx, FlowConstruct flowConstruct, MessageSource messageSource) {
        return failureResult -> {
            this.fireNotification(ctx.messageProcessContext.getMessageSource(), failureResult.getMessagingException().getEvent(), flowConstruct, 806);
            return this.sendErrorResponse(failureResult.getMessagingException(), event -> failureResult.getErrorResponseParameters().get(), ctx, flowConstruct).doOnSuccess(v -> this.onTerminate(flowConstruct, messageSource, ctx.terminateConsumer, Either.left(failureResult.getMessagingException())));
        };
    }

    private Mono<Void> policySuccessError(SourceErrorException see, SourcePolicySuccessResult successResult, PhaseContext ctx, FlowConstruct flowConstruct, MessageSource messageSource) {
        MessagingException messagingException = see.toMessagingException(flowConstruct.getMuleContext().getExceptionContextProviders(), messageSource);
        return Mono.when(Mono.just(messagingException).flatMapMany(flowConstruct.getExceptionListener()).last().onErrorResume(e -> Mono.empty()), this.sendErrorResponse(messagingException, successResult.createErrorResponseParameters(), ctx, flowConstruct).doOnSuccess(v -> this.onTerminate(flowConstruct, messageSource, ctx.terminateConsumer, Either.left(messagingException)))).then();
    }

    private Mono<Void> sendErrorResponse(MessagingException messagingException, Function<CoreEvent, Map<String, Object>> errorParameters, PhaseContext ctx, FlowConstruct flowConstruct) {
        CoreEvent event = messagingException.getEvent();
        if (messagingException.inErrorHandler()) {
            return Mono.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, messagingException.getCause(), messagingException));
        }
        try {
            return Mono.from(ctx.template.sendFailureResponseToClient(messagingException, errorParameters.apply(event))).onErrorMap(e -> new SourceErrorException(CoreEvent.builder(messagingException.getEvent()).error(ErrorBuilder.builder(e).errorType(this.sourceErrorResponseSendErrorType).build()).build(), this.sourceErrorResponseSendErrorType, (Throwable)e));
        }
        catch (Exception e2) {
            return Mono.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, e2, messagingException));
        }
    }

    private Consumer<Throwable> onFailure(FlowConstruct flowConstruct, MessageSource messageSource, PhaseResultNotifier phaseResultNotifier, Consumer<Either<MessagingException, CoreEvent>> terminateConsumer) {
        return throwable -> {
            this.onTerminate(flowConstruct, messageSource, terminateConsumer, Either.left(throwable));
            throwable = throwable instanceof SourceErrorException ? throwable.getCause() : throwable;
            Exception failureException = throwable instanceof Exception ? (Exception)throwable : new DefaultMuleException((Throwable)throwable);
            phaseResultNotifier.phaseFailure(failureException);
        };
    }

    private Consumer<Either<MessagingException, CoreEvent>> getTerminateConsumer(MessageSource messageSource, ModuleFlowProcessingPhaseTemplate template) {
        return eventOrException -> template.afterPhaseExecution(eventOrException.mapLeft(messagingException -> {
            messagingException.setProcessedEvent(InternalExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, ((PrivilegedMuleContext)this.muleContext).getErrorTypeLocator()));
            return messagingException;
        }));
    }

    private CoreEvent createEvent(ModuleFlowProcessingPhaseTemplate template, ComponentLocation sourceLocation, CompletableFuture responseCompletion, FlowConstruct flowConstruct) {
        Message message = template.getMessage();
        if (message.getPayload().getValue() instanceof SourceResultAdapter) {
            SourceResultAdapter adapter = (SourceResultAdapter)message.getPayload().getValue();
            InternalEvent.Builder eventBuilder = this.createEventBuilder(sourceLocation, responseCompletion, flowConstruct, adapter.getCorrelationId().orElse(null), message);
            return eventBuilder.message(eventCtx -> {
                Result result = adapter.getResult();
                Object resultValue = result.getOutput();
                if (resultValue instanceof Collection && adapter.isCollection()) {
                    return MessageUtils.toMessage(Result.builder().output(MessageUtils.toMessageCollection((Collection)resultValue, adapter.getCursorProviderFactory(), ((BaseEventContext)eventCtx).getRootContext())).mediaType(result.getMediaType().orElse(MediaType.ANY)).build());
                }
                return MessageUtils.toMessage(result, adapter.getMediaType(), adapter.getCursorProviderFactory(), ((BaseEventContext)eventCtx).getRootContext());
            }).build();
        }
        return this.createEventBuilder(sourceLocation, responseCompletion, flowConstruct, null, message).build();
    }

    private InternalEvent.Builder createEventBuilder(ComponentLocation sourceLocation, CompletableFuture responseCompletion, FlowConstruct flowConstruct, String correlationId, Message message) {
        return InternalEvent.builder(EventContextFactory.create(flowConstruct, NullExceptionHandler.getInstance(), sourceLocation, correlationId, Optional.of(responseCompletion))).message(message);
    }

    private CoreEvent emptyEvent(CoreEvent request) {
        return CoreEvent.builder(request).message(Message.of(null)).build();
    }

    private void onTerminate(FlowConstruct flowConstruct, MessageSource messageSource, Consumer<Either<MessagingException, CoreEvent>> terminateConsumer, Either<Throwable, CoreEvent> result) {
        FunctionalUtils.safely(() -> terminateConsumer.accept(result.mapLeft(throwable -> {
            if (throwable instanceof MessagingException) {
                return (MessagingException)throwable;
            }
            if (throwable instanceof SourceErrorException) {
                return ((SourceErrorException)throwable).toMessagingException(flowConstruct.getMuleContext().getExceptionContextProviders(), messageSource);
            }
            return null;
        })));
    }

    @Override
    public int compareTo(MessageProcessPhase messageProcessPhase) {
        if (messageProcessPhase instanceof ValidationPhase) {
            return 1;
        }
        return 0;
    }

    private static final class PhaseContext {
        final ModuleFlowProcessingPhaseTemplate template;
        final MessageProcessContext messageProcessContext;
        final PhaseResultNotifier phaseResultNotifier;
        final Consumer<Either<MessagingException, CoreEvent>> terminateConsumer;

        PhaseContext(ModuleFlowProcessingPhaseTemplate template, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier, Consumer<Either<MessagingException, CoreEvent>> terminateConsumer) {
            this.template = template;
            this.messageProcessContext = messageProcessContext;
            this.phaseResultNotifier = phaseResultNotifier;
            this.terminateConsumer = terminateConsumer;
        }
    }

    private class FlowProcessor
    extends AbstractComponent
    implements Processor {
        private final ModuleFlowProcessingPhaseTemplate template;
        private final CoreEvent templateEvent;
        private final FlowExceptionHandler messagingExceptionHandler;

        public FlowProcessor(ModuleFlowProcessingPhaseTemplate template, FlowExceptionHandler messagingExceptionHandler, CoreEvent templateEvent) {
            this.template = template;
            this.templateEvent = templateEvent;
            this.messagingExceptionHandler = messagingExceptionHandler;
        }

        @Override
        public CoreEvent process(CoreEvent event) throws MuleException {
            return MessageProcessors.processToApply(event, this);
        }

        @Override
        public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
            return Mono.from(publisher).flatMapMany(event -> MessageProcessors.processWithChildContext(event, p -> Mono.from(p).flatMapMany(e -> this.template.routeEventAsync((CoreEvent)e)).switchIfEmpty(Mono.fromCallable(() -> ModuleFlowProcessingPhase.this.emptyEvent(this.templateEvent))), Optional.empty(), this.messagingExceptionHandler));
        }
    }
}

