/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.deployment.management.ComponentInitialStateManager;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.notification.EnrichedNotificationInfo;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.connector.ConnectException;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.exception.FlowExceptionHandler;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.construct.BackPressureStrategySelector;
import org.mule.runtime.core.internal.construct.FlowBackPressureException;
import org.mule.runtime.core.internal.context.MuleContextWithRegistry;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.processor.MessageProcessorBuilder;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChainBuilder;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public abstract class AbstractPipeline
extends AbstractFlowConstruct
implements Pipeline {
    private final NotificationDispatcher notificationFirer;
    private final MessageSource source;
    private final List<Processor> processors;
    private MessageProcessorChain pipeline;
    private final ProcessingStrategyFactory processingStrategyFactory;
    private final ProcessingStrategy processingStrategy;
    private volatile boolean canProcessMessage = false;
    private Sink sink;
    private final int maxConcurrency;
    private final ComponentInitialStateManager componentInitialStateManager;
    private final BackPressureStrategySelector backpressureStrategySelector;
    private final ErrorType FLOW_BACKPRESSURE_ERROR_TYPE;

    public AbstractPipeline(String name, MuleContext muleContext, MessageSource source, List<Processor> processors, Optional<FlowExceptionHandler> exceptionListener, Optional<ProcessingStrategyFactory> processingStrategyFactory, String initialState, Integer maxConcurrency, FlowConstructStatistics flowConstructStatistics, ComponentInitialStateManager componentInitialStateManager) {
        super(name, muleContext, exceptionListener, initialState, flowConstructStatistics);
        try {
            this.notificationFirer = ((MuleContextWithRegistry)muleContext).getRegistry().lookupObject(NotificationDispatcher.class);
        }
        catch (RegistrationException e) {
            throw new MuleRuntimeException(e);
        }
        this.source = source;
        this.componentInitialStateManager = componentInitialStateManager;
        this.processors = Collections.unmodifiableList(processors);
        this.maxConcurrency = maxConcurrency != null ? maxConcurrency : AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY;
        this.processingStrategyFactory = processingStrategyFactory.orElseGet(() -> this.defaultProcessingStrategy());
        if (this.processingStrategyFactory instanceof AsyncProcessingStrategyFactory) {
            ((AsyncProcessingStrategyFactory)this.processingStrategyFactory).setMaxConcurrency(this.maxConcurrency);
        } else if (maxConcurrency != null) {
            LOGGER.warn("{} does not support 'maxConcurrency'. Ignoring the value.", (Object)this.processingStrategyFactory.getClass().getSimpleName());
        }
        this.processingStrategy = this.processingStrategyFactory.create(muleContext, this.getName());
        this.backpressureStrategySelector = new BackPressureStrategySelector(this);
        this.FLOW_BACKPRESSURE_ERROR_TYPE = muleContext.getErrorTypeRepository().getErrorType(Errors.ComponentIdentifiers.Unhandleable.FLOW_BACK_PRESSURE).get();
    }

    protected MessageProcessorChain createPipeline() throws MuleException {
        DefaultMessageProcessorChainBuilder builder = new DefaultMessageProcessorChainBuilder();
        builder.setName("'" + this.getName() + "' processor chain");
        if (this.processingStrategy != null) {
            builder.setProcessingStrategy(this.processingStrategy);
        }
        this.configureMessageProcessors(builder);
        return builder.build();
    }

    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        return new DirectProcessingStrategyFactory();
    }

    private ProcessingStrategyFactory defaultProcessingStrategy() {
        ProcessingStrategyFactory defaultProcessingStrategyFactory = this.getMuleContext().getConfiguration().getDefaultProcessingStrategyFactory();
        if (defaultProcessingStrategyFactory == null) {
            return this.createDefaultProcessingStrategyFactory();
        }
        return defaultProcessingStrategyFactory;
    }

    @Override
    public List<Processor> getProcessors() {
        return this.processors;
    }

    @Override
    public MessageSource getSource() {
        return this.source;
    }

    protected MessageProcessorChain getPipeline() {
        return this.pipeline;
    }

    @Override
    public boolean isSynchronous() {
        return this.processingStrategy.isSynchronous();
    }

    @Override
    public ProcessingStrategy getProcessingStrategy() {
        return this.processingStrategy;
    }

    @Override
    protected void doInitialise() throws MuleException {
        super.doInitialise();
        this.pipeline = this.createPipeline();
        if (this.source != null) {
            this.source.setListener(new Processor(){

                @Override
                public CoreEvent process(CoreEvent event) throws MuleException {
                    return MessageProcessors.processToApply(event, this);
                }

                @Override
                public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
                    return Flux.from(publisher).transform((Function)AbstractPipeline.this.dispatchToFlow());
                }
            });
        }
        LifecycleUtils.initialiseIfNeeded(this.source, this.muleContext);
        LifecycleUtils.initialiseIfNeeded(this.pipeline, this.muleContext);
    }

    private ReactiveProcessor dispatchToFlow() {
        return publisher -> Flux.from((Publisher)publisher).doOnNext(this.assertStarted()).flatMap(this.routeThroughProcessingStrategy());
    }

    protected Function<CoreEvent, Publisher<? extends CoreEvent>> routeThroughProcessingStrategy() {
        return event -> {
            Publisher<CoreEvent> responsePublisher = ((BaseEventContext)event.getContext()).getResponsePublisher();
            try {
                if (this.getSource() == null || this.getSource().getBackPressureStrategy() == MessageSource.BackPressureStrategy.WAIT) {
                    this.sink.accept((CoreEvent)event);
                } else if (!this.sink.emit((CoreEvent)event)) {
                    this.notifyBackpressureException((CoreEvent)event, new FlowBackPressureException(this.getName()));
                }
            }
            catch (RejectedExecutionException e) {
                FlowBackPressureException wrappedException = new FlowBackPressureException(this.getName(), (Throwable)e);
                this.notifyBackpressureException((CoreEvent)event, wrappedException);
            }
            return Mono.from(responsePublisher);
        };
    }

    private void notifyBackpressureException(CoreEvent event, FlowBackPressureException wrappedException) {
        CoreEvent errorEvent = CoreEvent.builder(event).error(ErrorBuilder.builder(wrappedException).errorType(this.FLOW_BACKPRESSURE_ERROR_TYPE).build()).build();
        ((BaseEventContext)event.getContext()).error(new MessagingException(errorEvent, (Throwable)wrappedException));
    }

    protected ReactiveProcessor processFlowFunction() {
        return stream -> Flux.from((Publisher)stream).doOnNext(this.beforeProcessors()).transform((Function)this.processingStrategy.onPipeline(this.pipeline)).doOnNext(this.afterProcessors()).doOnError(throwable -> {
            if (this.isCompleteSignalRejectedExecutionException((Throwable)throwable)) {
                LOGGER.debug("Scheduler busy when propagating 'complete' signal due to graceful shutdown timeout being exceeded.", throwable);
            } else {
                LOGGER.error("Unhandled exception in Flow ", throwable);
            }
        });
    }

    boolean isCompleteSignalRejectedExecutionException(Throwable throwable) {
        if (throwable instanceof RejectedExecutionException) {
            for (StackTraceElement element : throwable.getStackTrace()) {
                if (!element.getMethodName().contains("onComplete") || !element.getClassName().startsWith("reactor.core.publisher.FluxPublishOn")) continue;
                return true;
            }
        }
        return false;
    }

    private Consumer<CoreEvent> beforeProcessors() {
        return event -> {
            if (this.getStatistics().isEnabled()) {
                this.getStatistics().incReceivedEvents();
            }
            this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(event, null, this), this.getName(), 1801));
            long startTime = System.currentTimeMillis();
            BaseEventContext baseEventContext = (BaseEventContext)event.getContext();
            baseEventContext.onComplete((response, throwable) -> {
                MessagingException messagingException = null;
                if (throwable != null) {
                    messagingException = throwable instanceof MessagingException ? (MessagingException)throwable : new MessagingException((CoreEvent)event, (Throwable)throwable, (Component)this);
                    response = messagingException.getEvent();
                }
                this.fireCompleteNotification((CoreEvent)response, messagingException);
                baseEventContext.getProcessingTime().ifPresent(time -> time.addFlowExecutionBranchTime(startTime));
            });
        };
    }

    private void fireCompleteNotification(CoreEvent event, MessagingException messagingException) {
        this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(event, messagingException, this), this.getName(), 1804));
    }

    private Consumer<CoreEvent> afterProcessors() {
        return response -> {
            this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(response, null, this), this.getName(), 1802));
            ((BaseEventContext)response.getContext()).success((CoreEvent)response);
        };
    }

    protected void configureMessageProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        for (Processor processor : this.getProcessors()) {
            if (processor instanceof Processor) {
                builder.chain(processor);
                continue;
            }
            if (processor instanceof MessageProcessorBuilder) {
                builder.chain((MessageProcessorBuilder)((Object)processor));
                continue;
            }
            throw new IllegalArgumentException("MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
        }
    }

    @Override
    protected void doStartProcessingStrategy() throws MuleException {
        super.doStartProcessingStrategy();
        this.startIfStartable(this.processingStrategy);
    }

    @Override
    protected void doStart() throws MuleException {
        super.doStart();
        this.sink = this.processingStrategy.createSink(this, this.processFlowFunction());
        try {
            this.startIfStartable(this.pipeline);
        }
        catch (Exception e) {
            this.stopOnFailure(e);
            return;
        }
        this.canProcessMessage = true;
        if (this.getMuleContext().isStarted()) {
            try {
                if (this.componentInitialStateManager.mustStartMessageSource(this.source)) {
                    this.startIfStartable(this.source);
                }
            }
            catch (ConnectException ce) {
                throw ce;
            }
            catch (Exception e) {
                this.stopOnFailure(e);
            }
        }
    }

    private void stopOnFailure(Exception e) throws MuleException {
        this.stopSafely(this::doStop);
        this.stopSafely(this::doStopProcessingStrategy);
        if (e instanceof MuleException) {
            throw (MuleException)e;
        }
        throw new DefaultMuleException(e);
    }

    private void stopSafely(CheckedRunnable task) {
        try {
            task.run();
        }
        catch (Exception e) {
            LOGGER.warn(String.format("Stopping pipeline '%s' due to error on starting, but another exception was also found while shutting down: %s", this.getName(), e.getMessage()), (Throwable)e);
        }
    }

    public Consumer<CoreEvent> assertStarted() {
        return event -> {
            if (!this.canProcessMessage) {
                throw Exceptions.propagate((Throwable)new MessagingException((CoreEvent)event, (Throwable)new LifecycleException(CoreMessages.isStopped(this.getName()), (Object)event.getMessage())));
            }
        };
    }

    @Override
    protected void doStop() throws MuleException {
        this.stopSafely(() -> this.stopIfStoppable(this.source));
        this.canProcessMessage = false;
        this.stopSafely(() -> this.disposeIfDisposable(this.sink));
        this.sink = null;
        this.stopIfStoppable(this.pipeline);
        super.doStop();
    }

    @Override
    protected void doStopProcessingStrategy() throws MuleException {
        this.stopIfStoppable(this.processingStrategy);
        super.doStopProcessingStrategy();
    }

    @Override
    protected void doDispose() {
        this.disposeIfDisposable(this.pipeline);
        this.disposeIfDisposable(this.source);
        super.doDispose();
    }

    protected Sink getSink() {
        return this.sink;
    }

    @Override
    public int getMaxConcurrency() {
        return this.maxConcurrency;
    }

    @Override
    public ProcessingStrategyFactory getProcessingStrategyFactory() {
        return this.processingStrategyFactory;
    }

    @Override
    public void checkBackpressure(CoreEvent event) throws RuntimeException {
        try {
            this.backpressureStrategySelector.check(event);
        }
        catch (FlowBackPressureException e) {
            throw Exceptions.propagate((Throwable)e);
        }
    }
}

