/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.source.scheduler;

import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.CreateException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.notification.ConnectorMessageNotification;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.source.SchedulerConfiguration;
import org.mule.runtime.api.source.SchedulerMessageSource;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.context.notification.NotificationHelper;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.source.scheduler.PeriodicScheduler;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class DefaultSchedulerMessageSource
extends AbstractComponent
implements MessageSource,
SchedulerMessageSource,
MuleContextAware,
Initialisable,
Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSchedulerMessageSource.class);
    private final PeriodicScheduler scheduler;
    private final NotificationHelper notificationHelper;
    private final boolean disallowConcurrentExecution;
    private Scheduler pollingExecutor;
    private ScheduledFuture<?> schedulingJob;
    private Processor listener;
    private FlowConstruct flowConstruct;
    private MuleContext muleContext;
    private boolean started;
    private volatile boolean executing = false;

    public DefaultSchedulerMessageSource(MuleContext muleContext, PeriodicScheduler scheduler, boolean disallowConcurrentExecution) {
        this.muleContext = muleContext;
        this.scheduler = scheduler;
        this.disallowConcurrentExecution = disallowConcurrentExecution;
        this.notificationHelper = new NotificationHelper(muleContext.getNotificationManager(), ConnectorMessageNotification.class, false);
    }

    @Override
    public synchronized void start() throws MuleException {
        if (this.started) {
            return;
        }
        try {
            this.schedulingJob = ClassUtils.withContextClassLoader(this.muleContext.getExecutionClassLoader(), () -> this.scheduler.schedule(this.pollingExecutor, () -> this.run()));
            this.started = true;
        }
        catch (Exception ex) {
            this.stop();
            throw new CreateException(CoreMessages.failedToScheduleWork(), ex, this);
        }
    }

    @Override
    public synchronized void stop() throws MuleException {
        if (!this.started) {
            return;
        }
        if (this.schedulingJob != null) {
            this.schedulingJob.cancel(false);
            this.schedulingJob = null;
        }
        this.started = false;
    }

    @Override
    public void trigger() {
        this.pollingExecutor.execute(() -> ClassUtils.withContextClassLoader(this.muleContext.getExecutionClassLoader(), () -> this.poll()));
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    @Override
    public SchedulerConfiguration getConfiguration() {
        return this.scheduler;
    }

    private final void run() {
        PrivilegedEvent.setCurrentEvent(null);
        if (this.muleContext.isPrimaryPollingInstance()) {
            this.poll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void poll() {
        boolean execute = false;
        DefaultSchedulerMessageSource defaultSchedulerMessageSource = this;
        synchronized (defaultSchedulerMessageSource) {
            if (this.disallowConcurrentExecution && this.executing) {
                execute = false;
            } else {
                execute = true;
                this.executing = true;
            }
        }
        if (execute) {
            Message request = Message.of(null);
            this.pollWith(request);
        } else {
            LOGGER.info("Flow '{}' is already running and 'disallowConcurrentExecution' is set to 'true'. Execution skipped.", (Object)this.flowConstruct.getRootContainerLocation().getGlobalName());
        }
    }

    private void pollWith(Message request) {
        try {
            Mono.just((Object)request).map(message -> InternalEvent.builder(EventContextFactory.create(this.flowConstruct, this.getLocation())).message(request).build()).doOnNext(event -> PrivilegedEvent.setCurrentEvent(event)).doOnNext(event -> this.notificationHelper.fireNotification(this, (CoreEvent)event, this.getLocation(), 801)).cast(CoreEvent.class).transform((Function)this.listener).doOnError(MessagingException.class, me -> ((BaseEventContext)me.getEvent().getContext()).error((Throwable)me)).doOnSuccess(result -> ((BaseEventContext)result.getContext()).success()).doFinally(s -> {
                DefaultSchedulerMessageSource defaultSchedulerMessageSource = this;
                synchronized (defaultSchedulerMessageSource) {
                    this.executing = false;
                }
            }).subscribe(Operators.requestUnbounded());
        }
        catch (Exception e) {
            this.muleContext.getExceptionListener().handleException(e);
        }
    }

    @Override
    public void initialise() throws InitialisationException {
        this.flowConstruct = ComponentUtils.getFromAnnotatedObjectOrFail(this.muleContext.getConfigurationComponentLocator(), this);
        this.createScheduler();
    }

    @Override
    public void dispose() {
        this.disposeScheduler();
    }

    private void createScheduler() throws InitialisationException {
        this.pollingExecutor = this.muleContext.getSchedulerService().cpuLightScheduler();
    }

    private void disposeScheduler() {
        if (this.pollingExecutor != null) {
            this.pollingExecutor.stop();
            this.pollingExecutor = null;
        }
    }

    @Override
    public void setMuleContext(MuleContext context) {
        this.muleContext = context;
    }

    @Override
    public void setListener(Processor listener) {
        this.listener = listener;
    }
}

