/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mule.runtime.module.batch.internal.engine;

import com.google.common.collect.ImmutableList;
import com.mulesoft.mule.runtime.module.batch.BatchProcessingListener;
import com.mulesoft.mule.runtime.module.batch.api.BatchJob;
import com.mulesoft.mule.runtime.module.batch.api.BatchJobInstance;
import com.mulesoft.mule.runtime.module.batch.api.BatchJobInstanceStatus;
import com.mulesoft.mule.runtime.module.batch.api.BatchJobResult;
import com.mulesoft.mule.runtime.module.batch.api.BatchStep;
import com.mulesoft.mule.runtime.module.batch.api.notification.BatchNotification;
import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchEngine;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceStore;
import com.mulesoft.mule.runtime.module.batch.engine.BatchRecordDispatcher;
import com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueManager;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContextFactory;
import com.mulesoft.mule.runtime.module.batch.exception.BatchException;
import com.mulesoft.mule.runtime.module.batch.internal.BaseBatchProcessingListenerOwner;
import com.mulesoft.mule.runtime.module.batch.internal.BatchJobResultAdapter;
import com.mulesoft.mule.runtime.module.batch.internal.DefaultBatchJobInstance;
import com.mulesoft.mule.runtime.module.batch.internal.ImmutableBatchJobResult;
import com.mulesoft.mule.runtime.module.batch.internal.engine.BatchLockFactory;
import com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate;
import com.mulesoft.mule.runtime.module.batch.internal.engine.DefaultBatchRecordDispatcher;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.RecordBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.SteppingQueueBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.history.DefaultHistoryExpirationMonitor;
import com.mulesoft.mule.runtime.module.batch.internal.engine.history.HistoryExpirationMonitor;
import com.mulesoft.mule.runtime.module.batch.internal.engine.queue.BatchQueueLoader;
import com.mulesoft.mule.runtime.module.batch.internal.engine.threading.BatchWorkManager;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.DefaultBatchTransactionContextFactory;
import com.mulesoft.mule.runtime.module.batch.internal.reporting.ExceptionsInTextBatchResultReporter;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.ConfigurationProperties;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.ExceptionHelper;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.Notification;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.NotificationListener;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.streaming.HasSize;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.context.notification.MuleContextNotification;
import org.mule.runtime.core.api.context.notification.MuleContextNotificationListener;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.mule.runtime.core.internal.routing.EventToMessageSequenceSplittingStrategy;
import org.mule.runtime.core.internal.routing.ExpressionSplittingStrategy;
import org.mule.runtime.core.internal.routing.MessageSequence;
import org.mule.runtime.core.internal.routing.SplittingStrategy;
import org.mule.runtime.core.internal.util.ConcurrencyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBatchEngine
extends BaseBatchProcessingListenerOwner
implements BatchEngine,
MuleContextAware,
Initialisable,
Stoppable,
Startable {
    @Inject
    private ConfigurationProperties configurationProperties;
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBatchEngine.class);
    private static final String MULE_BATCH_HISTORY_EXPIRATION_FREQUENCY = "mule.batch.historyExpirationFrequency";
    private static final int UNLIMITED = -1;
    private MuleContextNotificationListener<MuleContextNotification> contextStartListener;
    private SplittingStrategy<CoreEvent, MessageSequence<?>> splittingStrategy;
    private final Map<String, BatchJobAdapter> jobs = new HashMap<String, BatchJobAdapter>();
    @Inject
    private BatchWorkManager workManager;
    @Inject
    private BatchQueueManager batchQueueManager;
    @Inject
    private BatchJobInstanceStore jobInstanceStore;
    @Inject
    private BatchLockFactory lockFactory;
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private ExpressionManager expressionManager;
    @Inject
    private NotificationDispatcher notificationDispatcher;
    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;
    @Inject
    private ConfigurationComponentLocator componentLocator;
    private BatchTransactionContextFactory batchTransactionContextFactory;
    private BatchRecordDispatcher recordDispatcher;
    private HistoryExpirationMonitor historyExpirationMonitor;
    private long historyExpirationFrequency = Long.parseLong(System.getProperty("mule.batch.historyExpirationFrequency", String.valueOf(TimeUnit.HOURS.toMillis(1L))));
    private MuleContext muleContext;
    private RecordBuffer queueBuffer;
    private final AtomicBoolean contextStarted = new AtomicBoolean(false);
    private final AtomicBoolean engineInitialized = new AtomicBoolean(false);

    public void initialise() throws InitialisationException {
        this.splittingStrategy = new EventToMessageSequenceSplittingStrategy(new ExpressionSplittingStrategy(this.expressionManager));
        this.contextStartListener = new MuleContextNotificationListener<MuleContextNotification>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onNotification(MuleContextNotification notification) {
                if (notification.getAction().getActionId() == 104) {
                    DefaultBatchEngine.this.notificationListenerRegistry.unregisterListener((NotificationListener)this);
                    DefaultBatchEngine.this.contextStartListener = null;
                    Map map = DefaultBatchEngine.this.jobs;
                    synchronized (map) {
                        DefaultBatchEngine.this.contextStarted.set(true);
                        if (DefaultBatchEngine.this.areJobsRegistered()) {
                            DefaultBatchEngine.this.initializeEngine();
                        }
                    }
                }
            }
        };
        this.notificationListenerRegistry.registerListener(this.contextStartListener);
    }

    private void initializeEngine() {
        this.batchTransactionContextFactory = new DefaultBatchTransactionContextFactory((BatchEngine)this);
        this.queueBuffer = new SteppingQueueBuffer((BatchEngine)this);
        this.recordDispatcher = this.createRecordDispatcher();
        this.recordDispatcher.start();
        this.batchQueueManager.addBatchProcessingListener(this.recordDispatcher.getListener());
        this.jobInstanceStore.addBatchProcessingListener(this.recordDispatcher.getListener());
        this.createAndStartHistoryExpirationMonitor();
        this.engineInitialized.set(true);
    }

    public void stop() throws MuleException {
        if (this.recordDispatcher != null) {
            this.recordDispatcher.stop();
        }
        if (this.batchTransactionContextFactory != null) {
            this.batchTransactionContextFactory.closeAndRollback();
        }
        if (this.historyExpirationMonitor != null) {
            this.historyExpirationMonitor.stopMonitoring();
        }
        if (this.batchQueueManager != null) {
            this.batchQueueManager.removeBatchProcessingListener(this.recordDispatcher.getListener());
        }
        if (this.jobInstanceStore != null) {
            this.jobInstanceStore.removeBatchProcessingListener(this.recordDispatcher.getListener());
        }
        this.contextStarted.set(false);
        this.engineInitialized.set(false);
    }

    private BatchJobInstanceAdapter flushSteppingQueueBuffer(BatchJobInstanceAdapter jobInstance) {
        this.queueBuffer.flush(jobInstance);
        return this.refresh(jobInstance);
    }

    public BatchJobInstanceAdapter createNewJobInstance(BatchJobAdapter job, CoreEvent event) throws MuleException {
        String jobInstanceId = job.generateJobInstanceId(event);
        DefaultBatchJobInstance jobInstance = new DefaultBatchJobInstance(jobInstanceId, job.getName(), event);
        jobInstance.setStatus(BatchJobInstanceStatus.LOADING);
        jobInstance.setQueueName(this.batchQueueManager.steppingQueue((BatchJobInstanceAdapter)jobInstance).getQueueName());
        try {
            this.jobInstanceStore.store((BatchJobInstanceAdapter)jobInstance);
        }
        catch (IllegalStateException e) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)String.format("Batch Job '%s' already has an instance with id '%s'. Please upgrade the %s expression to produce unique values", job.getName(), jobInstanceId, "job-instance-id")), (Throwable)e);
        }
        LOGGER.info("Created instance '{}' for batch job '{}'", (Object)jobInstance.getId(), (Object)job.getName());
        return jobInstance;
    }

    public BatchTransactionContext createTransactionContext(BatchJobInstanceAdapter jobInstance) {
        return this.batchTransactionContextFactory.createTransactionContext(jobInstance);
    }

    public BatchJobInstanceAdapter load(BatchJobInstanceAdapter jobInstance, CoreEvent event) throws MuleException {
        BatchJobAdapter job = this.getJobFor((BatchJobInstance)jobInstance);
        jobInstance.setRecordCount(this.getRecordCountIfPossible(event));
        this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.LOAD_PHASE_BEGIN));
        BatchQueueLoader loader = new BatchQueueLoader((BatchEngine)this, job.getBlockSize(), this.splittingStrategy, this.notificationDispatcher);
        try {
            jobInstance.setRecordCount(loader.splitAndLoad((BatchJob)job, jobInstance, event));
        }
        catch (Exception e) {
            BatchJobResult result = jobInstance.getResult();
            if (result instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter)result).setLoadingPhaseException(e);
            }
            jobInstance.setStatus(BatchJobInstanceStatus.FAILED_LOADING);
            this.jobInstanceStore.update(jobInstance);
            this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, e, BatchNotification.LOAD_PHASE_FAILED));
            throw new BatchException((Throwable)e, (BatchJobInstance)jobInstance);
        }
        try {
            this.startExecution(jobInstance);
            this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.LOAD_PHASE_END));
        }
        catch (Exception e) {
            try {
                this.releaseResources(jobInstance, true);
            }
            catch (Exception e2) {
                LOGGER.error(String.format("Exception found while trying to release resources of instance %s of job %s. Original exception will be re-thrown", jobInstance.getId(), job.getName()), (Throwable)e2);
            }
            this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.LOAD_PHASE_FAILED));
            throw new BatchException((Throwable)e, (BatchJobInstance)jobInstance);
        }
        return jobInstance;
    }

    public void startExecution(BatchJobInstanceAdapter jobInstance) throws MuleException {
        jobInstance.setStatus(BatchJobInstanceStatus.EXECUTING);
        if (jobInstance.getRecordCount() > 0L) {
            BatchJobResult result = jobInstance.getResult();
            if (result instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter)result).startClock();
            }
            this.workManager.executable((BatchJobInstance)jobInstance);
            this.jobInstanceStore.update(jobInstance);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("Started execution of instance '%s' of job '%s'", jobInstance.getId(), jobInstance.getOwnerJobName()));
            }
            this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.JOB_PROCESS_RECORDS_BEGIN));
        } else {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("Instance '%s' of job '%s' has no records to process. It's execution will be finished now", jobInstance.getId(), jobInstance.getOwnerJobName()));
            }
            this.finishExecution(jobInstance, false);
        }
    }

    private BatchJobInstanceAdapter processJobInstanceStatus(BatchJobInstanceAdapter jobInstance) {
        BatchJobResult result = jobInstance.getResult();
        if (result.getFailedRecords() > 0L) {
            jobInstance.setStatus(BatchJobInstanceStatus.FAILED_PROCESS_RECORDS);
        } else {
            jobInstance.setStatus(BatchJobInstanceStatus.SUCCESSFUL);
        }
        this.stopClock(jobInstance);
        this.jobInstanceStore.update(jobInstance);
        return jobInstance;
    }

    public BatchJobInstanceAdapter finishExecution(BatchJobInstanceAdapter jobInstance, boolean shouldRefresh) throws MuleException {
        try {
            Pair<BatchJobInstanceAdapter, Boolean> result = this.processJobStatusBeforeOnComplete(jobInstance, shouldRefresh);
            jobInstance = (BatchJobInstanceAdapter)result.getLeft();
            if (((Boolean)result.getRight()).booleanValue()) {
                return jobInstance;
            }
            BatchJobInstanceAdapter finalJobInstance = jobInstance;
            this.workManager.scheduleManagementWork(() -> {
                try {
                    try {
                        this.stopAndAwait(finalJobInstance);
                        this.processOnComplete(finalJobInstance);
                    }
                    catch (MuleException e) {
                        LOGGER.error(String.format("Error invoking on-complete phase for instance '%s' or job '%s'", finalJobInstance.getId(), finalJobInstance.getOwnerJobName()), (Throwable)e);
                        this.finallyOnFinishJobInstance(finalJobInstance, true);
                    }
                }
                finally {
                    this.finallyOnFinishJobInstance(finalJobInstance, true);
                }
            });
            return jobInstance;
        }
        catch (RuntimeException e) {
            this.finallyOnFinishJobInstance(jobInstance, true);
            throw e;
        }
    }

    private void stopAndAwait(BatchJobInstanceAdapter jobInstance) {
        this.workManager.awaitStop(jobInstance);
    }

    private Pair<BatchJobInstanceAdapter, Boolean> processJobStatusBeforeOnComplete(BatchJobInstanceAdapter jobInstance, boolean shouldRefresh) {
        Lock lock = this.getLock(jobInstance);
        lock.lock();
        try {
            if (shouldRefresh) {
                jobInstance = this.refresh(jobInstance);
            }
            if (jobInstance.getStatus() != BatchJobInstanceStatus.EXECUTING) {
                ImmutablePair immutablePair = new ImmutablePair((Object)jobInstance, (Object)true);
                return immutablePair;
            }
            ImmutablePair immutablePair = new ImmutablePair((Object)this.processJobInstanceStatus(jobInstance), (Object)false);
            return immutablePair;
        }
        finally {
            lock.unlock();
        }
    }

    private void finallyOnFinishJobInstance(BatchJobInstanceAdapter jobInstance, boolean sendNotification) {
        try {
            this.jobInstanceStore.update(jobInstance);
            this.releaseResources(jobInstance, false);
        }
        finally {
            int notificationAction = this.getNotificationAction(jobInstance);
            this.logJobInstanceResult(jobInstance);
            if (notificationAction > 0 && sendNotification) {
                this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, notificationAction));
            }
        }
    }

    private int getNotificationAction(BatchJobInstanceAdapter jobInstance) {
        if (jobInstance.getResult().getFailedRecords() > 0L) {
            return BatchNotification.JOB_PROCESS_RECORDS_FAILED;
        }
        if (!jobInstance.getStatus().isFailure()) {
            return BatchNotification.JOB_SUCCESSFUL;
        }
        return 0;
    }

    private void processOnComplete(BatchJobInstanceAdapter jobInstance) throws MuleException {
        BatchJobResult result = jobInstance.getResult();
        this.doOnComplete(jobInstance);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("Finished execution for instance '%s' of job '%s'. Total Records processed: %d. Successful records: %d. Failed Records: %d", jobInstance.getId(), jobInstance.getOwnerJobName(), result.getProcessedRecords(), result.getSuccessfulRecords(), result.getFailedRecords()));
        }
        for (BatchProcessingListener listener : this.getListeners((BatchJobInstance)jobInstance)) {
            listener.onJobFinished(jobInstance);
        }
        this.logExceptionsSummary((BatchJob)this.getJobFor((BatchJobInstance)jobInstance), result);
    }

    private void logJobInstanceResult(BatchJobInstanceAdapter jobInstance) {
        BatchJobResult result = jobInstance.getResult();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("Finished execution for instance '%s' of job '%s'. Total Records processed: %d. Successful records: %d. Failed Records: %d", jobInstance.getId(), jobInstance.getOwnerJobName(), result.getProcessedRecords(), result.getSuccessfulRecords(), result.getFailedRecords()));
        }
    }

    public void releaseResources(BatchJobInstanceAdapter jobInstance, boolean includeFailedRecords) {
        if (includeFailedRecords) {
            this.jobInstanceStore.clearFailedRecords(jobInstance);
        }
        this.batchQueueManager.disposeQueues(this.createTransactionContext(jobInstance));
        this.queueBuffer.forget(jobInstance);
    }

    public CoreEvent.Builder createEventBuilder(Record record, BatchJobInstanceAdapter jobInstance) {
        return this.createEventBuilder(record, jobInstance, null);
    }

    public CoreEvent.Builder createEventBuilder(Record record, BatchJobInstanceAdapter jobInstance, CompletableFuture<Void> externalCompletion) {
        CoreEvent.Builder builder = jobInstance.getBatchEvent().asEventBuilder(this.getJobFor((BatchJobInstance)jobInstance), externalCompletion);
        Message.Builder messageBuilder = null;
        if (record != null) {
            TypedValue payload = record.getPayload();
            TypedValue attributes = record.getAttributes();
            if (payload != null) {
                messageBuilder = Message.builder().value(payload.getValue()).mediaType(payload.getDataType().getMediaType());
                if (attributes != null) {
                    messageBuilder.attributesValue(attributes.getValue()).attributesMediaType(attributes.getDataType().getMediaType());
                }
            }
        }
        builder.message(messageBuilder != null ? messageBuilder.build() : Message.of(null)).addVariable("batchJobInstanceId", (Object)jobInstance.getId());
        if (record != null) {
            builder.addVariable("_mule_batch_INTERNAL_record", (Object)record);
            record.getAllVariables().forEach((key, value) -> {
                CoreEvent.Builder builder2 = builder.addVariable(key, value.getValue(), value.getDataType());
            });
        }
        return builder;
    }

    private void routeError(BatchTransactionContext ctx, BatchStepAdapter step, Record record) throws MuleException {
        BatchJobAdapter job = ctx.getJob();
        BatchJobInstanceAdapter jobInstance = ctx.getJobInstance();
        if (job.getMaxFailedRecords() == -1 || jobInstance.getResult().getFailedRecords() <= (long)job.getMaxFailedRecords()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Failed record number %d is still acceptable for instance %s of job %s. Routing to next step", jobInstance.getResult().getFailedRecords(), jobInstance.getId(), jobInstance.getOwnerJobName()));
            }
            this.routeNext(ctx, step, record);
        } else {
            LOGGER.info(String.format("instance '%s' of job '%s' has reached the max allowed number of failed records. Record will be added to failed list and the instance will be removed from execution pool", jobInstance.getId(), jobInstance.getOwnerJobName()));
            BatchUtils.singleAckAndCommitIfNecessary((BatchTransactionContext)ctx);
            this.stopExecution(jobInstance, true);
        }
    }

    public void stopExecution(BatchJobInstanceAdapter jobInstanceAdapter, boolean markAsFailed) throws MuleException {
        ConcurrencyUtils.withLock((Lock)this.getLock(jobInstanceAdapter), () -> {
            int notification;
            BatchJobInstanceAdapter jobInstance = this.refresh(jobInstanceAdapter);
            if (jobInstance.getStatus() != BatchJobInstanceStatus.EXECUTING) {
                return;
            }
            this.stopClock(jobInstance);
            if (markAsFailed) {
                jobInstance.setStatus(BatchJobInstanceStatus.FAILED_PROCESS_RECORDS);
                notification = BatchNotification.JOB_PROCESS_RECORDS_FAILED;
            } else {
                jobInstance.setStatus(BatchJobInstanceStatus.STOPPED);
                notification = BatchNotification.JOB_STOPPED;
            }
            this.jobInstanceStore.update(jobInstance);
            for (BatchProcessingListener listener : this.getListeners((BatchJobInstance)jobInstance)) {
                listener.onJobStopped(jobInstance);
            }
            this.workManager.scheduleManagementWork(() -> {
                this.stopAndAwait(jobInstance);
                this.logExceptionsSummary((BatchJob)this.getJobFor((BatchJobInstance)jobInstance), jobInstance.getResult());
                this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, notification));
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format("instance %s of job %s has been stopped. Instance status is %s", jobInstance.getId(), jobInstance.getOwnerJobName(), jobInstance.getStatus()));
                }
                try {
                    this.doOnComplete(jobInstance);
                }
                catch (MuleException e) {
                    LOGGER.error(String.format("Error invoking on-complete phase for instance '%s' or job '%s'", jobInstance.getId(), jobInstance.getOwnerJobName()), (Throwable)e);
                }
            });
        });
    }

    public void resumeExecution(BatchJobInstanceAdapter jobInstance) throws MuleException {
        Lock lock = this.getLock(jobInstance);
        lock.lock();
        try {
            jobInstance = this.refresh(jobInstance);
        }
        finally {
            lock.unlock();
        }
        if (jobInstance.getStatus() != BatchJobInstanceStatus.STOPPED) {
            throw new IllegalStateException(String.format("Instance '%s' of job '%s' has been requested to resume but it's on '%s' state. Only instances in state '%s' can be resumed", jobInstance.getId(), jobInstance.getOwnerJobName(), jobInstance.getStatus(), BatchJobInstanceStatus.STOPPED));
        }
        jobInstance.setStatus(BatchJobInstanceStatus.EXECUTING);
        this.startClock(jobInstance);
        this.workManager.executable((BatchJobInstance)jobInstance);
        this.jobInstanceStore.update(jobInstance);
        for (BatchProcessingListener listener : this.getListeners((BatchJobInstance)jobInstance)) {
            listener.onExecutableStateTransition(jobInstance);
        }
        this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.JOB_PROCESS_RECORDS_BEGIN));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("instance '%s' of job '%s' has been resumed", jobInstance.getId(), jobInstance.getOwnerJobName()));
        }
    }

    private void stopClock(BatchJobInstanceAdapter jobInstance) {
        BatchJobResult result = jobInstance.getResult();
        if (result instanceof BatchJobResultAdapter) {
            ((BatchJobResultAdapter)result).stopClock();
        }
    }

    private void startClock(BatchJobInstanceAdapter jobInstance) {
        BatchJobResult result = jobInstance.getResult();
        if (result instanceof BatchJobResultAdapter) {
            ((BatchJobResultAdapter)result).startClock();
        }
    }

    private void logExceptionsSummary(BatchJob job, BatchJobResult jobResult) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("\n" + new ExceptionsInTextBatchResultReporter().buildReport(job, jobResult));
        }
    }

    public void cancel(BatchJobInstanceAdapter jobInstance) throws MuleException {
        this.stopExecution(jobInstance, false);
        jobInstance.setStatus(BatchJobInstanceStatus.CANCELLED);
        this.releaseResources(jobInstance, true);
        this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.JOB_CANCELLED));
        this.jobInstanceStore.update(jobInstance);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("instance %s of job %s has been cancelled.", jobInstance.getId(), jobInstance.getOwnerJobName()));
        }
    }

    public void cancelAllRunningInstance() throws MuleException {
        for (BatchJobInstanceAdapter jobInstance : this.jobInstanceStore.getExecutingInstances()) {
            this.cancel(jobInstance);
        }
    }

    public long getSteppingQueueSize(BatchTransactionContext context) {
        BatchJobInstanceAdapter jobInstance = context.getJobInstance();
        Lock lock = this.getLock(jobInstance);
        lock.lock();
        try {
            jobInstance = this.refresh(jobInstance);
            if (jobInstance == null || jobInstance.getStatus() != BatchJobInstanceStatus.EXECUTING) {
                return 0L;
            }
            long l = this.getBatchQueueManager().steppingQueue(jobInstance).size(context);
            return l;
        }
        catch (MuleException e) {
            LOGGER.error(String.format("Could not get stepping queue count for instance '%s' of batch job '%s'", jobInstance.getId(), jobInstance.getOwnerJobName()), (Throwable)e);
            return 0L;
        }
        finally {
            lock.unlock();
        }
    }

    private void routeNext(BatchTransactionContext ctx, BatchStepAdapter step, Record record) throws MuleException {
        BatchStep next = step.getNextStep();
        if (next != null) {
            record.setCurrentStepId(next.getName());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Queueing record back for step %s in instance %s of job %s", next.getName(), ctx.getJobInstance().getId(), ctx.getJobInstance().getOwnerJobName()));
            }
            this.queueBuffer.add(ctx.getJobInstance(), ctx, record);
        } else {
            record.setCurrentStepId(null);
            BatchUtils.singleAckAndCommitIfNecessary((BatchTransactionContext)ctx);
        }
    }

    private BatchJobInstanceAdapter updateStatistics(BatchTransactionContext ctx, List<Record> records) {
        BatchJobAdapter job = ctx.getJob();
        BatchJobInstanceAdapter jobInstance = ctx.getJobInstance();
        Lock lock = this.getLock(jobInstance);
        lock.lock();
        try {
            jobInstance = this.refresh(jobInstance);
            BatchJobResult result = jobInstance.getResult();
            if (result instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter)result).updateFor(job, records);
            }
            this.jobInstanceStore.update(jobInstance);
        }
        finally {
            lock.unlock();
        }
        ctx.updateJobInstance(jobInstance);
        this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.PROGRESS_UPDATE));
        return jobInstance;
    }

    private BatchJobInstanceAdapter refresh(BatchJobInstanceAdapter jobInstance) {
        return this.jobInstanceStore.getJobInstance(jobInstance.getOwnerJobName(), jobInstance.getId());
    }

    public BatchJobInstanceAdapter updateStatisticsAndRoute(BatchTransactionContext ctx, List<Record> records) throws MuleException {
        BatchJobInstanceAdapter jobInstance = this.updateStatistics(ctx, records);
        BatchJobAdapter job = ctx.getJob();
        ArrayList<Record> failedRecords = new ArrayList<Record>(job.getBlockSize());
        for (Record record : records) {
            BatchStepAdapter step;
            if (record.isFailedFor((BatchStep)(step = job.getStepById(record.getCurrentStepId())))) {
                failedRecords.add(record);
                this.routeError(ctx, step, record);
                continue;
            }
            this.routeNext(ctx, step, record);
        }
        if (jobInstance.getStatus().isFailure()) {
            this.jobInstanceStore.storeFailedRecords(jobInstance, failedRecords);
            BatchUtils.commit((BatchTransactionContext)ctx);
        }
        jobInstance = jobInstance.getResult().getProcessedRecords() >= jobInstance.getRecordCount() ? this.finishExecution(jobInstance, true) : this.flushSteppingQueueBuffer(jobInstance);
        return jobInstance;
    }

    public Lock getLock(BatchJobInstanceAdapter jobInstance) {
        return this.lockFactory.createLock(String.format("BATCH-JOB-%s-INSTANCE-%s", jobInstance.getOwnerJobName(), jobInstance.getId()));
    }

    public BatchJobAdapter getJobFor(BatchJobInstance jobInstance) {
        BatchJobAdapter job = this.jobs.get(jobInstance.getOwnerJobName());
        if (job == null) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)String.format("Consistency error: job instance with id '%s' has owner job '%s' but such batch job couldn't be located", jobInstance.getId(), jobInstance.getOwnerJobName())));
        }
        return job;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerBatchJob(BatchJobAdapter batchJob) {
        Map<String, BatchJobAdapter> map = this.jobs;
        synchronized (map) {
            boolean lazyInit;
            boolean bl = lazyInit = this.configurationProperties.resolveBooleanProperty("mule.application.deployment.lazyInit").orElse(false) != false || this.contextStarted.get();
            if (lazyInit) {
                boolean shouldInitialize = this.jobs.isEmpty();
                if (shouldInitialize) {
                    this.initializeEngine();
                }
                this.jobs.put(batchJob.getName(), batchJob);
                if (shouldInitialize) {
                    try {
                        this.workManager.start();
                    }
                    catch (MuleException e) {
                        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"Unable to start work manager when registering batch job with name '%s'", (Object[])new Object[]{batchJob.getName()}), (Throwable)e);
                    }
                }
                return;
            }
            FlowConstruct batchJobFlowConstruct = ComponentUtils.getFromAnnotatedObjectOrFail((ConfigurationComponentLocator)this.componentLocator, (Component)batchJob);
            BatchJobAdapter previous = this.jobs.putIfAbsent(batchJob.getName(), batchJob);
            if (previous != null) {
                FlowConstruct previousBatchJobFlowConstruct = ComponentUtils.getFromAnnotatedObjectOrFail((ConfigurationComponentLocator)this.componentLocator, (Component)previous);
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)String.format("Flow '%s' contains a batch job with name '%s', but Flow '%s' has already defined such job", batchJobFlowConstruct.getName(), batchJob.getName(), previousBatchJobFlowConstruct.getName())));
            }
        }
    }

    public Collection<BatchJob> getBatchJobs() {
        return ImmutableList.copyOf(this.jobs.values());
    }

    public Optional<BatchJob> getJob(String jobName) {
        return Optional.ofNullable((BatchJob)this.jobs.get(jobName));
    }

    private void doOnComplete(BatchJobInstanceAdapter jobInstance) throws MuleException {
        this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.ON_COMPLETE_BEGIN));
        BatchJobAdapter job = this.getJobFor((BatchJobInstance)jobInstance);
        Processor onComplete = job.getOnCompleteBlock().orElse(null);
        if (onComplete != null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("Starting execution of onComplete phase for instance %s of job %s", jobInstance.getId(), jobInstance.getOwnerJobName()));
            }
            CoreEvent event = this.createEventBuilder(null, jobInstance).message(Message.builder().value((Object)new ImmutableBatchJobResult(jobInstance.getResult())).build()).build();
            BatchProcessingTemplate template = new BatchProcessingTemplate(onComplete, this.muleContext){

                protected void onSuccess(BatchJobInstanceAdapter jobInstance, CoreEvent responseEvent) throws MuleException {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info(String.format("Finished execution of onComplete phase for instance %s of job %s", jobInstance.getId(), jobInstance.getOwnerJobName()));
                    }
                    DefaultBatchEngine.this.fireOnCompleteEndNotification(jobInstance);
                }

                protected void onException(BatchJobInstanceAdapter jobInstance, Exception e, CoreEvent event) throws MuleException {
                    BatchException batchException = BatchUtils.toBatchException((Exception)e, (BatchJobInstanceAdapter)jobInstance);
                    DefaultBatchEngine.this.onCompleteException(jobInstance, (Exception)batchException);
                }
            };
            template.process(jobInstance, event);
        } else {
            this.fireOnCompleteEndNotification(jobInstance);
        }
    }

    private void fireOnCompleteEndNotification(BatchJobInstanceAdapter jobInstance) {
        this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, BatchNotification.ON_COMPLETE_END));
    }

    private void onCompleteException(BatchJobInstanceAdapter jobInstance, Exception e) throws MuleException {
        Lock lock = this.getLock(jobInstance);
        lock.lock();
        try {
            LOGGER.error(String.format("Exception was found during on-complete step for instance %s of job %s", jobInstance.getId(), jobInstance.getOwnerJobName()));
            LOGGER.error(DefaultBatchEngine.buildExceptionLogMessage(e));
            BatchJobResult result = jobInstance.getResult();
            if (result instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter)result).setOnCompletePhaseException(e);
            }
            if (!jobInstance.getStatus().isFailure()) {
                jobInstance.setStatus(BatchJobInstanceStatus.FAILED_ON_COMPLETE);
            }
            this.notificationDispatcher.dispatch((Notification)new BatchNotification((BatchJobInstance)jobInstance, e, BatchNotification.ON_COMPLETE_FAILED));
        }
        finally {
            lock.unlock();
        }
    }

    private long getRecordCountIfPossible(CoreEvent event) {
        Object payload = event.getMessage().getPayload().getValue();
        try {
            if (payload instanceof HasSize) {
                return ((HasSize)payload).getSize();
            }
            if (payload instanceof Collection) {
                return ((Collection)payload).size();
            }
        }
        catch (Throwable t) {
            LOGGER.warn("Exception found while trying to get the record count in advanced. Processing will continue and record count will be determined at the end of the loading phase", t);
        }
        return -1L;
    }

    private BatchRecordDispatcher createRecordDispatcher() {
        return new DefaultBatchRecordDispatcher((BatchEngine)this, this.workManager, this.schedulerService, this.muleContext.getSchedulerBaseConfig(), this.notificationListenerRegistry);
    }

    private void createAndStartHistoryExpirationMonitor() {
        this.historyExpirationMonitor = new DefaultHistoryExpirationMonitor((BatchEngine)this, this.jobInstanceStore, this.historyExpirationFrequency, TimeUnit.MILLISECONDS, this.muleContext);
        this.historyExpirationMonitor.beginMonitoring();
    }

    private boolean areJobsRegistered() {
        return !CollectionUtils.isEmpty(this.getBatchJobs());
    }

    public BatchJobInstanceStore getJobInstanceStore() {
        return this.jobInstanceStore;
    }

    public void setJobInstanceStore(BatchJobInstanceStore jobInstanceFactory) {
        this.jobInstanceStore = jobInstanceFactory;
    }

    public BatchQueueManager getBatchQueueManager() {
        return this.batchQueueManager;
    }

    public void setBatchQueueManager(BatchQueueManager batchQueueManager) {
        this.batchQueueManager = batchQueueManager;
    }

    public void setMuleContext(MuleContext context) {
        this.muleContext = context;
    }

    public int getBlockSize(BatchJobInstance jobInstance) {
        return this.getJobFor(jobInstance).getBlockSize();
    }

    public BatchTransactionContextFactory getBatchTransactionContextFactory() {
        return this.batchTransactionContextFactory;
    }

    public void setHistoryExpirationFrequency(long historyExpirationFrequency) {
        this.historyExpirationFrequency = historyExpirationFrequency;
    }

    public static String buildExceptionLogMessage(Throwable t) {
        MuleException muleException = ExceptionHelper.getRootMuleException((Throwable)t);
        if (muleException != null) {
            return muleException.getDetailedMessage();
        }
        return ExceptionUtils.getFullStackTraceWithoutMessages((Throwable)t);
    }

    public void start() throws MuleException {
        if (!this.engineInitialized.get() && this.areJobsRegistered()) {
            this.initializeEngine();
        }
    }
}

