/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.privileged.processor.chain;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.mule.runtime.api.artifact.Registry;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.context.notification.MuleContextListener;
import org.mule.runtime.core.api.context.notification.ServerNotificationHandler;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.FlowExceptionHandler;
import org.mule.runtime.core.api.execution.ExceptionContextProvider;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.api.util.StreamingUtils;
import org.mule.runtime.core.internal.context.DefaultMuleContext;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.interception.InterceptorManager;
import org.mule.runtime.core.internal.interception.ReactiveInterceptor;
import org.mule.runtime.core.internal.processor.chain.InterceptedReactiveProcessor;
import org.mule.runtime.core.internal.processor.interceptor.ProcessorInterceptorFactoryAdapter;
import org.mule.runtime.core.internal.processor.interceptor.ReactiveInterceptorAdapter;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.internal.util.MessagingExceptionResolver;
import org.mule.runtime.core.internal.util.rx.RxUtils;
import org.mule.runtime.core.privileged.component.AbstractExecutableComponent;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.exception.ErrorTypeLocator;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.ChainErrorHandlingUtils;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

abstract class AbstractMessageProcessorChain
extends AbstractExecutableComponent
implements MessageProcessorChain {
    /** Reactor context key under which {@code doApply} stores the parent of the subscribing thread's context class loader. */
    private static final String TCCL_REACTOR_CTX_KEY = "mule.context.tccl";
    /** Reactor context key under which {@code doApply} stores the subscribing thread's own context class loader. */
    private static final String TCCL_ORIGINAL_REACTOR_CTX_KEY = "mule.context.tccl_original";
    /** Reactor context key under which {@code doApply} stores the per-processor operator error hook. */
    private static final String REACTOR_ON_OPERATOR_ERROR_LOCAL = "reactor.onOperatorError.local";
    /** Logged and thrown when the continue-strategy error handler gets neither an event nor a {@link MessagingException}. */
    private static final String UNEXPECTED_ERROR_HANDLER_STATE_MESSAGE = "Unexpected state. Error handler should be invoked with either an Event instance or a MessagingException";
    /** Loaded reflectively in the static initializer; stays {@code null} when the class cannot be found. */
    private static Class<ClassLoader> appClClass;
    private static final Logger LOGGER;
    /** Sets the thread context class loader to the value stored under {@link #TCCL_REACTOR_CTX_KEY}, if any. */
    private static final Consumer<Context> TCCL_REACTOR_CTX_CONSUMER;
    /** Sets the thread context class loader to the value stored under {@link #TCCL_ORIGINAL_REACTOR_CTX_KEY}, if any. */
    private static final Consumer<Context> TCCL_ORIGINAL_REACTOR_CTX_CONSUMER;
    /** Chain name; used in {@link #toString()} and in the "is stopped" error message. */
    private final String name;
    private final List<Processor> processors;
    /** Optional error handler; when {@code null}, errors are reported straight to the event context. */
    private final FlowExceptionHandler messagingExceptionHandler;
    /** Optional processing strategy; {@code null} when none was supplied to the constructor. */
    private final ProcessingStrategy processingStrategy;
    /** Interceptors built from the registered interceptor factories; populated by {@link #initialise()}. */
    private final List<ReactiveInterceptorAdapter> additionalInterceptors = new LinkedList<ReactiveInterceptorAdapter>();
    /**
     * Whether events may currently be processed. Set to {@code true} by {@link #start()} and to {@code false} by
     * {@link #stop()} and by the context stop listener; checked for every event by the notification interceptor.
     * Declared {@code volatile} because the lifecycle callbacks that write it and the per-event callbacks that read it
     * are not guaranteed to run on the same thread.
     */
    private volatile boolean canProcessMessage = true;
    @Inject
    private ServerNotificationHandler serverNotificationHandler;
    @Inject
    private ErrorTypeLocator errorTypeLocator;
    @Inject
    private Collection<ExceptionContextProvider> exceptionContextProviders;
    @Inject
    private InterceptorManager processorInterceptorManager;
    @Inject
    private StreamingManager streamingManager;

    /**
     * Creates a chain over the given processors.
     *
     * @param name the chain name
     * @param processingStrategyOptional the processing strategy to apply to each processor, if any
     * @param processors the processors that make up the chain; the list is stored as given, not copied
     * @param messagingExceptionHandler the error handler for this chain, or {@code null} for none
     */
    AbstractMessageProcessorChain(String name,
                                  Optional<ProcessingStrategy> processingStrategyOptional,
                                  List<Processor> processors,
                                  FlowExceptionHandler messagingExceptionHandler) {
        this.name = name;
        this.processors = processors;
        this.messagingExceptionHandler = messagingExceptionHandler;
        // An absent strategy is represented internally as null.
        this.processingStrategy = processingStrategyOptional.isPresent() ? processingStrategyOptional.get() : null;
    }

    /**
     * Processes a single event by delegating to {@link MessageProcessors#processToApply} with this chain as the
     * processor.
     *
     * @param event the event to process
     * @return the event produced by the chain
     * @throws MuleException if processing fails
     */
    @Override
    public CoreEvent process(CoreEvent event) throws MuleException {
        final CoreEvent processed = MessageProcessors.processToApply(event, this);
        return processed;
    }

    /**
     * Builds the reactive pipeline for this chain on top of {@code publisher}.
     * <p>
     * When a messaging exception handler is configured, errors raised by the processors are sent to a router obtained
     * from that handler, and both the router's outcomes and the successfully processed events are pushed into a shared
     * sink as {@link Either} values. When no handler is configured, errors are reported directly on the event context.
     *
     * @param publisher the upstream events
     * @return a publisher of the events that leave the chain
     */
    @Override
    public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
        List<ReactiveInterceptor> interceptors = this.resolveInterceptors();
        if (this.messagingExceptionHandler != null) {
            // Sink that receives handled events, rethrown errors and successful results alike.
            FluxSinkRecorder errorSwitchSinkSinkRef = new FluxSinkRecorder();
            return Mono.subscriberContext().flatMapMany(ctx -> {
                // Incremented once for every error handed to the router; passed to propagateCompletion below.
                AtomicInteger inflightEvents = new AtomicInteger();
                // Router callbacks: re-apply the subscriber context to the handler's publisher, emit handled events as
                // Either.right and rethrown exceptions as Either.left.
                Consumer<Exception> errorRouter = this.messagingExceptionHandler.router(pub -> Flux.from((Publisher)pub).subscriberContext(ctx), handled -> errorSwitchSinkSinkRef.next(Either.right(handled)), rethrown -> errorSwitchSinkSinkRef.next(Either.left((MessagingException)rethrown, CoreEvent.class)));
                Flux upstream = Flux.from(this.doApply(publisher, interceptors, (context, throwable) -> {
                    inflightEvents.incrementAndGet();
                    errorRouter.accept((Exception)throwable);
                }));
                // NOTE(review): propagateCompletion is defined elsewhere; presumably it ties completion of the sink
                // flux to upstream completion once in-flight errors are drained - confirm against RxUtils.
                return Flux.from(RxUtils.propagateCompletion(upstream, errorSwitchSinkSinkRef.flux(), pub -> Flux.from((Publisher)pub).map(event -> {
                    // Successful results are routed through the same sink as error outcomes.
                    Either<MessagingException, CoreEvent> result = Either.right(MessagingException.class, event);
                    errorSwitchSinkSinkRef.next(result);
                    return result;
                }), inflightEvents, () -> {
                    // Completion callback: complete the sink and dispose the router.
                    errorSwitchSinkSinkRef.complete();
                    LifecycleUtils.disposeIfNeeded(errorRouter, LOGGER);
                }, t -> {
                    // Error callback: fail the sink and dispose the router.
                    errorSwitchSinkSinkRef.error((Throwable)t);
                    LifecycleUtils.disposeIfNeeded(errorRouter, LOGGER);
                })).map(RxUtils.propagateErrorResponseMapper());
            });
        }
        // No error handler: report every error on the event context.
        return this.doApply(publisher, interceptors, (context, throwable) -> context.error((Throwable)throwable));
    }

    /**
     * Applies the chain's processors to {@code publisher}, resolving the interceptors on every call.
     *
     * @param publisher the upstream events
     * @param errorBubbler callback that receives the event context and the resolved exception for each failure
     * @return the publisher with all processors applied
     * @deprecated no replacement is visible in this file; internal callers resolve the interceptors themselves and
     *             use the private three-argument overload.
     */
    @Deprecated
    public Publisher<CoreEvent> doApply(Publisher<CoreEvent> publisher, BiConsumer<BaseEventContext, ? super Exception> errorBubbler) {
        List<ReactiveInterceptor> resolvedInterceptors = resolveInterceptors();
        return doApply(publisher, resolvedInterceptors, errorBubbler);
    }

    /**
     * Chains every processor returned by {@link #getProcessorsToExecute()} onto {@code publisher}.
     *
     * @param publisher the upstream events
     * @param interceptors the interceptors wrapped around each processor
     * @param errorBubbler callback that receives the event context and the resolved exception for each failure
     * @return the publisher with all processors applied
     */
    private Publisher<CoreEvent> doApply(Publisher<CoreEvent> publisher, List<ReactiveInterceptor> interceptors, BiConsumer<BaseEventContext, ? super Exception> errorBubbler) {
        Flux stream = Flux.from(publisher);
        for (Processor processor : this.getProcessorsToExecute()) {
            // For each processor: apply it wrapped in the interceptors, store a processor-specific operator error hook
            // in the subscriber context, and continue past every error except LifecycleException, delegating the
            // failure to the continue-strategy handler.
            stream = stream.transform((Function)this.applyInterceptors(interceptors, processor)).subscriberContext(context -> context.put((Object)REACTOR_ON_OPERATOR_ERROR_LOCAL, ChainErrorHandlingUtils.getLocalOperatorErrorHook(processor, this.errorTypeLocator, this.exceptionContextProviders))).onErrorContinue(exception -> !(exception instanceof LifecycleException), this.getContinueStrategyErrorHandler(processor, errorBubbler));
        }
        stream = stream.subscriberContext(ctx -> {
            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
            // Leave the context untouched unless the current class loader has a parent and is an instance of the
            // reflectively loaded application class loader type.
            if (tccl == null || tccl.getParent() == null || appClClass == null || !appClClass.isAssignableFrom(tccl.getClass())) {
                return ctx;
            }
            // Record both the original class loader and its parent so the interceptors can switch between them.
            return ctx.put((Object)TCCL_ORIGINAL_REACTOR_CTX_KEY, (Object)tccl).put((Object)TCCL_REACTOR_CTX_KEY, (Object)tccl.getParent());
        });
        return stream;
    }

    /**
     * Builds the handler passed to {@code onErrorContinue} for one processor. The handler unwraps the failure,
     * resolves it to a {@link MessagingException} and hands it to {@link #notifyError}.
     *
     * @param processor the processor whose failures are handled
     * @param errorBubbler callback that finally receives the event context and the resolved exception
     * @return a consumer of the failure and the value that was being processed when it occurred
     */
    private BiConsumer<Throwable, Object> getContinueStrategyErrorHandler(Processor processor, BiConsumer<BaseEventContext, ? super Exception> errorBubbler) {
        // Only processors that are Components get a resolver; for any other processor it is null.
        MessagingExceptionResolver exceptionResolver = processor instanceof Component ? new MessagingExceptionResolver((Component)((Object)processor)) : null;
        // NOTE(review): the lambda dereferences exceptionResolver, which is null for non-Component processors;
        // presumably resolveMessagingException only invokes it for Components - confirm in ChainErrorHandlingUtils.
        Function<MessagingException, MessagingException> messagingExceptionMapper = ChainErrorHandlingUtils.resolveMessagingException(processor, e -> exceptionResolver.resolve((MessagingException)e, this.errorTypeLocator, this.exceptionContextProviders));
        return (throwable, object) -> {
            throwable = org.mule.runtime.core.api.rx.Exceptions.unwrap(throwable);
            // Without a value and without a MessagingException there is no event to report the error against.
            if (object == null && !(throwable instanceof MessagingException)) {
                LOGGER.error(UNEXPECTED_ERROR_HANDLER_STATE_MESSAGE, throwable);
                throw new IllegalStateException(UNEXPECTED_ERROR_HANDLER_STATE_MESSAGE);
            }
            if (object != null && !(object instanceof CoreEvent) && throwable instanceof MessagingException) {
                // The value is not an event: take the context from the event carried by the exception.
                this.notifyError(processor, (BaseEventContext)((MessagingException)throwable).getEvent().getContext(), (MessagingException)messagingExceptionMapper.apply((MessagingException)throwable), errorBubbler);
            } else {
                // NOTE(review): a non-null, non-CoreEvent value combined with a non-MessagingException failure
                // reaches this cast and would raise ClassCastException - confirm whether that combination can occur.
                CoreEvent event = (CoreEvent)object;
                if (throwable instanceof MessagingException) {
                    // Prefer the context of the value being processed; fall back to the exception's event.
                    this.notifyError(processor, (BaseEventContext)(event != null ? event.getContext() : ((MessagingException)throwable).getEvent().getContext()), (MessagingException)messagingExceptionMapper.apply((MessagingException)throwable), errorBubbler);
                } else {
                    // Any other failure is resolved into a MessagingException for the event first.
                    this.notifyError(processor, (BaseEventContext)event.getContext(), ChainErrorHandlingUtils.resolveException(processor, event, throwable, this.errorTypeLocator, this.exceptionContextProviders, exceptionResolver), errorBubbler);
                }
            }
        };
    }

    private void notifyError(Processor processor, BaseEventContext context, MessagingException resolvedException, BiConsumer<BaseEventContext, ? super Exception> errorBubbler) {
        this.errorNotification(processor).andThen((? super T t) -> errorBubbler.accept(context, (Exception)t)).accept(resolvedException);
    }

    /**
     * Wraps {@code processor} in the given interceptors, in list order, so each interceptor wraps the result of the
     * previous one.
     *
     * @param interceptorsToBeExecuted the interceptors to apply
     * @param processor the processor to wrap; also passed to every interceptor as the intercepted processor
     * @return the fully wrapped processor, or {@code processor} itself when the list is empty
     */
    private ReactiveProcessor applyInterceptors(List<ReactiveInterceptor> interceptorsToBeExecuted, Processor processor) {
        ReactiveProcessor wrapped = processor;
        Iterator<ReactiveInterceptor> remaining = interceptorsToBeExecuted.iterator();
        while (remaining.hasNext()) {
            wrapped = (ReactiveProcessor) remaining.next().apply(processor, wrapped);
        }
        return wrapped;
    }

    /**
     * Builds the list of interceptors wrapped around every processor of the chain. {@code applyInterceptors} applies
     * them in list order, so an interceptor added later wraps the ones added before it.
     *
     * @return a new list with the built-in interceptors and the additional ones created in {@link #initialise()}
     */
    private List<ReactiveInterceptor> resolveInterceptors() {
        ArrayList<ReactiveInterceptor> interceptors = new ArrayList<ReactiveInterceptor>();
        // 1) For each event, publish the Mule context and the event as "current", then run the wrapped processor
        //    between the two class loader consumers (context-stored parent loader before, original loader after).
        interceptors.add((processor, next) -> stream -> Flux.from((Publisher)stream).doOnNext(event -> {
            DefaultMuleContext.currentMuleContext.set(this.muleContext);
            PrivilegedEvent.setCurrentEvent((PrivilegedEvent)event);
        }).transform(this.doOnNextOrErrorWithContext(TCCL_REACTOR_CTX_CONSUMER).andThen(next).andThen(this.doOnNextOrErrorWithContext(TCCL_ORIGINAL_REACTOR_CTX_CONSUMER))));
        // 2) Let the processing strategy, when there is one, decorate the processor.
        if (this.processingStrategy != null) {
            interceptors.add((processor, next) -> this.processingStrategy.onProcessor(new InterceptedReactiveProcessor((ReactiveProcessor)processor, (ReactiveProcessor)next)));
        }
        // 3) Interceptors created from the registered interceptor factories.
        interceptors.addAll(this.additionalInterceptors);
        // 4) Lifecycle check, logging context and notifications around the processor.
        interceptors.add((processor, next) -> {
            // Location string of the processor, or null when it is not a Component or has no location.
            String processorPath = processor instanceof Component && ((Component)((Object)processor)).getLocation() != null ? ((Component)((Object)processor)).getLocation().getLocation() : null;
            return stream -> Flux.from((Publisher)stream).doOnNext(event -> {
                // Reject events once the chain has been stopped.
                if (!this.canProcessMessage) {
                    throw Exceptions.propagate((Throwable)new MessagingException((CoreEvent)event, (Throwable)new LifecycleException(CoreMessages.isStopped(this.name), (Object)event.getMessage())));
                }
                if (processorPath != null) {
                    MDC.put((String)"processorPath", (String)processorPath);
                }
                this.preNotification((CoreEvent)event, (Processor)processor);
            }).transform((Function)next).map(result -> {
                try {
                    this.postNotification((Processor)processor).accept((CoreEvent)result);
                    PrivilegedEvent.setCurrentEvent((PrivilegedEvent)result);
                    CoreEvent coreEvent = StreamingUtils.updateEventForStreaming(this.streamingManager).apply((CoreEvent)result);
                    return coreEvent;
                }
                finally {
                    // The MDC entry is removed only here, i.e. when a result reaches this map step.
                    if (processorPath != null) {
                        MDC.remove((String)"processorPath");
                    }
                }
            });
        });
        return interceptors;
    }

    /**
     * Registers a listener on the Mule context that, when the context stops, marks this chain as unable to process
     * events and then removes itself. Does nothing unless the context is a {@link DefaultMuleContext}.
     */
    private void registerStopListener() {
        if (!(this.muleContext instanceof DefaultMuleContext)) {
            return;
        }
        DefaultMuleContext defaultContext = (DefaultMuleContext) this.muleContext;
        defaultContext.addListener(new MuleContextListener() {

            @Override
            public void onCreation(MuleContext context) {
                // not of interest
            }

            @Override
            public void onInitialization(MuleContext context, Registry registry) {
                // not of interest
            }

            @Override
            public void onStart(MuleContext context, Registry registry) {
                // not of interest
            }

            @Override
            public void onStop(MuleContext context, Registry registry) {
                AbstractMessageProcessorChain.this.canProcessMessage = false;
                DefaultMuleContext owner = (DefaultMuleContext) AbstractMessageProcessorChain.this.muleContext;
                owner.removeListener(this);
            }
        });
    }

    /**
     * Creates an operator that passes the downstream subscriber's Reactor {@link Context} to {@code contextConsumer}
     * right before each {@code onNext} and {@code onError} signal is forwarded. Completion and subscription are
     * forwarded unchanged.
     *
     * @param contextConsumer the action to run with the current context before forwarding a value or an error
     * @return a function that lifts the described subscriber onto a publisher
     */
    private Function<? super Publisher<CoreEvent>, ? extends Publisher<CoreEvent>> doOnNextOrErrorWithContext(final Consumer<Context> contextConsumer) {
        return Operators.lift((scannable, subscriber) -> new CoreSubscriber<CoreEvent>(){

            public void onNext(CoreEvent event) {
                contextConsumer.accept(this.currentContext());
                subscriber.onNext((Object)event);
            }

            public void onError(Throwable throwable) {
                contextConsumer.accept(this.currentContext());
                subscriber.onError(throwable);
            }

            public void onComplete() {
                subscriber.onComplete();
            }

            // Expose the downstream subscriber's context as this subscriber's own.
            public Context currentContext() {
                return subscriber.currentContext();
            }

            public void onSubscribe(Subscription s) {
                subscriber.onSubscribe(s);
            }
        });
    }

    /**
     * Fires the notification with action code 1601 for {@code processor}, provided notifications are enabled on the
     * event.
     *
     * @param event the event about to be processed
     * @param processor the processor about to run
     */
    private void preNotification(CoreEvent event, Processor processor) {
        boolean notificationsEnabled = ((PrivilegedEvent) event).isNotificationsEnabled();
        if (!notificationsEnabled) {
            return;
        }
        // 1601: presumably the "pre invoke" action of MessageProcessorNotification - confirm.
        fireNotification(event, processor, null, 1601);
    }

    /**
     * Creates a consumer that fires the notification with action code 1602 for {@code processor}, provided
     * notifications are enabled on the consumed event.
     *
     * @param processor the processor that produced the event
     * @return the notifying consumer
     */
    private Consumer<CoreEvent> postNotification(Processor processor) {
        return processed -> {
            if (!((PrivilegedEvent) processed).isNotificationsEnabled()) {
                return;
            }
            // 1602: presumably the "post invoke" action of MessageProcessorNotification - confirm.
            fireNotification(processed, processor, null, 1602);
        };
    }

    /**
     * Creates a consumer that fires the notification with action code 1602, carrying the exception, when the consumed
     * exception is a {@link MessagingException} whose event has notifications enabled. Other exceptions are ignored.
     *
     * @param processor the processor that failed
     * @return the notifying consumer
     */
    private Consumer<Exception> errorNotification(Processor processor) {
        return failure -> {
            if (!(failure instanceof MessagingException)) {
                return;
            }
            MessagingException messagingFailure = (MessagingException) failure;
            if (((PrivilegedEvent) messagingFailure.getEvent()).isNotificationsEnabled()) {
                fireNotification(messagingFailure.getEvent(), processor, messagingFailure, 1602);
            }
        };
    }

    /**
     * Fires a {@link MessageProcessorNotification} through the injected notification handler. Nothing is fired when
     * there is no handler, the processor is not a {@link Component}, or the component has no location.
     *
     * @param event the event the notification refers to
     * @param processor the processor the notification refers to
     * @param exceptionThrown the failure to report, or {@code null}
     * @param action the notification action code
     */
    private void fireNotification(CoreEvent event, Processor processor, MessagingException exceptionThrown, int action) {
        if (this.serverNotificationHandler == null || !(processor instanceof Component)) {
            return;
        }
        Component component = (Component) processor;
        if (component.getLocation() == null) {
            return;
        }
        MessageProcessorNotification notification =
                MessageProcessorNotification.createFrom(event, component.getLocation(), component, exceptionThrown, action);
        this.serverNotificationHandler.fireNotification(notification);
    }

    /**
     * Returns the processors that {@code doApply} chains onto the event stream. This implementation returns the list
     * given to the constructor.
     *
     * @return the processors to execute
     */
    protected List<Processor> getProcessorsToExecute() {
        return processors;
    }

    /**
     * Renders the simple class name, the quoted chain name when it is not blank, and a bracketed list with one
     * processor per line, indented by two spaces and separated by commas.
     *
     * @return the textual description of this chain
     */
    @Override
    public String toString() {
        final String lineBreak = String.format("%n");
        final String indentedBreak = String.format("%n  ");
        StringBuilder description = new StringBuilder(this.getClass().getSimpleName());
        if (!org.mule.runtime.core.api.util.StringUtils.isBlank(this.name)) {
            description.append(" '").append(this.name).append("' ");
        }
        if (!this.processors.isEmpty()) {
            description.append(lineBreak).append("[ ");
            boolean first = true;
            for (Processor mp : this.processors) {
                if (!first) {
                    description.append(", ");
                }
                first = false;
                // Re-indent multi-line processor descriptions so nested chains line up.
                description.append(indentedBreak).append(StringUtils.replace(mp.toString(), lineBreak, indentedBreak));
            }
            description.append(lineBreak).append("]");
        }
        return description.toString();
    }

    /**
     * Returns the processors of this chain. The returned list is the one given to the constructor, not a copy.
     *
     * @return the chain's processors
     */
    @Override
    public List<Processor> getMessageProcessors() {
        return processors;
    }

    /**
     * Returns the processors that take part in the lifecycle phases applied by this chain (context propagation,
     * initialise, start, stop and dispose). This implementation returns the list given to the constructor.
     *
     * @return the processors to apply lifecycle to
     */
    protected List<Processor> getMessageProcessorsForLifecycle() {
        return processors;
    }

    /**
     * Stores the Mule context on this chain and passes it on to the lifecycle processors via
     * {@link LifecycleUtils#setMuleContextIfNeeded}.
     *
     * @param muleContext the context to set
     */
    @Override
    public void setMuleContext(MuleContext muleContext) {
        super.setMuleContext(muleContext);
        List<Processor> lifecycleProcessors = getMessageProcessorsForLifecycle();
        LifecycleUtils.setMuleContextIfNeeded(lifecycleProcessors, muleContext);
    }

    /**
     * Creates the additional interceptors from the interceptor factories known to the injected interceptor manager,
     * then initialises the lifecycle processors.
     *
     * @throws InitialisationException if a processor fails to initialise
     */
    @Override
    public void initialise() throws InitialisationException {
        additionalInterceptors.addAll(
                ReactiveInterceptorAdapter.createInterceptors(
                        processorInterceptorManager.getInterceptorFactories()
                                .stream()
                                .map(ProcessorInterceptorFactoryAdapter::new)
                                .collect(Collectors.toList()),
                        muleContext.getInjector()));
        LifecycleUtils.initialiseIfNeeded(getMessageProcessorsForLifecycle(), muleContext);
    }

    /**
     * Starts every lifecycle processor that is {@link Startable}, then registers the context stop listener and marks
     * the chain as able to process events.
     * <p>
     * If a processor fails to start, all lifecycle processors are stopped and the start failure is rethrown. A
     * failure raised while stopping is attached to the start failure as a suppressed exception rather than replacing
     * it, so the root cause is not lost.
     *
     * @throws MuleException if a processor fails to start
     */
    @Override
    public void start() throws MuleException {
        try {
            for (Processor processor : this.getMessageProcessorsForLifecycle()) {
                if (processor instanceof Startable) {
                    ((Startable) processor).start();
                }
            }
        } catch (MuleException startFailure) {
            try {
                LifecycleUtils.stopIfNeeded(this.getMessageProcessorsForLifecycle());
            } catch (MuleException | RuntimeException stopFailure) {
                // Keep the original cause primary; record the rollback problem alongside it.
                startFailure.addSuppressed(stopFailure);
            }
            throw startFailure;
        }
        this.registerStopListener();
        this.canProcessMessage = true;
    }

    /**
     * Marks the chain as unable to process events and then stops the lifecycle processors.
     *
     * @throws MuleException if a processor fails to stop
     */
    @Override
    public void stop() throws MuleException {
        // Flip the flag first so events are rejected while the processors are being stopped.
        canProcessMessage = false;
        List<Processor> lifecycleProcessors = getMessageProcessorsForLifecycle();
        LifecycleUtils.stopIfNeeded(lifecycleProcessors);
    }

    /**
     * Disposes the lifecycle processors through {@link LifecycleUtils#disposeIfNeeded}, passing this class's logger.
     */
    @Override
    public void dispose() {
        List<Processor> lifecycleProcessors = getMessageProcessorsForLifecycle();
        LifecycleUtils.disposeIfNeeded(lifecycleProcessors, LOGGER);
    }

    /**
     * Returns the error handler given to the constructor.
     *
     * @return the messaging exception handler, or {@code null} when the chain has none
     */
    FlowExceptionHandler getMessagingExceptionHandler() {
        return messagingExceptionHandler;
    }

    static {
        LOGGER = LoggerFactory.getLogger(AbstractMessageProcessorChain.class);
        TCCL_REACTOR_CTX_CONSUMER = context -> context.getOrEmpty((Object)TCCL_REACTOR_CTX_KEY).ifPresent(cl -> Thread.currentThread().setContextClassLoader((ClassLoader)cl));
        TCCL_ORIGINAL_REACTOR_CTX_CONSUMER = context -> context.getOrEmpty((Object)TCCL_ORIGINAL_REACTOR_CTX_KEY).ifPresent(cl -> Thread.currentThread().setContextClassLoader((ClassLoader)cl));
        try {
            appClClass = AbstractMessageProcessorChain.class.getClassLoader().loadClass("org.mule.runtime.deployment.model.api.application.ApplicationClassLoader");
        }
        catch (ClassNotFoundException e) {
            LOGGER.debug("ApplicationClassLoader interface not available in current context", (Throwable)e);
        }
    }
}

