/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.mule.runtime.api.component.execution.ComponentExecutionException;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.meta.AnnotatedObject;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.connector.ReplyToHandler;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.exception.AbstractExceptionListener;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.MessagingExceptionResolver;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.construct.AbstractPipeline;
import org.mule.runtime.core.internal.construct.processor.FlowConstructStatisticsMessageProcessor;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareWorkQueueProcessingStrategyFactory;
import org.mule.runtime.core.internal.routing.requestreply.SimpleAsyncRequestReplyRequester;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link Flow.Builder} implementation that gathers the configuration of a flow and creates a
 * {@link DefaultFlow} from it.
 * <p>
 * A builder instance produces a single flow: once {@link #build()} has returned, every
 * configuration method, and {@link #build()} itself, throws {@link IllegalStateException}.
 */
public class DefaultFlowBuilder implements Flow.Builder {

    private final String name;
    private final MuleContext muleContext;
    private MessageSource source;
    private List<Processor> processors = Collections.emptyList();
    private MessagingExceptionHandler exceptionListener;
    private ProcessingStrategyFactory processingStrategyFactory;
    private String initialState = "started";
    private int maxConcurrency = AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY;

    /** Flow created by {@link #build()}; a non-null value marks this builder as already used. */
    private DefaultFlow flow;

    /**
     * Creates a new builder.
     *
     * @param name name of the flow to build; checked with {@code Preconditions.checkArgument} to be non-empty.
     * @param muleContext context passed on to the created flow; checked to be non-null.
     */
    public DefaultFlowBuilder(String name, MuleContext muleContext) {
        Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name cannot be empty");
        Preconditions.checkArgument(muleContext != null, "muleContext cannot be null");
        this.name = name;
        this.muleContext = muleContext;
    }

    /**
     * Sets the message source handed to the flow.
     *
     * @param source the message source; checked to be non-null.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder source(MessageSource source) {
        checkImmutable();
        Preconditions.checkArgument(source != null, "source cannot be null");
        this.source = source;
        return this;
    }

    /**
     * Sets the processors handed to the flow. The list is kept by reference, it is not copied.
     *
     * @param processors the processors; checked to be non-null.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder processors(List<Processor> processors) {
        checkImmutable();
        Preconditions.checkArgument(processors != null, "processors cannot be null");
        this.processors = processors;
        return this;
    }

    /**
     * Sets the processors handed to the flow from an array, exposed as a fixed-size list view
     * through {@link Arrays#asList(Object[])}.
     *
     * @param processors the processors; checked to be non-null, like the {@link List} overload.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder processors(Processor... processors) {
        checkImmutable();
        // Validate explicitly so a null array is reported the same way as a null list, instead
        // of surfacing as a bare NullPointerException from Arrays.asList.
        Preconditions.checkArgument(processors != null, "processors cannot be null");
        this.processors = Arrays.asList(processors);
        return this;
    }

    /**
     * Sets the exception handler handed to the flow.
     *
     * @param exceptionListener the handler; {@code null} is accepted and is passed to the flow as
     *        an empty {@link Optional}.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder messagingExceptionHandler(MessagingExceptionHandler exceptionListener) {
        checkImmutable();
        this.exceptionListener = exceptionListener;
        return this;
    }

    /**
     * Sets the processing strategy factory handed to the flow.
     *
     * @param processingStrategyFactory the factory; checked to be non-null.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder processingStrategyFactory(ProcessingStrategyFactory processingStrategyFactory) {
        checkImmutable();
        Preconditions.checkArgument(processingStrategyFactory != null, "processingStrategyFactory cannot be null");
        this.processingStrategyFactory = processingStrategyFactory;
        return this;
    }

    /**
     * Sets the initial state handed to the flow. Defaults to {@code "started"} when never called.
     *
     * @param initialState the initial state; checked to be non-null.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder initialState(String initialState) {
        checkImmutable();
        Preconditions.checkArgument(initialState != null, "initialState cannot be null");
        this.initialState = initialState;
        return this;
    }

    /**
     * Sets the maximum concurrency handed to the flow. Defaults to
     * {@code AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY} when never called.
     *
     * @param maxConcurrency the maximum concurrency; checked to be greater than zero.
     * @return this builder.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow.Builder maxConcurrency(int maxConcurrency) {
        checkImmutable();
        Preconditions.checkArgument(maxConcurrency > 0, "maxConcurrency cannot be less than 1");
        this.maxConcurrency = maxConcurrency;
        return this;
    }

    /**
     * Creates the flow from the values configured so far.
     * <p>
     * The statistics object created for the flow is also set on the exception handler when that
     * handler is an {@link AbstractExceptionListener}.
     *
     * @return the newly created flow.
     * @throws IllegalStateException if the flow was already built.
     */
    @Override
    public Flow build() {
        checkImmutable();
        FlowConstructStatistics flowStatistics = AbstractFlowConstruct.createFlowStatistics(name, muleContext);
        if (exceptionListener instanceof AbstractExceptionListener) {
            ((AbstractExceptionListener) exceptionListener).setStatistics(flowStatistics);
        }
        flow = new DefaultFlow(name, muleContext, source, processors, Optional.ofNullable(exceptionListener),
                               Optional.ofNullable(processingStrategyFactory), initialState, maxConcurrency,
                               flowStatistics);
        return flow;
    }

    /**
     * Guards every mutating operation of the builder.
     *
     * @throws IllegalStateException if {@link #build()} has already created the flow.
     */
    protected final void checkImmutable() {
        if (flow != null) {
            throw new IllegalStateException("Cannot change attributes once the flow was built");
        }
    }

    /**
     * {@link Flow} implementation created by {@link DefaultFlowBuilder}, built on top of
     * {@link AbstractPipeline}.
     */
    public static class DefaultFlow extends AbstractPipeline implements Flow {

        private final MessagingExceptionResolver exceptionResolver = new MessagingExceptionResolver(this);

        /**
         * Passes every argument straight through to the {@link AbstractPipeline} constructor.
         */
        protected DefaultFlow(String name, MuleContext muleContext, MessageSource source, List<Processor> processors,
                              Optional<MessagingExceptionHandler> exceptionListener,
                              Optional<ProcessingStrategyFactory> processingStrategyFactory, String initialState,
                              int maxConcurrency, FlowConstructStatistics flowConstructStatistics) {
            super(name, muleContext, source, processors, exceptionListener, processingStrategyFactory, initialState,
                  maxConcurrency, flowConstructStatistics);
        }

        /**
         * Processes a single event by delegating to {@code MessageProcessors.processToApply}
         * with this flow as the processor.
         *
         * @param event the event to process.
         * @return the event returned by {@code MessageProcessors.processToApply}.
         * @throws MuleException as declared by the overridden method.
         */
        @Override
        public InternalEvent process(InternalEvent event) throws MuleException {
            return MessageProcessors.processToApply(event, this);
        }

        /**
         * Routes every event of the given publisher through this flow.
         * <p>
         * Each event is first passed to the {@code assertStarted()} consumer, then re-created for
         * this flow and handed to the sink. The outcome is read from the response publisher of the
         * request's context and converted back into an event for the calling flow construct.
         * <p>
         * If the sink rejects the request with a {@link RejectedExecutionException}, the rejection
         * is wrapped in a {@link MessagingException}, resolved through the exception resolver and
         * signalled as an error on the request's context.
         *
         * @param publisher the incoming events.
         * @return a publisher of the resulting events.
         */
        @Override
        public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
            return Flux.from(publisher)
                .doOnNext(assertStarted())
                .flatMap(event -> {
                    InternalEvent request =
                        createMuleEventForCurrentFlow(event, event.getReplyToDestination(), event.getReplyToHandler());
                    try {
                        getSink().accept(request);
                    } catch (RejectedExecutionException ree) {
                        MessagingException rejection = new MessagingException(event, ree, (AnnotatedObject) this);
                        request.getContext().error(exceptionResolver.resolve(rejection, getMuleContext()));
                    }
                    return Mono.from(request.getContext().getResponsePublisher())
                        .map(response -> createReturnEventForParentFlowConstruct(response, event))
                        .onErrorMap(MessagingException.class, me -> {
                            // Same exception instance is propagated; only its processed event is
                            // replaced by one rebuilt for the calling flow construct.
                            me.setProcessedEvent(createReturnEventForParentFlowConstruct(me.getEvent(), event));
                            return me;
                        });
                });
        }

        /**
         * Runs the event through this flow and exposes the outcome as a future.
         * <p>
         * A {@link MessagingException} raised by the flow is handed to the exception listener. A
         * {@link MessagingException} that is still signalled afterwards is converted into a
         * {@link ComponentExecutionException} carrying the failed event. Errors of any other type
         * are propagated unchanged instead of being cast blindly.
         *
         * @param event the event to execute.
         * @return a future completed with the resulting event, or completed exceptionally.
         */
        @Override
        protected CompletableFuture<Event> executeEvent(InternalEvent event) {
            return Mono.just(event)
                .transform(this)
                .onErrorResume(MessagingException.class, me -> Mono.from(getExceptionListener().apply(me)))
                .onErrorMap(MessagingException.class, me -> {
                    InternalEvent failedEvent = me.getEvent();
                    // Fall back to the messaging exception itself when the event carries no
                    // error (or the error has no cause) rather than failing inside the mapper.
                    Throwable cause = failedEvent.getError().map(Error::getCause).orElse(me);
                    return new ComponentExecutionException(cause, failedEvent);
                })
                .<Event>map(result -> result)
                .toFuture();
        }

        /**
         * Creates a copy of the event bound to this flow and registers it as the current event.
         *
         * @param event the incoming event.
         * @param replyToDestination reply-to destination to set on the new event.
         * @param replyToHandler ignored; the new event is always built with a {@code null} handler.
         * @return the event to be processed by this flow.
         */
        private InternalEvent createMuleEventForCurrentFlow(InternalEvent event, Object replyToDestination,
                                                            ReplyToHandler replyToHandler) {
            // The handler received from the caller is deliberately discarded.
            // NOTE(review): presumably so that only the originating flow sends the reply - confirm.
            replyToHandler = null;
            event = InternalEvent.builder(event)
                .flow(this)
                .replyToHandler(replyToHandler)
                .replyToDestination(replyToDestination)
                .build();
            resetRequestContextEvent(event);
            return event;
        }

        /**
         * Rebuilds a result event with the flow construct, reply-to handler and reply-to
         * destination of the original event, keeping the result's error (if any), and registers
         * it as the current event.
         *
         * @param result the event produced by this flow; may be {@code null}, in which case
         *        {@code null} is registered as the current event and returned.
         * @param original the event originally received by this flow.
         * @return the rebuilt event, or {@code null} if {@code result} was {@code null}.
         */
        private InternalEvent createReturnEventForParentFlowConstruct(InternalEvent result, InternalEvent original) {
            if (result != null) {
                Optional<Error> errorOptional = result.getError();
                result = InternalEvent.builder(result)
                    .flow(original.getFlowConstruct())
                    .replyToHandler(original.getReplyToHandler())
                    .replyToDestination(original.getReplyToDestination())
                    .error(errorOptional.orElse(null))
                    .build();
            }
            resetRequestContextEvent(result);
            return result;
        }

        /** Registers the given event as the current event through {@code InternalEvent.setCurrentEvent}. */
        private void resetRequestContextEvent(InternalEvent event) {
            InternalEvent.setCurrentEvent(event);
        }

        /**
         * Adds the flow statistics processor after the pre-processors configured by the superclass.
         */
        @Override
        protected void configurePreProcessors(MessageProcessorChainBuilder builder) throws MuleException {
            super.configurePreProcessors(builder);
            builder.chain(new FlowConstructStatisticsMessageProcessor(getStatistics()));
        }

        /**
         * Adds the async reply-to replier before the post-processors configured by the superclass.
         */
        @Override
        protected void configurePostProcessors(MessageProcessorChainBuilder builder) throws MuleException {
            builder.chain(new SimpleAsyncRequestReplyRequester.AsyncReplyToPropertyRequestReplyReplier(getSource()));
            super.configurePostProcessors(builder);
        }

        /**
         * @return a new {@link TransactionAwareWorkQueueProcessingStrategyFactory}; presumably used
         *         by the superclass when no factory was configured - confirm against AbstractPipeline.
         */
        @Override
        protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
            return new TransactionAwareWorkQueueProcessingStrategyFactory();
        }

        /**
         * @return the construct type name, {@code "Flow"}.
         */
        @Override
        public String getConstructType() {
            return "Flow";
        }

        /**
         * @return {@code true} when no processing strategy is available, otherwise whatever the
         *         processing strategy reports.
         */
        @Override
        public boolean isSynchronous() {
            return getProcessingStrategy() == null || getProcessingStrategy().isSynchronous();
        }
    }
}

