/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.connector.ReplyToHandler;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.interceptor.ProcessingTimeInterceptor;
import org.mule.runtime.core.internal.construct.AbstractPipeline;
import org.mule.runtime.core.internal.construct.processor.FlowConstructStatisticsMessageProcessor;
import org.mule.runtime.core.processor.strategy.TransactionAwareWorkQueueProcessingStrategyFactory;
import org.mule.runtime.core.routing.requestreply.AsyncReplyToPropertyRequestReplyReplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultFlowBuilder
implements Flow.Builder {
    private final String name;
    private final MuleContext muleContext;
    private MessageSource source;
    private List<Processor> processors = Collections.emptyList();
    private MessagingExceptionHandler exceptionListener;
    private ProcessingStrategyFactory processingStrategyFactory;
    private String initialState = "started";
    private int maxConcurrency = Integer.MAX_VALUE;
    private DefaultFlow flow;

    public DefaultFlowBuilder(String name, MuleContext muleContext) {
        Preconditions.checkArgument(StringUtils.isNotEmpty((CharSequence)name), "name cannot be empty");
        Preconditions.checkArgument(muleContext != null, "muleContext cannot be null");
        this.name = name;
        this.muleContext = muleContext;
    }

    @Override
    public Flow.Builder source(MessageSource source) {
        this.checkImmutable();
        Preconditions.checkArgument(source != null, "source cannot be null");
        this.source = source;
        return this;
    }

    @Override
    public Flow.Builder processors(List<Processor> processors) {
        this.checkImmutable();
        Preconditions.checkArgument(processors != null, "processors cannot be null");
        this.processors = processors;
        return this;
    }

    @Override
    public Flow.Builder processors(Processor ... processors) {
        this.checkImmutable();
        this.processors = Arrays.asList(processors);
        return this;
    }

    @Override
    public Flow.Builder messagingExceptionHandler(MessagingExceptionHandler exceptionListener) {
        this.checkImmutable();
        this.exceptionListener = exceptionListener;
        return this;
    }

    @Override
    public Flow.Builder processingStrategyFactory(ProcessingStrategyFactory processingStrategyFactory) {
        this.checkImmutable();
        Preconditions.checkArgument(processingStrategyFactory != null, "processingStrategyFactory cannot be null");
        this.processingStrategyFactory = processingStrategyFactory;
        return this;
    }

    @Override
    public Flow.Builder initialState(String initialState) {
        this.checkImmutable();
        Preconditions.checkArgument(initialState != null, "initialState cannot be null");
        this.initialState = initialState;
        return this;
    }

    @Override
    public Flow.Builder maxConcurrency(int maxConcurrency) {
        this.checkImmutable();
        Preconditions.checkArgument(maxConcurrency > 0, "maxConcurrency cannot be less than 1");
        this.maxConcurrency = maxConcurrency;
        return this;
    }

    @Override
    public Flow build() {
        this.checkImmutable();
        this.flow = new DefaultFlow(this.name, this.muleContext, this.source, this.processors, Optional.ofNullable(this.exceptionListener), Optional.ofNullable(this.processingStrategyFactory), this.initialState, this.maxConcurrency);
        return this.flow;
    }

    protected final void checkImmutable() {
        if (this.flow != null) {
            throw new IllegalStateException("Cannot change attributes once the flow was built");
        }
    }

    public static class DefaultFlow
    extends AbstractPipeline
    implements Flow {
        protected DefaultFlow(String name, MuleContext muleContext, MessageSource source, List<Processor> processors, Optional<MessagingExceptionHandler> exceptionListener, Optional<ProcessingStrategyFactory> processingStrategyFactory, String initialState, int maxConcurrency) {
            super(name, muleContext, source, processors, exceptionListener, processingStrategyFactory, initialState, maxConcurrency);
        }

        @Override
        public Event process(Event event) throws MuleException {
            return MessageProcessors.processToApply(event, this);
        }

        @Override
        public Publisher<Event> apply(Publisher<Event> publisher) {
            return Flux.from(publisher).doOnNext(this.assertStarted()).flatMap(event -> {
                Event request = this.createMuleEventForCurrentFlow((Event)event, event.getReplyToDestination(), event.getReplyToHandler());
                try {
                    this.getSink().accept(request);
                }
                catch (RejectedExecutionException ree) {
                    request.getContext().error(ExceptionUtils.updateMessagingExceptionWithError(new MessagingException((Event)event, ree, (Processor)this), this, this));
                }
                return Mono.from(request.getContext().getResponsePublisher()).map(r -> {
                    Event result = this.createReturnEventForParentFlowConstruct((Event)r, (Event)event);
                    return result;
                }).onErrorMap(MessagingException.class, me -> {
                    me.setProcessedEvent(this.createReturnEventForParentFlowConstruct(me.getEvent(), (Event)event));
                    return me;
                });
            });
        }

        private Event createMuleEventForCurrentFlow(Event event, Object replyToDestination, ReplyToHandler replyToHandler) {
            replyToHandler = null;
            event = Event.builder(event).flow(this).replyToHandler(replyToHandler).replyToDestination(replyToDestination).build();
            this.resetRequestContextEvent(event);
            return event;
        }

        private Event createReturnEventForParentFlowConstruct(Event result, Event original) {
            if (result != null) {
                Optional<Error> errorOptional = result.getError();
                result = Event.builder(result).flow(original.getFlowConstruct()).replyToHandler(original.getReplyToHandler()).replyToDestination(original.getReplyToDestination()).error(errorOptional.orElse(null)).build();
            }
            this.resetRequestContextEvent(result);
            return result;
        }

        private void resetRequestContextEvent(Event event) {
            Event.setCurrentEvent(event);
        }

        @Override
        protected void configurePreProcessors(MessageProcessorChainBuilder builder) throws MuleException {
            super.configurePreProcessors(builder);
            builder.chain(new ProcessingTimeInterceptor());
            builder.chain(new FlowConstructStatisticsMessageProcessor());
        }

        @Override
        protected void configurePostProcessors(MessageProcessorChainBuilder builder) throws MuleException {
            builder.chain(new AsyncReplyToPropertyRequestReplyReplier());
            super.configurePostProcessors(builder);
        }

        @Override
        protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
            return new TransactionAwareWorkQueueProcessingStrategyFactory();
        }

        @Override
        public String getConstructType() {
            return "Flow";
        }

        @Override
        public boolean isSynchronous() {
            return this.getProcessingStrategy() != null ? this.getProcessingStrategy().isSynchronous() : true;
        }
    }
}

