/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutor;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutorFactory;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.util.Logs;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

class WorkChannelMessageHandler
implements MessageHandler {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    private final IJobPersistence myJobPersistence;
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private final JobStepExecutorFactory myJobStepExecutorFactory;
    private final IHapiTransactionService myHapiTransactionService;

    WorkChannelMessageHandler(@Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender, @Nonnull WorkChunkProcessor theExecutorSvc, @Nonnull IJobMaintenanceService theJobMaintenanceService, IHapiTransactionService theHapiTransactionService) {
        this.myJobPersistence = theJobPersistence;
        this.myJobDefinitionRegistry = theJobDefinitionRegistry;
        this.myHapiTransactionService = theHapiTransactionService;
        this.myJobStepExecutorFactory = new JobStepExecutorFactory(theJobPersistence, theBatchJobSender, theExecutorSvc, theJobMaintenanceService, theJobDefinitionRegistry);
    }

    public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
        this.handleWorkChannelMessage((JobWorkNotificationJsonMessage)theMessage);
    }

    private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
        JobWorkNotification workNotification = theMessage.getPayload();
        ourLog.info("Received work notification for {}", (Object)workNotification);
        this.executeInTxRollbackWhenEmpty(() -> Optional.of(new MessageProcess(workNotification)).flatMap(MessageProcess::validateChunkId).flatMap(MessageProcess::loadJobDefinitionOrThrow).flatMap(MessageProcess::loadJobInstance).flatMap(MessageProcess::updateChunkStatusAndValidate).flatMap(MessageProcess::updateAndValidateJobStatus).flatMap(MessageProcess::buildCursor).flatMap(MessageProcess::buildStepExecutor)).ifPresentOrElse(process -> process.myStepExector.executeStep(), () -> ourLog.debug("Discarding chunk notification {}", (Object)workNotification));
    }

    <T> Optional<T> executeInTxRollbackWhenEmpty(Supplier<Optional<T>> theCallback) {
        return (Optional)this.myHapiTransactionService.withSystemRequest().execute(theTransactionStatus -> {
            Optional setupProcessing = (Optional)theCallback.get();
            if (setupProcessing.isEmpty()) {
                ourLog.debug("WorkChunk setup failed - rollback tx");
                theTransactionStatus.setRollbackOnly();
            }
            return setupProcessing;
        });
    }

    class MessageProcess {
        final JobWorkNotification myWorkNotification;
        String myChunkId;
        WorkChunk myWorkChunk;
        JobWorkCursor<?, ?, ?> myCursor;
        JobInstance myJobInstance;
        JobDefinition<?> myJobDefinition;
        JobStepExecutor<?, ?, ?> myStepExector;

        MessageProcess(JobWorkNotification theWorkNotification) {
            this.myWorkNotification = theWorkNotification;
        }

        Optional<MessageProcess> validateChunkId() {
            this.myChunkId = this.myWorkNotification.getChunkId();
            if (this.myChunkId == null) {
                ourLog.error("Received work notification with null chunkId: {}", (Object)this.myWorkNotification);
                return Optional.empty();
            }
            return Optional.of(this);
        }

        Optional<MessageProcess> loadJobDefinitionOrThrow() {
            String jobDefinitionId = this.myWorkNotification.getJobDefinitionId();
            int jobDefinitionVersion = this.myWorkNotification.getJobDefinitionVersion();
            this.myJobDefinition = WorkChannelMessageHandler.this.myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
            return Optional.of(this);
        }

        Optional<MessageProcess> loadJobInstance() {
            return WorkChannelMessageHandler.this.myJobPersistence.fetchInstance(this.myWorkNotification.getInstanceId()).or(() -> {
                ourLog.error("No instance {} exists for chunk notification {}", (Object)this.myWorkNotification.getInstanceId(), (Object)this.myWorkNotification);
                return Optional.empty();
            }).map(instance -> {
                this.myJobInstance = instance;
                instance.setJobDefinition(this.myJobDefinition);
                return this;
            });
        }

        Optional<MessageProcess> updateChunkStatusAndValidate() {
            return WorkChannelMessageHandler.this.myJobPersistence.onWorkChunkDequeue(this.myChunkId).or(() -> {
                ourLog.error("Unable to find chunk with ID {} - Aborting.  {}", (Object)this.myChunkId, (Object)this.myWorkNotification);
                return Optional.empty();
            }).map(chunk -> {
                this.myWorkChunk = chunk;
                ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", new Object[]{this.myChunkId, this.myWorkChunk.getTargetStepId(), this.myWorkChunk.getStartTime()});
                return this;
            });
        }

        Optional<MessageProcess> updateAndValidateJobStatus() {
            ourLog.trace("Check status {} of job {} for chunk {}", new Object[]{this.myJobInstance.getStatus(), this.myJobInstance.getInstanceId(), this.myChunkId});
            switch (this.myJobInstance.getStatus()) {
                case QUEUED: {
                    WorkChannelMessageHandler.this.myJobPersistence.onChunkDequeued(this.myJobInstance.getInstanceId());
                    break;
                }
                case IN_PROGRESS: 
                case ERRORED: 
                case FINALIZE: {
                    break;
                }
                case COMPLETED: {
                    ourLog.error("Received chunk {}, but job instance is {}.  Skipping.", (Object)this.myChunkId, (Object)this.myJobInstance.getStatus());
                    return Optional.empty();
                }
                default: {
                    ourLog.info("Skipping chunk {} because job instance is {}", (Object)this.myChunkId, (Object)this.myJobInstance.getStatus());
                    return Optional.empty();
                }
            }
            return Optional.of(this);
        }

        Optional<MessageProcess> buildCursor() {
            this.myCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(this.myJobDefinition, this.myWorkNotification.getTargetStepId());
            if (!this.myWorkChunk.getTargetStepId().equals(this.myCursor.getCurrentStepId())) {
                ourLog.error("Chunk {} has target step {} but expected {}", new Object[]{this.myChunkId, this.myWorkChunk.getTargetStepId(), this.myCursor.getCurrentStepId()});
                return Optional.empty();
            }
            return Optional.of(this);
        }

        public Optional<MessageProcess> buildStepExecutor() {
            this.myStepExector = WorkChannelMessageHandler.this.myJobStepExecutorFactory.newJobStepExecutor(this.myJobInstance, this.myWorkChunk, this.myCursor);
            return Optional.of(this);
        }
    }
}

