/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.ReductionStepChunkProcessingResponse;
import ca.uhn.fhir.batch2.coordinator.ReductionStepDataSink;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.annotation.Propagation;

public class ReductionStepExecutorServiceImpl
implements IReductionStepExecutorService,
IHasScheduledJobs {
    public static final String SCHEDULED_JOB_ID = ReductionStepExecutorScheduledJob.class.getName();
    private static final Logger ourLog = LoggerFactory.getLogger(ReductionStepExecutorServiceImpl.class);
    private final Map<String, JobWorkCursor> myInstanceIdToJobWorkCursor = Collections.synchronizedMap(new LinkedHashMap());
    private final ExecutorService myReducerExecutor;
    private final IJobPersistence myJobPersistence;
    private final IHapiTransactionService myTransactionService;
    private final Semaphore myCurrentlyExecuting = new Semaphore(1);
    private final AtomicReference<String> myCurrentlyFinalizingInstanceId = new AtomicReference();
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private Timer myHeartbeatTimer;

    public ReductionStepExecutorServiceImpl(IJobPersistence theJobPersistence, IHapiTransactionService theTransactionService, JobDefinitionRegistry theJobDefinitionRegistry) {
        this.myJobPersistence = theJobPersistence;
        this.myTransactionService = theTransactionService;
        this.myJobDefinitionRegistry = theJobDefinitionRegistry;
        this.myReducerExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new CustomizableThreadFactory("batch2-reducer"));
    }

    @EventListener(value={ContextRefreshedEvent.class})
    public void start() {
        if (this.myHeartbeatTimer == null) {
            this.myHeartbeatTimer = new Timer("batch2-reducer-heartbeat");
            this.myHeartbeatTimer.schedule((TimerTask)new HeartbeatTimerTask(), 60000L, 60000L);
        }
    }

    private void runHeartbeat() {
        String currentlyFinalizingInstanceId = this.myCurrentlyFinalizingInstanceId.get();
        if (currentlyFinalizingInstanceId != null) {
            ourLog.info("Running heartbeat for instance: {}", (Object)currentlyFinalizingInstanceId);
            this.executeInTransactionWithSynchronization(() -> {
                this.myJobPersistence.updateInstanceUpdateTime(currentlyFinalizingInstanceId);
                return null;
            });
        }
    }

    @EventListener(value={ContextClosedEvent.class})
    public void shutdown() {
        if (this.myHeartbeatTimer != null) {
            this.myHeartbeatTimer.cancel();
            this.myHeartbeatTimer = null;
        }
    }

    @Override
    public void triggerReductionStep(String theInstanceId, JobWorkCursor<?, ?, ?> theJobWorkCursor) {
        this.myInstanceIdToJobWorkCursor.putIfAbsent(theInstanceId, theJobWorkCursor);
        if (this.myCurrentlyExecuting.availablePermits() > 0) {
            this.myReducerExecutor.submit(this::reducerPass);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reducerPass() {
        if (this.myCurrentlyExecuting.tryAcquire()) {
            try {
                String[] instanceIds = this.myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
                if (instanceIds.length > 0) {
                    String instanceId = instanceIds[0];
                    this.myCurrentlyFinalizingInstanceId.set(instanceId);
                    JobWorkCursor jobWorkCursor = this.myInstanceIdToJobWorkCursor.get(instanceId);
                    this.executeReductionStep(instanceId, jobWorkCursor);
                    this.myInstanceIdToJobWorkCursor.remove(instanceId);
                }
            }
            catch (Exception e) {
                ourLog.error("Failed to execute reducer pass", (Throwable)e);
            }
            finally {
                this.myCurrentlyFinalizingInstanceId.set(null);
                this.myCurrentlyExecuting.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> ReductionStepChunkProcessingResponse executeReductionStep(String theInstanceId, JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
        JobDefinitionStep step = theJobWorkCursor.getCurrentStep();
        JobInstance instance = this.executeInTransactionWithSynchronization(() -> this.myJobPersistence.fetchInstance(theInstanceId).orElseThrow(() -> new InternalErrorException("Unknown instance: " + theInstanceId)));
        boolean shouldProceed = false;
        switch (instance.getStatus()) {
            case IN_PROGRESS: 
            case ERRORED: {
                boolean changed = this.executeInTransactionWithSynchronization(() -> this.myJobPersistence.markInstanceAsStatusWhenStatusIn(instance.getInstanceId(), StatusEnum.FINALIZE, EnumSet.of(StatusEnum.IN_PROGRESS, StatusEnum.ERRORED)));
                if (!changed) break;
                ourLog.info("Job instance {} has been set to FINALIZE state - Beginning reducer step", (Object)instance.getInstanceId());
                shouldProceed = true;
                break;
            }
        }
        if (!shouldProceed) {
            ourLog.warn("JobInstance[{}] should not be finalized at this time. In memory status is {}. Reduction step will not rerun! This could be a long running reduction job resulting in the processed msg not being acknowledged, or the result of a failed process or server restarting.", (Object)instance.getInstanceId(), (Object)instance.getStatus());
            return new ReductionStepChunkProcessingResponse(false);
        }
        Object parameters = instance.getParameters(theJobWorkCursor.getJobDefinition().getParametersType());
        IReductionStepWorker reductionStepWorker = (IReductionStepWorker)step.getJobStepWorker();
        instance.setStatus(StatusEnum.FINALIZE);
        boolean defaultSuccessValue = true;
        ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);
        try {
            this.executeInTransactionWithSynchronization(() -> {
                try (Stream<WorkChunk> chunkIterator = this.myJobPersistence.fetchAllWorkChunksForStepStream(instance.getInstanceId(), step.getStepId());){
                    chunkIterator.forEach(chunk -> this.processChunk((WorkChunk)chunk, instance, parameters, reductionStepWorker, response, theJobWorkCursor));
                }
                return null;
            });
        }
        finally {
            this.executeInTransactionWithSynchronization(() -> {
                ourLog.info("Reduction step for instance[{}] produced {} successful and {} failed chunks", new Object[]{instance.getInstanceId(), response.getSuccessfulChunkIds().size(), response.getFailedChunksIds().size()});
                ReductionStepDataSink dataSink = new ReductionStepDataSink(instance.getInstanceId(), theJobWorkCursor, this.myJobPersistence, this.myJobDefinitionRegistry);
                StepExecutionDetails<IModelJson, Object> chunkDetails = new StepExecutionDetails<IModelJson, Object>((IModelJson)parameters, null, instance, "REDUCTION");
                if (response.isSuccessful()) {
                    reductionStepWorker.run(chunkDetails, dataSink);
                }
                if (response.hasSuccessfulChunksIds()) {
                    this.myJobPersistence.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), response.getSuccessfulChunkIds(), WorkChunkStatusEnum.COMPLETED, null);
                }
                if (response.hasFailedChunkIds()) {
                    this.myJobPersistence.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), response.getFailedChunksIds(), WorkChunkStatusEnum.FAILED, "JOB ABORTED");
                }
                return null;
            });
        }
        if (!response.hasSuccessfulChunksIds()) {
            response.setSuccessful(false);
        }
        return response;
    }

    private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) {
        return (T)this.myTransactionService.withRequest(null).withPropagation(Propagation.REQUIRES_NEW).execute(runnable);
    }

    public void scheduleJobs(ISchedulerService theSchedulerService) {
        theSchedulerService.scheduleClusteredJob(10000L, this.buildJobDefinition());
    }

    @Nonnull
    private ScheduledJobDefinition buildJobDefinition() {
        ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
        jobDefinition.setId(SCHEDULED_JOB_ID);
        jobDefinition.setJobClass(ReductionStepExecutorScheduledJob.class);
        return jobDefinition;
    }

    private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunk(WorkChunk theChunk, JobInstance theInstance, PT theParameters, IReductionStepWorker<PT, IT, OT> theReductionStepWorker, ReductionStepChunkProcessingResponse theResponseObject, JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
        if (!theChunk.getStatus().isIncomplete()) {
            ourLog.error("Unexpected chunk {} with status {} found while reducing {}.  No chunks feeding into a reduction step should be complete.", new Object[]{theChunk.getId(), theChunk.getStatus(), theInstance});
            return;
        }
        if (theResponseObject.hasFailedChunkIds()) {
            theResponseObject.addFailedChunkId(theChunk);
        } else {
            try {
                IT chunkData = theChunk.getData(theJobWorkCursor.getCurrentStep().getInputType());
                ChunkExecutionDetails<PT, IT> chunkDetails = new ChunkExecutionDetails<PT, IT>(chunkData, theParameters, theInstance.getInstanceId(), theChunk.getId());
                ChunkOutcome outcome = theReductionStepWorker.consume(chunkDetails);
                switch (outcome.getStatus()) {
                    case SUCCESS: {
                        theResponseObject.addSuccessfulChunkId(theChunk);
                        break;
                    }
                    case FAILED: {
                        ourLog.error("Processing of work chunk {} resulted in aborting job.", (Object)theChunk.getId());
                        theResponseObject.addFailedChunkId(theChunk);
                        theResponseObject.setSuccessful(false);
                    }
                }
            }
            catch (Exception e) {
                String msg = String.format("Reduction step failed to execute chunk reduction for chunk %s with exception: %s.", theChunk.getId(), e.getMessage());
                ourLog.error(msg, (Throwable)e);
                theResponseObject.setSuccessful(false);
                this.myJobPersistence.onWorkChunkFailed(theChunk.getId(), msg);
            }
        }
    }

    private class HeartbeatTimerTask
    extends TimerTask {
        private HeartbeatTimerTask() {
        }

        @Override
        public void run() {
            ReductionStepExecutorServiceImpl.this.runHeartbeat();
        }
    }

    public static class ReductionStepExecutorScheduledJob
    implements HapiJob {
        @Autowired
        private IReductionStepExecutorService myTarget;

        public void execute(JobExecutionContext theContext) {
            this.myTarget.reducerPass();
        }
    }
}

