/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.coordinator.ReductionStepChunkProcessingResponse;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class ReductionStepExecutor {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    private final IJobPersistence myJobPersistence;
    private final PlatformTransactionManager myTxManager;
    private final TransactionTemplate myTxTemplate;

    public ReductionStepExecutor(IJobPersistence theJobPersistence, PlatformTransactionManager theTransactionManager) {
        this.myJobPersistence = theJobPersistence;
        this.myTxManager = theTransactionManager;
        this.myTxTemplate = new TransactionTemplate(theTransactionManager);
        this.myTxTemplate.setPropagationBehavior(3);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeReductionStep(JobInstance theInstance, JobDefinitionStep<PT, IT, OT> theStep, Class<IT> theInputType, PT theParameters) {
        IReductionStepWorker reductionStepWorker = (IReductionStepWorker)theStep.getJobStepWorker();
        if (!this.myJobPersistence.markInstanceAsStatus(theInstance.getInstanceId(), StatusEnum.FINALIZE)) {
            ourLog.warn("JobInstance[{}] is already in FINALIZE state. In memory status is {}. Reduction step will not rerun! This could be a long running reduction job resulting in the processed msg not being acknowledge, or the result of a failed process or server restarting.", (Object)theInstance.getInstanceId(), (Object)theInstance.getStatus().name());
            return false;
        }
        theInstance.setStatus(StatusEnum.FINALIZE);
        boolean defaultSuccessValue = true;
        ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);
        try {
            this.myTxTemplate.executeWithoutResult(status -> {
                try (Stream<WorkChunk> chunkIterator2 = this.myJobPersistence.fetchAllWorkChunksForStepStream(theInstance.getInstanceId(), theStep.getStepId());){
                    chunkIterator2.forEach(chunk -> this.processChunk((WorkChunk)chunk, theInstance, theInputType, theParameters, reductionStepWorker, response));
                }
            });
        }
        finally {
            if (response.hasSuccessfulChunksIds()) {
                this.myJobPersistence.markWorkChunksWithStatusAndWipeData(theInstance.getInstanceId(), response.getSuccessfulChunkIds(), StatusEnum.COMPLETED, null);
            }
            if (response.hasFailedChunkIds()) {
                this.myJobPersistence.markWorkChunksWithStatusAndWipeData(theInstance.getInstanceId(), response.getFailedChunksIds(), StatusEnum.FAILED, "JOB ABORTED");
            }
        }
        if (!response.hasSuccessfulChunksIds()) {
            response.setSuccessful(false);
        }
        return response.isSuccessful();
    }

    private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunk(WorkChunk theChunk, JobInstance theInstance, Class<IT> theInputType, PT theParameters, IReductionStepWorker<PT, IT, OT> theReductionStepWorker, ReductionStepChunkProcessingResponse theResponseObject) {
        if (!theChunk.getStatus().isIncomplete()) {
            ourLog.error("Unexpected chunk {} with status {} found while reducing {}.  No chunks feeding into a reduction step should be complete.", new Object[]{theChunk.getId(), theChunk.getStatus(), theInstance});
            return;
        }
        if (theResponseObject.hasFailedChunkIds()) {
            theResponseObject.addFailedChunkId(theChunk);
        } else {
            try {
                ChunkExecutionDetails<PT, IT> chunkDetails = new ChunkExecutionDetails<PT, IT>(theChunk.getData(theInputType), theParameters, theInstance.getInstanceId(), theChunk.getId());
                ChunkOutcome outcome = theReductionStepWorker.consume(chunkDetails);
                switch (outcome.getStatus()) {
                    case SUCCESS: {
                        theResponseObject.addSuccessfulChunkId(theChunk);
                        break;
                    }
                    case ABORT: {
                        ourLog.error("Processing of work chunk {} resulted in aborting job.", (Object)theChunk.getId());
                        theResponseObject.addFailedChunkId(theChunk);
                        theResponseObject.setSuccessful(false);
                        break;
                    }
                    case FAIL: {
                        this.myJobPersistence.markWorkChunkAsFailed(theChunk.getId(), "Step worker failed to process work chunk " + theChunk.getId());
                        theResponseObject.setSuccessful(false);
                    }
                }
            }
            catch (Exception e) {
                String msg = String.format("Reduction step failed to execute chunk reduction for chunk %s with exception: %s.", theChunk.getId(), e.getMessage());
                ourLog.error(msg, (Throwable)e);
                theResponseObject.setSuccessful(false);
                this.myJobPersistence.markWorkChunkAsFailed(theChunk.getId(), msg);
            }
        }
    }
}

