/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.batch2.maintenance;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import java.util.List;
import org.slf4j.Logger;

/**
 * Runs one maintenance pass over a single batch job instance, identified by its instance ID.
 *
 * <p>A pass performs three things in order (see {@link #process()}):
 * <ol>
 *   <li>applies a pending cancellation request, if there is one;</li>
 *   <li>performs status-dependent cleanup (progress recalculation, purging of old instances and
 *       of work chunks belonging to finished instances);</li>
 *   <li>for instances with a gated step, advances to the next step once persistence reports that
 *       the current step can be advanced.</li>
 * </ol>
 */
public class JobInstanceProcessor {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

    /**
     * Age in milliseconds (7 days) after which an instance that has an end time is deleted,
     * together with its chunks, by the cleanup performed in this class.
     */
    public static final long PURGE_THRESHOLD = 7L * 24L * 60L * 60L * 1000L;

    private final IJobPersistence myJobPersistence;
    private final BatchJobSender myBatchJobSender;
    private final JobChunkProgressAccumulator myProgressAccumulator;
    private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
    private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
    private final IReductionStepExecutorService myReductionStepExecutorService;
    private final String myInstanceId;
    private final JobDefinitionRegistry myJobDefinitionRegistry;

    /**
     * Creates a processor bound to one job instance.
     *
     * @param theJobPersistence store used to fetch, update and delete instances and chunks
     * @param theBatchJobSender channel used to submit work notifications for chunks
     * @param theInstanceId ID of the job instance this processor operates on
     * @param theProgressAccumulator accumulator queried for per-step chunk IDs and counts
     * @param theReductionStepExecutorService service used to trigger a reduction step
     * @param theJobDefinitionRegistry registry used to resolve the instance's job definition
     */
    public JobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, String theInstanceId, JobChunkProgressAccumulator theProgressAccumulator, IReductionStepExecutorService theReductionStepExecutorService, JobDefinitionRegistry theJobDefinitionRegistry) {
        myJobPersistence = theJobPersistence;
        myBatchJobSender = theBatchJobSender;
        myInstanceId = theInstanceId;
        myProgressAccumulator = theProgressAccumulator;
        myReductionStepExecutorService = theReductionStepExecutorService;
        myJobDefinitionRegistry = theJobDefinitionRegistry;
        myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
        myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
    }

    /**
     * Performs one maintenance pass for the instance. Does nothing if the instance can no longer
     * be fetched from persistence.
     */
    public void process() {
        ourLog.debug("Starting job processing: {}", myInstanceId);
        StopWatch stopWatch = new StopWatch();

        JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
        if (theInstance == null) {
            return;
        }

        boolean cancelUpdate = handleCancellation(theInstance);
        if (cancelUpdate) {
            // The stored instance was modified by the cancellation; reload it so the steps below
            // operate on the updated state.
            theInstance = myJobPersistence.fetchInstance(myInstanceId).orElseThrow();
        }

        cleanupInstance(theInstance);
        triggerGatedExecutions(theInstance);

        ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
    }

    /**
     * Moves the instance to {@link StatusEnum#CANCELLED} when it has a pending cancellation
     * request, recording an explanatory error message if the status actually changed.
     *
     * @param theInstance the instance as last fetched
     * @return the result of the persistence update, or {@code false} when no cancellation was
     *         requested
     */
    private boolean handleCancellation(JobInstance theInstance) {
        if (!theInstance.isPendingCancellationRequest()) {
            return false;
        }

        String errorMessage = buildCancelledMessage(theInstance);
        ourLog.info("Job {} moving to CANCELLED", theInstance.getInstanceId());
        return myJobPersistence.updateInstance(theInstance.getInstanceId(), instance -> {
            boolean changed = myJobInstanceStatusUpdater.updateInstanceStatus(instance, StatusEnum.CANCELLED);
            if (changed) {
                instance.setErrorMessage(errorMessage);
            }
            return changed;
        });
    }

    /**
     * Builds the error message stored on a cancelled instance, naming the current gated step when
     * the instance has one.
     */
    private String buildCancelledMessage(JobInstance theInstance) {
        String msg = "Job instance cancelled";
        if (theInstance.hasGatedStep()) {
            msg += " while running step " + theInstance.getCurrentGatedStepId();
        }
        return msg;
    }

    /**
     * Performs status-dependent housekeeping for the instance.
     *
     * <ul>
     *   <li>{@code QUEUED}: no status-specific work.</li>
     *   <li>{@code FINALIZE}: returns immediately without touching the instance.</li>
     *   <li>{@code IN_PROGRESS} / {@code ERRORED}: recalculates and stores progress.</li>
     *   <li>{@code COMPLETED} / {@code FAILED}: deletes the instance if it is older than
     *       {@link #PURGE_THRESHOLD}; if it was deleted, returns.</li>
     *   <li>{@code CANCELLED}: deletes the instance if it is old enough, then returns either way,
     *       so the chunk purge below is never reached for cancelled instances.</li>
     * </ul>
     *
     * Every path that does not return then purges the work chunks of a finished instance whose
     * chunks have not been purged yet.
     */
    private void cleanupInstance(JobInstance theInstance) {
        switch (theInstance.getStatus()) {
            case QUEUED: {
                break;
            }
            case FINALIZE: {
                return;
            }
            case IN_PROGRESS:
            case ERRORED: {
                myJobInstanceProgressCalculator.calculateAndStoreInstanceProgress(theInstance.getInstanceId());
                break;
            }
            case COMPLETED:
            case FAILED: {
                if (purgeExpiredInstance(theInstance)) {
                    // Instance and chunks are gone; nothing left to clean up.
                    return;
                }
                break;
            }
            case CANCELLED: {
                purgeExpiredInstance(theInstance);
                return;
            }
        }

        if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) {
            myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId());
        }
    }

    /**
     * Deletes the instance and its chunks when it has an end time older than
     * {@link #PURGE_THRESHOLD} relative to the current system time.
     *
     * @return {@code true} if the instance was deleted, {@code false} otherwise (including when
     *         the instance has no end time)
     */
    private boolean purgeExpiredInstance(JobInstance theInstance) {
        if (theInstance.getEndTime() == null) {
            return false;
        }

        long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
        if (theInstance.getEndTime().getTime() >= cutoff) {
            return false;
        }

        ourLog.info("Deleting old job instance {}", theInstance.getInstanceId());
        myJobPersistence.deleteInstanceAndChunks(theInstance.getInstanceId());
        return true;
    }

    /**
     * Advances a running, gated instance to its next step when persistence reports that the
     * current step can be advanced. A reduction next step is handed to the reduction step executor
     * service; any other next step has its queued chunks submitted to the work channel.
     *
     * <p>Returns without action when the instance is not running, has no gated step, or is on a
     * final step that is not a reduction step.
     */
    private void triggerGatedExecutions(JobInstance theInstance) {
        if (!theInstance.isRunning()) {
            ourLog.debug("JobInstance {} is not in a \"running\" state. Status {}", theInstance.getInstanceId(), theInstance.getStatus());
            return;
        }
        if (!theInstance.hasGatedStep()) {
            return;
        }

        JobDefinition jobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(theInstance);
        JobWorkCursor jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(jobDefinition, theInstance.getCurrentGatedStepId());
        if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
            ourLog.debug("Job instance {} is in final step and it's not a reducer step", theInstance.getInstanceId());
            return;
        }

        String instanceId = theInstance.getInstanceId();
        String currentStepId = jobWorkCursor.getCurrentStepId();
        boolean shouldAdvance = myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId);

        // NOTE(review): the guard above lets a cursor that is both final and a reduction step
        // through. If nextStep can be null in that case, the dereferences below would throw -
        // confirm against JobWorkCursor.
        if (!shouldAdvance) {
            ourLog.debug("Not ready to advance gated execution of instance {} from step {} to {}.", instanceId, currentStepId, jobWorkCursor.nextStep.getStepId());
            return;
        }

        String nextStepId = jobWorkCursor.nextStep.getStepId();
        ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
        if (jobWorkCursor.nextStep.isReductionStep()) {
            JobWorkCursor nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(jobDefinition, jobWorkCursor.nextStep.getStepId());
            myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
        } else {
            processChunksForNextSteps(theInstance, nextStepId);
        }
    }

    /**
     * Moves the instance's gated step to {@code nextStepId} and submits a work notification for
     * every chunk of that step that persistence reports as {@code QUEUED}.
     *
     * <p>The chunk IDs are read before the instance is updated. If the update reports no change
     * (the lambda declines when the stored gated step already equals {@code nextStepId}), nothing
     * is submitted.
     *
     * @param theInstance the instance as last fetched; used to build the work notifications
     * @param nextStepId ID of the step to advance to
     */
    private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {
        String instanceId = theInstance.getInstanceId();

        // Diagnostic only: report when the accumulator's view of the next step contains chunks
        // that are not QUEUED. The result does not influence what gets submitted.
        List<String> queuedChunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
        int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
        if (totalChunksForNextStep != queuedChunksForNextStep.size()) {
            ourLog.debug("Total ProgressAccumulator QUEUED chunk count does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", instanceId, nextStepId, totalChunksForNextStep, queuedChunksForNextStep.size());
        }

        List<String> chunksToSubmit = myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
        boolean changed = myJobPersistence.updateInstance(instanceId, instance -> {
            if (instance.getCurrentGatedStepId().equals(nextStepId)) {
                // The stored instance is already on the target step; report "no change".
                return false;
            }
            instance.setCurrentGatedStepId(nextStepId);
            return true;
        });
        if (!changed) {
            ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
            return;
        }

        for (String nextChunkId : chunksToSubmit) {
            JobWorkNotification workNotification = new JobWorkNotification(theInstance, nextStepId, nextChunkId);
            myBatchJobSender.sendWorkChannelMessage(workNotification);
        }
        ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", chunksToSubmit.size(), instanceId, nextStepId);
    }
}

