/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.meta.AbstractAnnotatedObject;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.InternalEventContext;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.connector.ConnectException;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.context.notification.EnrichedNotificationInfo;
import org.mule.runtime.core.api.context.notification.NotificationDispatcher;
import org.mule.runtime.core.api.context.notification.PipelineMessageNotification;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.api.processor.InternalMessageProcessor;
import org.mule.runtime.core.api.processor.MessageProcessorBuilder;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.registry.RegistrationException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.processor.IdempotentRedeliveryPolicy;
import org.mule.runtime.core.privileged.processor.chain.DefaultMessageProcessorChainBuilder;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Base class for flow constructs made of an optional {@link MessageSource} followed by a chain of
 * {@link Processor}s that is executed through a {@link ProcessingStrategy}.
 */
public abstract class AbstractPipeline
extends AbstractFlowConstruct
implements Pipeline {
    /** Message source feeding this pipeline; {@code doInitialise} only installs a listener when it is non-null. */
    private final MessageSource source;
    /** Unmodifiable view of the processor list supplied to the constructor. */
    private final List<Processor> processors;
    /** Processor chain built by {@link #createPipeline()} during initialisation. */
    private MessageProcessorChain pipeline;
    /** Strategy created in the constructor from the supplied or default factory. */
    private final ProcessingStrategy processingStrategy;
    /** Set to {@code true} by {@code doStart} and to {@code false} by {@code doStop}; read by {@link #assertStarted()}. */
    private volatile boolean canProcessMessage = false;
    /** Cache of event contexts keyed by string whose values are held weakly; exposed by {@link #getSerializationEventContextCache()}. */
    private final Cache<String, InternalEventContext> eventContextCache = CacheBuilder.newBuilder().weakValues().build();
    /** Sink created from the processing strategy on start; disposed and reset to {@code null} on stop. */
    private volatile Sink sink;
    /** Maximum concurrency given at construction; also forwarded to async processing strategy factories. */
    private final int maxConcurrency;

    /**
     * Creates the pipeline and resolves the processing strategy it will run with.
     *
     * @param name name of the flow construct
     * @param muleContext context handed to the superclass and to the processing strategy factory
     * @param source message source of the pipeline, may be {@code null}
     * @param processors processors of the pipeline; stored as an unmodifiable view
     * @param exceptionListener optional exception handler handed to the superclass
     * @param processingStrategyFactory optional factory; when absent the default factory is used
     * @param initialState initial lifecycle state handed to the superclass
     * @param maxConcurrency maximum concurrency, pushed to the factory when it is an
     *        {@link AsyncProcessingStrategyFactory}
     * @param flowConstructStatistics statistics holder handed to the superclass
     */
    public AbstractPipeline(String name, MuleContext muleContext, MessageSource source, List<Processor> processors, Optional<MessagingExceptionHandler> exceptionListener, Optional<ProcessingStrategyFactory> processingStrategyFactory, String initialState, int maxConcurrency, FlowConstructStatistics flowConstructStatistics) {
        super(name, muleContext, exceptionListener, initialState, flowConstructStatistics);
        this.source = source;
        this.processors = Collections.unmodifiableList(processors);
        this.maxConcurrency = maxConcurrency;

        ProcessingStrategyFactory strategyFactory = processingStrategyFactory.orElseGet(this::defaultProcessingStrategy);
        if (strategyFactory instanceof AsyncProcessingStrategyFactory) {
            AsyncProcessingStrategyFactory asyncFactory = (AsyncProcessingStrategyFactory) strategyFactory;
            asyncFactory.setMaxConcurrency(maxConcurrency);
        }
        this.processingStrategy = strategyFactory.create(muleContext, getName());
    }

    /**
     * Builds the processor chain of this pipeline: pre-processors, then the configured processors,
     * then post-processors.
     *
     * @return the chain produced by the builder
     * @throws MuleException if any of the configuration steps fails
     */
    protected MessageProcessorChain createPipeline() throws MuleException {
        DefaultMessageProcessorChainBuilder chainBuilder = new DefaultMessageProcessorChainBuilder();
        String chainName = "'" + getName() + "' processor chain";
        chainBuilder.setName(chainName);
        // Only hand the strategy to the builder when one was actually created.
        if (processingStrategy != null) {
            chainBuilder.setProcessingStrategy(processingStrategy);
        }
        configurePreProcessors(chainBuilder);
        configureMessageProcessors(chainBuilder);
        configurePostProcessors(chainBuilder);
        return chainBuilder.build();
    }

    /**
     * Supplies the factory used when neither the constructor argument nor the Mule configuration
     * provides one.
     *
     * @return a new {@link DirectProcessingStrategyFactory}
     */
    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        ProcessingStrategyFactory fallbackFactory = new DirectProcessingStrategyFactory();
        return fallbackFactory;
    }

    /**
     * Resolves the default factory: the one from the Mule configuration when it is set, otherwise
     * the one from {@link #createDefaultProcessingStrategyFactory()}.
     *
     * @return the factory to use when none was passed to the constructor
     */
    private ProcessingStrategyFactory defaultProcessingStrategy() {
        ProcessingStrategyFactory configured = getMuleContext().getConfiguration().getDefaultProcessingStrategyFactory();
        return configured != null ? configured : createDefaultProcessingStrategyFactory();
    }

    /**
     * Adds the processors that run before the configured ones; here a single
     * {@code ProcessorStartCompleteProcessor}.
     *
     * @param builder chain builder to add to
     * @throws MuleException declared for overriding implementations
     */
    protected void configurePreProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        ProcessorStartCompleteProcessor startCompleteProcessor = new ProcessorStartCompleteProcessor();
        builder.chain(startCompleteProcessor);
    }

    /**
     * Adds the processors that run after the configured ones; here a single
     * {@code ProcessEndProcessor}.
     *
     * @param builder chain builder to add to
     * @throws MuleException declared for overriding implementations
     */
    protected void configurePostProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        ProcessEndProcessor endProcessor = new ProcessEndProcessor();
        builder.chain(endProcessor);
    }

    /**
     * @return the unmodifiable processor list this pipeline was constructed with
     */
    @Override
    public List<Processor> getProcessors() {
        return processors;
    }

    /**
     * @return the message source passed to the constructor, possibly {@code null}
     */
    @Override
    public MessageSource getSource() {
        return source;
    }

    /**
     * @return the processor chain; it is assigned in {@code doInitialise}, so it is {@code null}
     *         before initialisation
     */
    protected MessageProcessorChain getPipeline() {
        return pipeline;
    }

    /**
     * @return whatever the processing strategy reports from {@code isSynchronous()}
     */
    @Override
    public boolean isSynchronous() {
        return processingStrategy.isSynchronous();
    }

    /**
     * @return the processing strategy created in the constructor
     */
    @Override
    public ProcessingStrategy getProcessingStrategy() {
        return processingStrategy;
    }

    /**
     * Builds the processor chain, installs a listener on the message source (when there is one) and
     * then injects the Mule context into, and initialises, the source and the chain.
     *
     * @throws MuleException if building the chain or initialising a component fails
     */
    @Override
    protected void doInitialise() throws MuleException {
        super.doInitialise();
        this.pipeline = this.createPipeline();
        if (this.source != null) {
            // The listener is the entry point the source uses to push events into this pipeline.
            this.source.setListener(new Processor(){

                // Blocking entry point: delegates to the reactive apply() of this same listener.
                @Override
                public InternalEvent process(InternalEvent event) throws MuleException {
                    return MessageProcessors.processToApply(event, this);
                }

                // Reactive entry point: checks the pipeline is started, hands each event to the sink,
                // and then maps it to the response publisher of its event context.
                @Override
                public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
                    return Flux.from(publisher).doOnNext(AbstractPipeline.this.assertStarted()).handle((event, handleSink) -> {
                        try {
                            AbstractPipeline.this.getSink().accept((InternalEvent)event);
                            handleSink.next(event);
                        }
                        catch (RejectedExecutionException ree) {
                            // The sink refused the event: report the failure on the event context
                            // instead of emitting the event downstream.
                            event.getContext().error(ExceptionUtils.updateMessagingExceptionWithError(new MessagingException((InternalEvent)event, ree, (Processor)this), this, AbstractPipeline.this.getMuleContext()));
                        }
                    }).flatMap(event -> Mono.from(event.getContext().getResponsePublisher()));
                }
            });
        }
        // NOTE(review): these helpers are called even when source is null; presumably they
        // tolerate null arguments - confirm in AbstractFlowConstruct.
        this.injectFlowConstructMuleContext(this.source);
        this.injectFlowConstructMuleContext(this.pipeline);
        this.initialiseIfInitialisable(this.source);
        this.initialiseIfInitialisable(this.pipeline);
    }

    /**
     * Creates the function the sink applies to the stream of incoming events.
     *
     * <p>The stream is transformed by the processing strategy's pipeline function; every resulting
     * event is reported as a success on its own event context, and any error reaching this point is
     * logged.
     *
     * <p>The chain is written without raw-type casts so that the element type stays
     * {@code InternalEvent} and the lambdas type-check.
     *
     * @return the reactive function executing this pipeline
     */
    protected ReactiveProcessor processFlowFunction() {
        return stream -> Flux.from(stream)
            .transform(processingStrategy.onPipeline(pipeline))
            .doOnNext(response -> response.getContext().success(response))
            .doOnError(throwable -> LOGGER.error("Unhandled exception in Flow ", throwable));
    }

    /**
     * Adds every configured processor to the chain builder.
     *
     * <p>Elements are inspected as {@code Object} on purpose. With a loop variable typed as
     * {@link Processor} the compiler casts each element before the checks below run, which makes
     * the {@link MessageProcessorBuilder} branch unreachable and turns any non-processor element
     * (possible when the list was populated through raw or unchecked code) into a
     * {@code ClassCastException} instead of the error reported here.
     *
     * @param builder chain builder to add to
     * @throws MuleException declared for overriding implementations
     * @throws IllegalArgumentException if an element is {@code null} or is neither a
     *         {@link Processor} nor a {@link MessageProcessorBuilder}
     */
    protected void configureMessageProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        for (Object processor : getProcessors()) {
            if (processor instanceof Processor) {
                builder.chain((Processor) processor);
            } else if (processor instanceof MessageProcessorBuilder) {
                builder.chain((MessageProcessorBuilder) processor);
            } else {
                throw new IllegalArgumentException("MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
            }
        }
    }

    /**
     * Tells whether the first configured processor is an {@link IdempotentRedeliveryPolicy}.
     *
     * @return {@code false} when there are no processors or the first one is of another type
     */
    protected boolean isRedeliveryPolicyConfigured() {
        return !getProcessors().isEmpty() && getProcessors().get(0) instanceof IdempotentRedeliveryPolicy;
    }

    /**
     * Starts the processing strategy, creates the sink, starts the processor chain and, when the
     * Mule context is already started, starts the message source.
     *
     * <p>If starting the source fails with anything other than a {@link ConnectException}, the
     * pipeline is stopped again and the original failure is rethrown. A failure raised by that
     * rollback is attached to the original exception as suppressed, so it cannot mask the reason
     * the start failed.
     *
     * @throws MuleException if any component fails to start
     */
    @Override
    protected void doStart() throws MuleException {
        super.doStart();
        this.startIfStartable(this.processingStrategy);
        this.sink = this.processingStrategy.createSink(this, this.processFlowFunction());
        this.startIfStartable(this.pipeline);
        // From here on events are accepted, see assertStarted().
        this.canProcessMessage = true;
        if (this.getMuleContext().isStarted()) {
            try {
                this.startIfStartable(this.source);
            }
            catch (ConnectException ce) {
                // Rethrown untouched, without stopping the pipeline.
                // NOTE(review): presumably so reconnection handling further up can react - confirm.
                throw ce;
            }
            catch (MuleException e) {
                try {
                    this.doStop();
                }
                catch (MuleException | RuntimeException stopFailure) {
                    // Keep the start failure as the primary exception.
                    e.addSuppressed(stopFailure);
                }
                throw e;
            }
        }
    }

    /**
     * Creates a guard that rejects events while the pipeline is not able to process messages.
     *
     * @return a consumer that does nothing when the pipeline is started and otherwise throws, via
     *         {@code Exceptions.propagate}, a {@link MessagingException} wrapping a
     *         {@link LifecycleException}
     */
    public Consumer<InternalEvent> assertStarted() {
        return event -> {
            if (canProcessMessage) {
                return;
            }
            LifecycleException stopped = new LifecycleException(CoreMessages.isStopped(getName()), (Object)event.getMessage());
            throw Exceptions.propagate((Throwable)new MessagingException((InternalEvent)event, (Throwable)stopped));
        };
    }

    /**
     * Stops the pipeline: the message source first, then the sink, the processing strategy and the
     * processor chain, and finally the superclass.
     *
     * @throws MuleException if a component fails to stop; the remaining steps are then skipped
     */
    @Override
    protected void doStop() throws MuleException {
        try {
            this.stopIfStoppable(this.source);
        }
        finally {
            // Cleared even when stopping the source throws, so assertStarted() rejects new events.
            this.canProcessMessage = false;
        }
        // The sink is recreated by doStart(), so it is disposed and dropped here.
        this.disposeIfDisposable(this.sink);
        this.sink = null;
        this.stopIfStoppable(this.processingStrategy);
        this.stopIfStoppable(this.pipeline);
        super.doStop();
    }

    /**
     * Disposes the processor chain and then the message source before delegating to the
     * superclass.
     */
    @Override
    protected void doDispose() {
        disposeIfDisposable(pipeline);
        disposeIfDisposable(source);
        super.doDispose();
    }

    /**
     * @return the current sink; {@code doStop} resets it to {@code null} and {@code doStart}
     *         creates a new one
     */
    protected Sink getSink() {
        return sink;
    }

    /**
     * @return the event context cache exposed through {@code Cache.asMap()}
     */
    @Override
    public Map<String, InternalEventContext> getSerializationEventContextCache() {
        return eventContextCache.asMap();
    }

    /**
     * @return the maximum concurrency passed to the constructor
     */
    @Override
    public int getMaxConcurrency() {
        return maxConcurrency;
    }

    /**
     * Processor placed at the head of the chain by {@code configurePreProcessors}. It dispatches a
     * pipeline notification when an event enters the chain and subscribes to the event context's
     * before-response publisher to dispatch a second notification when processing finishes.
     */
    private class ProcessorStartCompleteProcessor
    implements Processor,
    InternalMessageProcessor {
        private ProcessorStartCompleteProcessor() {
        }

        /**
         * Dispatches the start notification and registers the completion callbacks.
         *
         * @param event event entering the pipeline
         * @return the same event, unchanged
         * @throws MuleException if the notification dispatcher cannot be looked up
         */
        @Override
        public InternalEvent process(InternalEvent event) throws MuleException {
            // NOTE(review): 1801 is presumably the "process start" action of
            // PipelineMessageNotification - confirm against its constants.
            AbstractPipeline.this.getMuleContext().getRegistry().lookupObject(NotificationDispatcher.class).dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(event, null, AbstractPipeline.this), AbstractPipeline.this, 1801));
            long startTime = System.currentTimeMillis();
            Mono.from(event.getContext().getBeforeResponsePublisher())
                .doOnSuccess(result -> this.fireCompleteNotification((InternalEvent)result, null))
                .doOnError(MessagingException.class, messagingException -> this.fireCompleteNotification(null, (MessagingException)messagingException))
                // Any other failure is wrapped so the notification always carries a
                // MessagingException. This class implements Processor, so "this" is passed
                // directly as the failing processor.
                .doOnError(throwable -> !(throwable instanceof MessagingException), throwable -> this.fireCompleteNotification(null, new MessagingException(event, (Throwable)throwable, (Processor)this)))
                .doOnTerminate((result, throwable) -> event.getContext().getProcessingTime().ifPresent(time -> time.addFlowExecutionBranchTime(startTime)))
                .subscribe(Operators.requestUnbounded());
            return event;
        }

        /**
         * Dispatches the completion notification for either a result event or a failure.
         *
         * @param event resulting event, or {@code null} when processing failed
         * @param messagingException failure, or {@code null} when processing succeeded
         * @throws MuleRuntimeException if the notification dispatcher lookup fails with a
         *         {@link RegistrationException}
         */
        private void fireCompleteNotification(InternalEvent event, MessagingException messagingException) {
            try {
                // NOTE(review): 1804 is presumably the "process complete" action - confirm.
                AbstractPipeline.this.getMuleContext().getRegistry().lookupObject(NotificationDispatcher.class).dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(event, messagingException, AbstractPipeline.this), AbstractPipeline.this, 1804));
            }
            catch (RegistrationException e) {
                throw new MuleRuntimeException(e);
            }
        }
    }

    /**
     * Processor placed at the tail of the chain by {@code configurePostProcessors}; it dispatches a
     * pipeline notification with action code 1802 and passes the event through.
     */
    private class ProcessEndProcessor
    extends AbstractAnnotatedObject
    implements Processor,
    InternalMessageProcessor {
        private ProcessEndProcessor() {
        }

        /**
         * Dispatches the end notification for the given event.
         *
         * @param event event leaving the configured processors
         * @return the same event, unchanged
         * @throws MuleException if the notification dispatcher cannot be looked up
         */
        @Override
        public InternalEvent process(InternalEvent event) throws MuleException {
            NotificationDispatcher dispatcher = AbstractPipeline.this.getMuleContext().getRegistry().lookupObject(NotificationDispatcher.class);
            PipelineMessageNotification endNotification = new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(event, null, AbstractPipeline.this), AbstractPipeline.this, 1802);
            dispatcher.dispatch(endNotification);
            return event;
        }
    }
}

