/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutor;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutorFactory;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Handles messages arriving on the batch2 work channel.
 *
 * <p>Each message carries a {@link JobWorkNotification}. A notification whose chunk ID is the
 * literal {@code "REDUCTION"} has no associated work chunk and is executed on a dedicated thread;
 * any other notification is resolved to a {@link WorkChunk} and executed on the calling thread.
 */
class WorkChannelMessageHandler
implements MessageHandler {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    private final IJobPersistence myJobPersistence;
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private final JobStepExecutorFactory myJobStepExecutorFactory;
    private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;

    /**
     * Creates the handler and the collaborators it derives from its arguments
     * (a {@link JobStepExecutorFactory} and a {@link JobInstanceStatusUpdater}).
     *
     * @param theJobPersistence        persistence used to fetch chunks and instances
     * @param theJobDefinitionRegistry registry used to resolve job definitions
     * @param theBatchJobSender        passed through to the step executor factory
     * @param theExecutorSvc           passed through to the step executor factory
     * @param theJobMaintenanceService passed through to the step executor factory
     */
    WorkChannelMessageHandler(@Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender, @Nonnull WorkChunkProcessor theExecutorSvc, @Nonnull IJobMaintenanceService theJobMaintenanceService) {
        this.myJobPersistence = theJobPersistence;
        this.myJobDefinitionRegistry = theJobDefinitionRegistry;
        this.myJobStepExecutorFactory = new JobStepExecutorFactory(theJobPersistence, theBatchJobSender, theExecutorSvc, theJobMaintenanceService);
        this.myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobPersistence);
    }

    /**
     * Entry point invoked by the messaging channel.
     *
     * @param theMessage the incoming message; it is cast to {@link JobWorkNotificationJsonMessage},
     *                   so any other message type results in a {@link ClassCastException}
     */
    @Override
    public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
        this.handleWorkChannelMessage((JobWorkNotificationJsonMessage)theMessage);
    }

    /**
     * Resolves the chunk (if any), cursor and job instance referenced by the notification and
     * runs the corresponding step.
     *
     * <p>Returns without executing anything when the chunk cannot be found or when the instance
     * reports itself as cancelled.
     *
     * @param theMessage the work notification message
     * @throws InternalErrorException if the referenced job instance cannot be fetched
     */
    private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
        JobWorkNotification workNotification = theMessage.getPayload();
        ourLog.info("Received work notification for {}", workNotification);
        String chunkId = workNotification.getChunkId();
        Validate.notNull(chunkId);
        boolean isReductionWorkNotification = "REDUCTION".equals(chunkId);
        JobWorkCursor<?, ?, ?> cursor;
        WorkChunk workChunk = null;
        if (isReductionWorkNotification) {
            ourLog.debug("Processing reduction step work notification. No associated workchunks.");
            cursor = this.buildCursorFromNotification(workNotification);
        } else {
            Optional<WorkChunk> chunkOpt = this.myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
            if (chunkOpt.isEmpty()) {
                ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
                return;
            }
            workChunk = chunkOpt.get();
            ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", chunkId, workChunk.getTargetStepId(), workChunk.getStartTime());
            cursor = this.buildCursorFromNotification(workNotification);
            // The chunk and the notification must agree on which step is being executed.
            Validate.isTrue(workChunk.getTargetStepId().equals(cursor.getCurrentStepId()), "Chunk %s has target step %s but expected %s", chunkId, workChunk.getTargetStepId(), cursor.getCurrentStepId());
        }

        Optional<JobInstance> instanceOpt = this.myJobPersistence.fetchInstance(workNotification.getInstanceId());
        JobInstance instance = instanceOpt.orElseThrow(() -> new InternalErrorException("Unknown instance: " + workNotification.getInstanceId()));
        this.markInProgressIfQueued(instance);
        this.myJobDefinitionRegistry.setJobDefinition(instance);
        String instanceId = instance.getInstanceId();
        if (instance.isCancelled()) {
            ourLog.info("Skipping chunk {} because job instance is cancelled", chunkId);
            this.myJobPersistence.markInstanceAsCompleted(instanceId);
            return;
        }

        JobStepExecutor<?, ?, ?> stepExecutor = this.myJobStepExecutorFactory.newJobStepExecutor(instance, workChunk, cursor);
        if (isReductionWorkNotification) {
            this.executeReductionStepAsync(stepExecutor);
        } else {
            stepExecutor.executeStep();
        }
    }

    /**
     * Runs the given step executor on a dedicated thread named {@code "Reduction-step-thread"}
     * without blocking the caller.
     *
     * @param theStepExecutor the executor whose {@code executeStep()} is to be run
     */
    private void executeReductionStepAsync(JobStepExecutor<?, ?, ?> theStepExecutor) {
        ScheduledExecutorService exService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "Reduction-step-thread"));
        try {
            exService.execute(theStepExecutor::executeStep);
        } finally {
            // A fresh executor is created per notification, so it must be released here;
            // otherwise its idle worker thread is never reclaimed. shutdown() is an orderly
            // shutdown: the task submitted above still runs, it only rejects new submissions.
            exService.shutdown();
        }
    }

    /**
     * Moves the instance to {@link StatusEnum#IN_PROGRESS} if its current status is
     * {@link StatusEnum#QUEUED}; otherwise does nothing.
     *
     * @param theInstance the job instance to inspect and possibly update
     */
    private void markInProgressIfQueued(JobInstance theInstance) {
        if (theInstance.getStatus() == StatusEnum.QUEUED) {
            this.myJobInstanceStatusUpdater.updateInstanceStatus(theInstance, StatusEnum.IN_PROGRESS);
        }
    }

    /**
     * Looks up the job definition named by the notification (ID and version) and builds a cursor
     * positioned at the notification's target step.
     *
     * @param workNotification the notification identifying the job definition and target step
     * @return the cursor produced by {@code JobWorkCursor.fromJobDefinitionAndRequestedStepId}
     */
    private JobWorkCursor<?, ?, ?> buildCursorFromNotification(JobWorkNotification workNotification) {
        String jobDefinitionId = workNotification.getJobDefinitionId();
        int jobDefinitionVersion = workNotification.getJobDefinitionVersion();
        JobDefinition<?> definition = this.myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
        return JobWorkCursor.fromJobDefinitionAndRequestedStepId(definition, workNotification.getTargetStepId());
    }
}

