/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.execution;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import javax.xml.namespace.QName;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.execution.CompletableCallback;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.ErrorTypeRepository;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.interception.SourceInterceptorFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.notification.ConnectorMessageNotification;
import org.mule.runtime.api.notification.Notification;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.notification.NotificationHelper;
import org.mule.runtime.core.api.context.notification.ServerNotificationManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.execution.ExceptionContextProvider;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.construct.FlowBackPressureException;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.execution.FlowProcessTemplate;
import org.mule.runtime.core.internal.execution.MessageProcessContext;
import org.mule.runtime.core.internal.execution.PhaseResultNotifier;
import org.mule.runtime.core.internal.execution.SourceErrorException;
import org.mule.runtime.core.internal.execution.SourceResultAdapter;
import org.mule.runtime.core.internal.interception.InterceptorManager;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.policy.PolicyManager;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.processor.interceptor.CompletableInterceptorSourceCallbackAdapter;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.internal.util.InternalExceptionUtils;
import org.mule.runtime.core.internal.util.mediatype.MediaTypeDecoratedResultCollection;
import org.mule.runtime.core.internal.util.message.MessageUtils;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.exception.ErrorTypeLocator;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

public class FlowProcessMediator
implements Initialisable {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlowProcessMediator.class);
    // Source of the interceptor factories wrapped in initialise(); initialise() tolerates it being null.
    @Inject
    private InterceptorManager processorInterceptorManager;
    // Repository from which the ErrorType fields below are looked up in initialise().
    @Inject
    private ErrorTypeRepository errorTypeRepository;
    // Used when resolving exceptions in process() and when building error events in getTerminateConsumer().
    @Inject
    private ErrorTypeLocator errorTypeLocator;
    // Passed along whenever a failure is resolved or converted into a MessagingException.
    @Inject
    private Collection<ExceptionContextProvider> exceptionContextProviders;
    // Backs the NotificationHelper and receives the template-provided notifications directly.
    @Inject
    private ServerNotificationManager notificationManager;
    // Only used in initialise() to inject dependencies into the interceptor adapters.
    @Inject
    private MuleContext muleContext;
    private final PolicyManager policyManager;
    // Adapters created in initialise(); each one is inserted at index 0, so the list is in reverse factory order.
    private final List<CompletableInterceptorSourceCallbackAdapter> additionalInterceptors = new LinkedList<CompletableInterceptorSourceCallbackAdapter>();
    // Error types resolved once in initialise() and reused for every message.
    private ErrorType sourceResponseGenerateErrorType;
    private ErrorType sourceResponseSendErrorType;
    private ErrorType sourceErrorResponseGenerateErrorType;
    private ErrorType sourceErrorResponseSendErrorType;
    private ErrorType flowBackPressureErrorType;
    // Created in initialise(); used by fireNotification(Component, CoreEvent, FlowConstruct, int).
    private NotificationHelper notificationHelper;

    /**
     * Creates a mediator that uses the given policy manager.
     *
     * @param policyManager used in {@code process} to add source pointcut parameters to the event and to create the
     *        {@link SourcePolicy} instance for each message
     */
    public FlowProcessMediator(PolicyManager policyManager) {
        this.policyManager = policyManager;
    }

    /**
     * Prepares the mediator for use: creates the notification helper, resolves the error types used when a response
     * cannot be generated or sent (plus the flow back-pressure one) and wraps every source interceptor factory in an
     * adapter.
     * <p>
     * Each adapter is injected through the Mule context and inserted at the head of the interceptor list, so the list
     * ends up in reverse factory order. Nothing is registered when no interceptor manager is available.
     *
     * @throws InitialisationException declared by {@link Initialisable}
     * @throws MuleRuntimeException if injecting dependencies into an adapter fails
     */
    @Override
    public void initialise() throws InitialisationException {
        this.notificationHelper = new NotificationHelper(this.notificationManager, ConnectorMessageNotification.class, false);
        this.sourceResponseGenerateErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_GENERATE).get();
        this.sourceResponseSendErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_SEND).get();
        this.sourceErrorResponseGenerateErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_GENERATE).get();
        this.sourceErrorResponseSendErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_SEND).get();
        this.flowBackPressureErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Unhandleable.FLOW_BACK_PRESSURE).get();

        if (this.processorInterceptorManager == null) {
            return;
        }
        for (SourceInterceptorFactory factory : this.processorInterceptorManager.getSourceInterceptorFactories()) {
            CompletableInterceptorSourceCallbackAdapter adapter = new CompletableInterceptorSourceCallbackAdapter(factory);
            try {
                this.muleContext.getInjector().inject(adapter);
            } catch (MuleException e) {
                throw new MuleRuntimeException(e);
            }
            // Head insertion: the adapter for the last factory ends up first in the list.
            this.additionalInterceptors.add(0, adapter);
        }
    }

    /**
     * Entry point for a message produced by a source: builds the event, creates the source policy and dispatches the
     * event through it.
     * <p>
     * If creating the policy or dispatching fails, a failure response is sent to the client, the event context is
     * errored and the response completion future is completed. Any failure outside that inner scope (including one
     * raised while handling the inner failure) is reported straight to the notifier.
     *
     * @param template provides the source message and the callbacks used to answer the client
     * @param messageProcessContext provides the message source, the flow construct and the exception resolver
     * @param phaseResultNotifier notified about the failure of the phase
     */
    public void process(FlowProcessTemplate template, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier) {
        try {
            MessageSource source = messageProcessContext.getMessageSource();
            AbstractFlowConstruct flow = (AbstractFlowConstruct)messageProcessContext.getFlowConstruct();
            CompletableFuture<Void> completion = new CompletableFuture<>();
            FlowProcessor flowProcessor = new FlowProcessor(template, flow);
            CoreEvent event = this.createEvent(template, source, completion, flow);
            this.policyManager.addSourcePointcutParametersIntoEvent(source, event.getMessage().getAttributes(), (InternalEvent)event);
            try {
                SourcePolicy policy = this.policyManager.createSourcePolicyInstance(source, event, flowProcessor, template);
                Consumer<Either<MessagingException, CoreEvent>> terminateConsumer = this.getTerminateConsumer(source, template);
                this.dispatch(new PhaseContext(template, source, messageProcessContext, phaseResultNotifier, flow, terminateConsumer, completion, event, policy));
            } catch (Exception e) {
                MessagingException resolved = messageProcessContext.getMessagingExceptionResolver()
                    .resolve(new MessagingException(event, (Throwable)e), this.errorTypeLocator, this.exceptionContextProviders);
                Map<String, Object> failureParameters = template.getFailedExecutionResponseParametersFunction().apply(event);
                // The notifier is told about the failure whichever way sending the failure response ends.
                template.sendFailureResponseToClient(resolved, failureParameters, CompletableCallback.always(() -> phaseResultNotifier.phaseFailure(e)));
                ((BaseEventContext)event.getContext()).error(e);
                completion.complete(null);
            }
        } catch (Exception e) {
            phaseResultNotifier.phaseFailure(e);
        }
    }

    /**
     * Fires the "message received" notifications, checks the flow for back-pressure and hands the event to the source
     * policy. The policy outcome (or an error reported through the callback) is stored in {@code ctx.result} and
     * answered through {@link #dispatchResponse(PhaseContext)}.
     * <p>
     * The template's notification functions are fired by {@link #onMessageReceived(PhaseContext)} only; they are not
     * fired a second time here.
     * <p>
     * A {@link FlowBackPressureException} is fully handled in this method: the event context is errored, the exception
     * is mapped to a policy failure result and the response is dispatched. It is deliberately not rethrown, because
     * the caller reacts to an exception by sending a failure response and erroring the context itself, which would
     * answer the client twice.
     *
     * @param ctx state of the message being processed
     * @throws Exception any failure other than back-pressure, after being unwrapped with {@code Exceptions.unwrap}
     */
    private void dispatch(final PhaseContext ctx) throws Exception {
        try {
            this.onMessageReceived(ctx);
            ctx.flowConstruct.checkBackpressure(ctx.event);
            ctx.sourcePolicy.process(ctx.event, ctx.template, new CompletableCallback<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>>(){

                @Override
                public void complete(Either<SourcePolicyFailureResult, SourcePolicySuccessResult> value) {
                    ctx.result = value;
                    FlowProcessMediator.this.dispatchResponse(ctx);
                }

                @Override
                public void error(Throwable e) {
                    // An error outside the policy result is answered as a failure with no response parameters.
                    ctx.result = Either.left(new SourcePolicyFailureResult(new MessagingException(ctx.event, e), () -> Collections.emptyMap()));
                    FlowProcessMediator.this.dispatchResponse(ctx);
                }
            });
        }
        catch (Exception e) {
            Exception cause = (Exception)Exceptions.unwrap(e);
            if (cause instanceof FlowBackPressureException) {
                ((BaseEventContext)ctx.event.getContext()).error(cause);
                ctx.result = this.mapBackPressureExceptionToPolicyFailureResult(ctx.template, ctx.event, (FlowBackPressureException)cause);
                this.dispatchResponse(ctx);
            } else {
                throw cause;
            }
        }
    }

    /**
     * Routes the policy result stored in {@code ctx.result} to the matching handler: the consumer built by
     * {@link #policyFailure(PhaseContext)} for a failure result, the one built by
     * {@link #policySuccess(PhaseContext)} for a success result.
     *
     * @param ctx state of the message being processed; {@code ctx.result} must already be set
     */
    private void dispatchResponse(PhaseContext ctx) {
        ctx.result.apply(this.policyFailure(ctx), this.policySuccess(ctx));
    }

    /**
     * Ends the phase. Without a recorded exception the notifier is told the phase succeeded; otherwise the exception
     * goes through the failure handler from {@link #onFailure(PhaseContext)}. The response completion future is
     * completed in every case, even if the notification itself throws.
     *
     * @param ctx state of the message being processed
     */
    private void finish(PhaseContext ctx) {
        try {
            if (ctx.exception == null) {
                ctx.phaseResultNotifier.phaseSuccessfully();
                return;
            }
            this.onFailure(ctx).accept(ctx.exception);
        } finally {
            ctx.responseCompletion.complete(null);
        }
    }

    /**
     * Converts a back-pressure rejection into a policy failure result.
     * <p>
     * The given event is copied with an error built from the exception and the flow back-pressure error type. The
     * error response parameters are computed lazily from that error event with the template's failed-execution
     * parameters function.
     *
     * @param template supplies the function that produces the failure response parameters
     * @param event the event that was rejected
     * @param exception the back-pressure exception
     * @return a left {@link Either} holding the failure result
     */
    protected Either<SourcePolicyFailureResult, SourcePolicySuccessResult> mapBackPressureExceptionToPolicyFailureResult(FlowProcessTemplate template, CoreEvent event, FlowBackPressureException exception) {
        CoreEvent backPressureEvent = CoreEvent.builder(event)
            .error(ErrorBuilder.builder(exception).errorType(this.flowBackPressureErrorType).build())
            .build();
        MessagingException failure = new MessagingException(backPressureEvent, (Throwable)exception);
        return Either.left(new SourcePolicyFailureResult(failure, () -> template.getFailedExecutionResponseParametersFunction().apply(backPressureEvent)));
    }

    /**
     * Builds the handler for a successful policy result.
     * <p>
     * The handler fires a notification with action code 805, then sends the response to the client through the
     * template, wrapped by every registered interceptor adapter in list order. When the response is sent, the
     * terminate consumer is invoked with the resulting event and the phase is finished. A failure reported while
     * sending is routed to {@code policySuccessError} with the "response send" error type; an exception thrown while
     * preparing or triggering the send is routed there with the "response generate" error type.
     *
     * @param ctx state of the message being processed
     * @return the consumer to apply to the success result
     */
    private Consumer<SourcePolicySuccessResult> policySuccess(final PhaseContext ctx) {
        return successResult -> {
            this.fireNotification(ctx.messageProcessContext.getMessageSource(), successResult.getResult(), ctx.flowConstruct, 805);

            CompletableCallback<Void> onResponseSent = new CompletableCallback<Void>(){

                @Override
                public void complete(Void value) {
                    FlowProcessMediator.this.onTerminate(ctx, Either.right(successResult.getResult()));
                    FlowProcessMediator.this.finish(ctx);
                }

                @Override
                public void error(Throwable e) {
                    SourceErrorException sendFailure = new SourceErrorException(successResult.getResult(), FlowProcessMediator.this.sourceResponseSendErrorType, e);
                    FlowProcessMediator.this.policySuccessError(sendFailure, successResult, ctx);
                }
            };

            BiConsumer<SourcePolicySuccessResult, CompletableCallback<Void>> responseSender =
                (result, callback) -> ctx.template.sendResponseToClient(result.getResult(), result.getResponseParameters().get(), callback);
            try {
                // Each adapter wraps the sender produced so far.
                for (CompletableInterceptorSourceCallbackAdapter adapter : this.additionalInterceptors) {
                    responseSender = adapter.apply(ctx.messageSource, responseSender);
                }
                responseSender.accept(successResult, onResponseSent);
            } catch (Exception e) {
                SourceErrorException generateFailure = new SourceErrorException(successResult.getResult(), this.sourceResponseGenerateErrorType, e);
                this.policySuccessError(generateFailure, successResult, ctx);
            }
        };
    }

    /**
     * Builds the handler for a failed policy result.
     * <p>
     * The handler fires a notification with action code 806 for the event carried by the messaging exception and then
     * sends the error response, using the parameters supplied by the failure result. When the error response is sent,
     * the terminate consumer is invoked with the messaging exception and the phase is finished; if sending fails, the
     * failure is recorded in {@code ctx.exception} before finishing.
     *
     * @param ctx state of the message being processed
     * @return the consumer to apply to the failure result
     */
    private Consumer<SourcePolicyFailureResult> policyFailure(final PhaseContext ctx) {
        return failureResult -> {
            this.fireNotification(ctx.messageProcessContext.getMessageSource(), failureResult.getMessagingException().getEvent(), ctx.flowConstruct, 806);

            CompletableCallback<Void> onErrorResponseSent = new CompletableCallback<Void>(){

                @Override
                public void complete(Void value) {
                    FlowProcessMediator.this.onTerminate(ctx, Either.left(failureResult.getMessagingException()));
                    FlowProcessMediator.this.finish(ctx);
                }

                @Override
                public void error(Throwable e) {
                    ctx.exception = e;
                    FlowProcessMediator.this.finish(ctx);
                }
            };

            // The parameters come from the failure result; the event argument is not used.
            this.sendErrorResponse(failureResult.getMessagingException(), ignoredEvent -> failureResult.getErrorResponseParameters().get(), ctx, onErrorResponseSent);
        };
    }

    /**
     * Handles a failure that occurred while generating or sending the response of a successful execution.
     * <p>
     * The source error is converted to a {@link MessagingException} and passed to the router of the flow's exception
     * listener. Both router callbacks do the same thing: send an error response for that messaging exception, with
     * parameters created from the success result. When the error response is sent, the terminate consumer is invoked
     * with the messaging exception and the phase is finished; if sending fails, the failure is recorded in
     * {@code ctx.exception} before finishing.
     *
     * @param see the failure raised while answering the client
     * @param successResult the policy result that was being answered
     * @param ctx state of the message being processed
     */
    private void policySuccessError(SourceErrorException see, SourcePolicySuccessResult successResult, final PhaseContext ctx) {
        MessagingException messagingException = see.toMessagingException(this.exceptionContextProviders, ctx.messageSource);

        Consumer<MessagingException> respondWithError = failure -> {
            CompletableCallback<Void> onErrorResponseSent = new CompletableCallback<Void>(){

                @Override
                public void complete(Void value) {
                    FlowProcessMediator.this.onTerminate(ctx, Either.left(failure));
                    FlowProcessMediator.this.finish(ctx);
                }

                @Override
                public void error(Throwable e) {
                    ctx.exception = e;
                    FlowProcessMediator.this.finish(ctx);
                }
            };
            this.sendErrorResponse(failure, successResult.createErrorResponseParameters(), ctx, onErrorResponseSent);
        };

        ctx.flowConstruct.getExceptionListener()
            .router(Function.identity(), handled -> respondWithError.accept(messagingException), propagated -> respondWithError.accept(messagingException))
            .accept(messagingException);
    }

    /**
     * Sends a failure response for the given messaging exception through the template.
     * <p>
     * Successful completion is forwarded to {@code callback} unchanged. A failure reported while sending is forwarded
     * as a {@link SourceErrorException} whose event carries an error of the "error response send" type. An exception
     * thrown while computing the parameters or triggering the send is forwarded as a {@link SourceErrorException} of
     * the "error response generate" type that also carries the original messaging exception.
     *
     * @param messagingException the failure to report to the client
     * @param errorParameters computes the response parameters from the exception's event
     * @param ctx state of the message being processed
     * @param callback notified once the error response has been sent or has failed
     */
    private void sendErrorResponse(MessagingException messagingException, Function<CoreEvent, Map<String, Object>> errorParameters, PhaseContext ctx, final CompletableCallback<Void> callback) {
        final CoreEvent event = messagingException.getEvent();

        CompletableCallback<Void> sendOutcome = new CompletableCallback<Void>(){

            @Override
            public void complete(Void value) {
                callback.complete(value);
            }

            @Override
            public void error(Throwable e) {
                CoreEvent failedEvent = CoreEvent.builder(event)
                    .error(ErrorBuilder.builder(e).errorType(FlowProcessMediator.this.sourceErrorResponseSendErrorType).build())
                    .build();
                callback.error(new SourceErrorException(failedEvent, FlowProcessMediator.this.sourceErrorResponseSendErrorType, e));
            }
        };

        try {
            ctx.template.sendFailureResponseToClient(messagingException, errorParameters.apply(event), sendOutcome);
        } catch (Exception e) {
            callback.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, e, messagingException));
        }
    }

    /**
     * Builds the handler used when the phase ends with an exception.
     * <p>
     * The handler first runs the terminate logic with the failure. It then notifies the phase failure with the cause
     * of the failure when it is a {@link SourceErrorException}, or with the failure itself otherwise. Anything that
     * is not an {@link Exception} is wrapped in a {@link DefaultMuleException}.
     *
     * @param ctx state of the message being processed
     * @return the consumer to apply to the failure
     */
    private Consumer<Throwable> onFailure(PhaseContext ctx) {
        return failure -> {
            this.onTerminate(ctx, Either.left(failure));

            Throwable reported = failure;
            if (failure instanceof SourceErrorException) {
                reported = failure.getCause();
            }

            Exception failureException;
            if (reported instanceof Exception) {
                failureException = (Exception)reported;
            } else {
                failureException = new DefaultMuleException(reported);
            }
            ctx.phaseResultNotifier.phaseFailure(failureException);
        };
    }

    /**
     * Builds the consumer that runs the template's after-phase logic.
     * <p>
     * When the outcome is a messaging exception, its processed event is first replaced by an error event created from
     * the exception's event, the message source, the exception itself and the error type locator. A successful
     * outcome is passed through untouched.
     *
     * @param messageSource the source the message came from
     * @param template receives the final outcome through {@code afterPhaseExecution}
     * @return the terminate consumer
     */
    private Consumer<Either<MessagingException, CoreEvent>> getTerminateConsumer(MessageSource messageSource, FlowProcessTemplate template) {
        return eventOrException -> {
            Either<MessagingException, CoreEvent> outcome = eventOrException.mapLeft(messagingException -> {
                CoreEvent errorEvent = InternalExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, this.errorTypeLocator);
                messagingException.setProcessedEvent(errorEvent);
                return messagingException;
            });
            template.afterPhaseExecution(outcome);
        };
    }

    /**
     * Announces that a message was received: fires a notification with action code 801 for the event and then fires
     * one notification per notification function provided by the template, in the order the template returns them.
     *
     * @param ctx state of the message being processed
     */
    private void onMessageReceived(PhaseContext ctx) {
        this.fireNotification(ctx.messageProcessContext.getMessageSource(), ctx.event, ctx.flowConstruct, 801);
        ctx.template.getNotificationFunctions().stream()
            .map(notificationFunction -> (Notification)notificationFunction.apply(ctx.event, ctx.messageProcessContext.getMessageSource()))
            .forEach(notification -> this.notificationManager.fireNotification(notification));
    }

    /**
     * Builds the event for the message produced by the source.
     * <p>
     * The event context is created from the flow construct, the source location, the adapter's correlation id (or
     * {@code null} when absent) and the response completion future. If the adapter reports a collection and the
     * result output is a {@link Collection}, the output is converted to a collection of messages; otherwise the
     * result is converted to a single message with the adapter's media type.
     *
     * @param template supplies the source message
     * @param source the message source, used for its location
     * @param responseCompletion future tied to the event context
     * @param flowConstruct the flow the event belongs to
     * @return the new event
     */
    private CoreEvent createEvent(FlowProcessTemplate template, MessageSource source, CompletableFuture<Void> responseCompletion, FlowConstruct flowConstruct) {
        SourceResultAdapter adapter = template.getSourceMessage();
        ComponentLocation sourceLocation = source.getLocation();
        String correlationId = adapter.getCorrelationId().orElse(null);
        InternalEvent.Builder eventBuilder = this.createEventBuilder(sourceLocation, responseCompletion, flowConstruct, correlationId);
        return eventBuilder.message(eventCtx -> {
            Result result = adapter.getResult();
            Object resultValue = result.getOutput();
            Message eventMessage;
            if (resultValue instanceof Collection && adapter.isCollection()) {
                eventMessage = MessageUtils.toMessage(Result.builder().output(MessageUtils.toMessageCollection(new MediaTypeDecoratedResultCollection((Collection)resultValue, adapter.getPayloadMediaTypeResolver()), adapter.getCursorProviderFactory(), ((BaseEventContext)eventCtx).getRootContext())).mediaType(result.getMediaType().orElse(MediaType.ANY)).build());
            } else {
                eventMessage = MessageUtils.toMessage(result, adapter.getMediaType(), adapter.getCursorProviderFactory(), ((BaseEventContext)eventCtx).getRootContext());
            }
            return eventMessage;
        }).build();
    }

    /**
     * Creates an event builder on a new event context.
     *
     * @param sourceLocation location of the source that produced the message
     * @param responseCompletion future handed to the event context factory (wrapped in an {@link Optional})
     * @param flowConstruct the flow the event belongs to
     * @param correlationId correlation id for the context; the caller passes {@code null} when the source provides none
     * @return a builder for an event bound to the new context
     */
    private InternalEvent.Builder createEventBuilder(ComponentLocation sourceLocation, CompletableFuture<Void> responseCompletion, FlowConstruct flowConstruct, String correlationId) {
        return InternalEvent.builder(EventContextFactory.create(flowConstruct, sourceLocation, correlationId, Optional.of(responseCompletion)));
    }

    /**
     * Invokes the terminate consumer with the final outcome of the phase.
     * <p>
     * A failure is first normalised to a {@link MessagingException}: it is used as is when it already is one,
     * converted when it is a {@link SourceErrorException}, and mapped to {@code null} otherwise. The call goes
     * through {@code FunctionalUtils.safely} with an exception handler that does nothing.
     * NOTE(review): presumably this swallows exceptions thrown by the terminate consumer; confirm against
     * FunctionalUtils.
     *
     * @param ctx state of the message being processed
     * @param result the failure or the resulting event
     */
    private void onTerminate(PhaseContext ctx, Either<Throwable, CoreEvent> result) {
        Either<MessagingException, CoreEvent> outcome = result.mapLeft(failure -> {
            if (failure instanceof MessagingException) {
                return (MessagingException)failure;
            }
            return failure instanceof SourceErrorException
                ? ((SourceErrorException)failure).toMessagingException(this.exceptionContextProviders, ctx.messageSource)
                : null;
        });
        FunctionalUtils.safely(outcome, mapped -> ctx.terminateConsumer.accept(mapped), ignored -> {});
    }

    /**
     * Fires a notification for the given event through the notification helper, on a best-effort basis.
     * <p>
     * When no event is given, the current event is used instead; if there is none either, nothing is fired. Any
     * exception raised while firing is logged at warn level (when enabled) and not propagated.
     *
     * @param source the component the notification is about
     * @param event the event to notify about, may be {@code null}
     * @param flow the flow whose location is attached to the notification
     * @param action the notification action code
     */
    private void fireNotification(Component source, CoreEvent event, FlowConstruct flow, int action) {
        try {
            CoreEvent notifiedEvent = event;
            if (notifiedEvent == null) {
                notifiedEvent = PrivilegedEvent.getCurrentEvent();
            }
            if (notifiedEvent == null) {
                return;
            }
            this.notificationHelper.fireNotification(source, notifiedEvent, flow.getLocation(), action);
        } catch (Exception e) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Could not fire notification. Action: " + action, (Throwable)e);
            }
        }
    }

    /**
     * Replaces the Mule context held by this mediator. The context is used in {@code initialise()} to inject
     * dependencies into the interceptor adapters.
     *
     * @param context the context to use
     */
    public void setMuleContext(MuleContext context) {
        this.muleContext = context;
    }

    /**
     * Holds the state of a single message while it moves through the mediator. Everything is fixed at construction
     * except {@code result} and {@code exception}, which are filled in as processing advances.
     */
    private static final class PhaseContext {
        private final FlowProcessTemplate template;
        private final MessageSource messageSource;
        private final MessageProcessContext messageProcessContext;
        private final PhaseResultNotifier phaseResultNotifier;
        private final AbstractFlowConstruct flowConstruct;
        // Invoked from onTerminate() with the final outcome of the phase.
        private final Consumer<Either<MessagingException, CoreEvent>> terminateConsumer;
        // Completed by finish() once the phase is over.
        private final CompletableFuture<Void> responseCompletion;
        private final SourcePolicy sourcePolicy;
        private final CoreEvent event;
        // Outcome of the source policy; set in dispatch() before the response is dispatched.
        private Either<SourcePolicyFailureResult, SourcePolicySuccessResult> result;
        // Failure raised while sending an error response; when set, finish() reports a phase failure.
        private Throwable exception;

        private PhaseContext(FlowProcessTemplate template, MessageSource messageSource, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier, AbstractFlowConstruct flowConstruct, Consumer<Either<MessagingException, CoreEvent>> terminateConsumer, CompletableFuture<Void> responseCompletion, CoreEvent event, SourcePolicy sourcePolicy) {
            this.template = template;
            this.messageSource = messageSource;
            this.messageProcessContext = messageProcessContext;
            this.phaseResultNotifier = phaseResultNotifier;
            this.flowConstruct = flowConstruct;
            this.terminateConsumer = terminateConsumer;
            this.responseCompletion = responseCompletion;
            this.event = event;
            this.sourcePolicy = sourcePolicy;
        }
    }

    private class FlowProcessor
    implements Processor,
    Component {
        private final FlowProcessTemplate template;
        private final FlowConstruct flowConstruct;

        public FlowProcessor(FlowProcessTemplate template, FlowConstruct flowConstruct) {
            this.template = template;
            this.flowConstruct = flowConstruct;
        }

        @Override
        public CoreEvent process(CoreEvent event) throws MuleException {
            return MessageProcessors.processToApply(event, this);
        }

        @Override
        public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
            return MessageProcessors.applyWithChildContext((Publisher<CoreEvent>)Flux.from(publisher), this.template::routeEventAsync, Optional.empty());
        }

        @Override
        public Object getAnnotation(QName name) {
            return this.flowConstruct.getAnnotation(name);
        }

        @Override
        public Map<QName, Object> getAnnotations() {
            return this.flowConstruct.getAnnotations();
        }

        @Override
        public void setAnnotations(Map<QName, Object> annotations) {
            throw new UnsupportedOperationException();
        }

        @Override
        public ComponentLocation getLocation() {
            return this.flowConstruct.getLocation();
        }

        @Override
        public Location getRootContainerLocation() {
            return this.flowConstruct.getRootContainerLocation();
        }
    }
}

