/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.context.notification.EnrichedNotificationInfo;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.exception.MessagingExceptionHandlerAware;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.context.notification.AsyncMessageNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.internal.util.ProcessingStrategyUtils;
import org.mule.runtime.core.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.session.DefaultMuleSession;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Processor that hands a copy of every incoming event to a delegate
 * {@link MessageProcessorChain} through a separately subscribed reactive stream,
 * while the original event continues downstream unchanged.
 *
 * <p>The delegate runs either through the owning {@link Pipeline}'s processing
 * strategy or, otherwise, on a Reactor scheduler backed by an IO scheduler that is
 * obtained in {@link #start()} and released in {@link #stop()}.
 */
public class AsyncDelegateMessageProcessor
extends AbstractMessageProcessorOwner
implements Processor,
Initialisable,
Startable,
Stoppable,
MessagingExceptionHandlerAware {
    // NOTE(review): literal action codes as emitted by the decompiler; presumably they mirror
    // the "scheduled"/"complete" action constants declared on AsyncMessageNotification.
    // Confirm, and reference those constants directly if so.
    private static final int ASYNC_SCHEDULED_ACTION = 1901;
    private static final int ASYNC_COMPLETE_ACTION = 1902;

    @Inject
    private SchedulerService schedulerService;
    protected Logger logger = LoggerFactory.getLogger(this.getClass());
    protected MessageProcessorChain delegate;
    private org.mule.runtime.api.scheduler.Scheduler scheduler;
    private Scheduler reactorScheduler;
    protected String name;
    private MessagingExceptionHandler messagingExceptionHandler;

    /**
     * @param delegate chain that processes the asynchronous copy of each event;
     *                 a {@code null} value is rejected later, by {@link #initialise()}
     */
    public AsyncDelegateMessageProcessor(MessageProcessorChain delegate) {
        this.delegate = delegate;
    }

    /**
     * @param delegate chain that processes the asynchronous copy of each event;
     *                 a {@code null} value is rejected later, by {@link #initialise()}
     * @param name     name stored on this processor
     */
    public AsyncDelegateMessageProcessor(MessageProcessorChain delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    /**
     * Verifies that a delegate was supplied and then runs the inherited initialisation.
     *
     * @throws InitialisationException if the delegate is {@code null}, or if raised by the superclass
     */
    @Override
    public void initialise() throws InitialisationException {
        if (this.delegate == null) {
            throw new InitialisationException(CoreMessages.objectIsNull("delegate message processor"), (Initialisable)this);
        }
        super.initialise();
    }

    /**
     * Obtains an IO scheduler (named after this component's location when one is
     * available), wraps it as a Reactor scheduler and then runs the inherited start.
     *
     * <p>If anything after the IO scheduler has been obtained fails, the schedulers
     * are released again so that a failed start does not leak them.
     *
     * @throws MuleException if raised by the superclass
     */
    @Override
    public void start() throws MuleException {
        this.scheduler = this.schedulerService.ioScheduler(
                this.getLocation() != null
                        ? this.muleContext.getSchedulerBaseConfig().withName(this.getLocation().getLocation())
                        : this.muleContext.getSchedulerBaseConfig());
        boolean started = false;
        try {
            this.reactorScheduler = Schedulers.fromExecutorService((ExecutorService)this.scheduler);
            super.start();
            started = true;
        } finally {
            if (!started) {
                this.releaseSchedulers();
            }
        }
    }

    /**
     * Runs the inherited stop and then releases both schedulers. The release happens
     * in a {@code finally} block so the schedulers are freed even when the superclass
     * fails to stop.
     *
     * @throws MuleException if raised by the superclass
     */
    @Override
    public void stop() throws MuleException {
        try {
            super.stop();
        } finally {
            this.releaseSchedulers();
        }
    }

    /**
     * Stops the IO scheduler and disposes the Reactor scheduler, tolerating either
     * being absent. The fields are cleared first so a failure while stopping one
     * scheduler neither skips the other nor leaves a stale reference behind.
     */
    private void releaseSchedulers() {
        org.mule.runtime.api.scheduler.Scheduler ioScheduler = this.scheduler;
        Scheduler reactor = this.reactorScheduler;
        this.scheduler = null;
        this.reactorScheduler = null;
        try {
            if (ioScheduler != null) {
                ioScheduler.stop();
            }
        } finally {
            if (reactor != null) {
                reactor.dispose();
            }
        }
    }

    /**
     * Processes a single event by delegating to
     * {@code MessageProcessors.processToApply(event, this)}.
     * NOTE(review): presumably this bridges the blocking API onto
     * {@link #apply(Publisher)}; confirm against that helper.
     */
    @Override
    public Event process(Event event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    /**
     * Returns a stream that emits the incoming events unchanged; as a side effect,
     * each event triggers a separately subscribed stream that runs the delegate on
     * a copy of it.
     */
    @Override
    public Publisher<Event> apply(Publisher<Event> publisher) {
        return Flux.from(publisher).doOnNext(this::dispatchAsync);
    }

    /**
     * Builds the asynchronous copy of {@code request}, fires the "scheduled"
     * notification for it and kicks off the delegate, all inside a stream of its own.
     */
    private void dispatchAsync(Event request) {
        Flux.just(request)
                .map(event -> this.asyncEvent(request))
                .transform(innerPublisher -> Flux.from(innerPublisher)
                        .doOnNext(this.fireAsyncScheduledNotification(this.flowConstruct))
                        .doOnNext(this::runDelegate))
                .onErrorResume(MessagingException.class, this.messagingExceptionHandler)
                .doOnError(Exceptions.UNEXPECTED_EXCEPTION_PREDICATE, exception -> this.logger.error("Unhandled exception in async processing.", exception))
                .subscribe();
    }

    /**
     * Subscribes a stream that runs the delegate on {@code asyncRequest}, fires the
     * "complete" notification, and reports the outcome to the request's event context.
     */
    private void runDelegate(Event asyncRequest) {
        Flux.just(asyncRequest)
                .transform(this.scheduleAsync(this.delegate))
                .doOnNext(event -> this.fireAsyncCompleteNotification(event, this.flowConstruct, null))
                .doOnError(MessagingException.class, e -> this.fireAsyncCompleteNotification(e.getEvent(), this.flowConstruct, e))
                // Messaging failures are handed to the configured handler; the operators
                // below only see errors that pass through or come out of that step.
                .onErrorResume(MessagingException.class, this.messagingExceptionHandler)
                .doOnError(Exceptions.UNEXPECTED_EXCEPTION_PREDICATE, exception -> this.logger.error("Unhandled exception in async processing.", exception))
                .doOnNext(event -> asyncRequest.getContext().success(event))
                .doOnError(throwable -> asyncRequest.getContext().error(throwable))
                .subscribe();
    }

    /**
     * Chooses how the delegate is executed: through the pipeline's processing
     * strategy when the flow construct is a {@link Pipeline} that is not processed
     * synchronously, otherwise directly with subscription moved onto this
     * processor's Reactor scheduler.
     */
    private ReactiveProcessor scheduleAsync(Processor delegate) {
        if (!ProcessingStrategyUtils.isSynchronousProcessing(this.flowConstruct) && this.flowConstruct instanceof Pipeline) {
            return publisher -> Flux.from(publisher).transform(((Pipeline)this.flowConstruct).getProcessingStrategy().onPipeline(delegate));
        }
        return publisher -> Flux.from(publisher).transform(delegate).subscribeOn(this.reactorScheduler);
    }

    /**
     * Copies {@code event} into a new event that uses a child of the original event
     * context, has no reply-to handler and carries its own copy of the session.
     */
    private Event asyncEvent(Event event) {
        return Event.builder(DefaultEventContext.child(event.getContext()), event)
                .replyToHandler(null)
                .session(new DefaultMuleSession(event.getSession()))
                .build();
    }

    /** Returns a consumer that fires the "scheduled" notification for each event it receives. */
    private Consumer<Event> fireAsyncScheduledNotification(FlowConstruct flowConstruct) {
        return event -> this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(EnrichedNotificationInfo.createInfo(event, null, this), flowConstruct, ASYNC_SCHEDULED_ACTION));
    }

    /**
     * Fires the "complete" notification for {@code event}.
     *
     * @param exception the failure that ended processing, or {@code null} on success
     */
    private void fireAsyncCompleteNotification(Event event, FlowConstruct flowConstruct, MessagingException exception) {
        this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(EnrichedNotificationInfo.createInfo(event, exception, this), flowConstruct, ASYNC_COMPLETE_ACTION));
    }

    /** Exposes the delegate as the only processor owned by this component. */
    @Override
    protected List<Processor> getOwnedMessageProcessors() {
        return Collections.singletonList(this.delegate);
    }

    /**
     * Stores the handler used for messaging failures and passes it on to the delegate.
     * A missing delegate is tolerated here so the problem surfaces as the
     * {@link InitialisationException} raised by {@link #initialise()} rather than as a
     * {@link NullPointerException}.
     */
    @Override
    public void setMessagingExceptionHandler(MessagingExceptionHandler messagingExceptionHandler) {
        this.messagingExceptionHandler = messagingExceptionHandler;
        if (this.delegate != null) {
            this.delegate.setMessagingExceptionHandler(messagingExceptionHandler);
        }
    }
}

