/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.runner.memory;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.DefaultFlowExecutor;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.tasks.flows.Template;
import io.kestra.runner.memory.MemoryMultipleConditionStorage;
import io.kestra.runner.memory.MemoryQueueEnabled;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory {@link ExecutorInterface} implementation.
 *
 * <p>Consumes the execution queue and the worker task result queue, keeps one
 * {@link ExecutionState} per execution id in a static map, and drives every
 * execution through {@link ExecutorService#process}. The outputs of that
 * processing (worker tasks, worker task results, delays, sub-executions) are
 * re-emitted on the corresponding queues.
 *
 * <p>{@link #handleExecution} and {@link #workerTaskResultQueue} both run while
 * holding the monitor of this instance.
 */
@Singleton
@MemoryQueueEnabled
public class MemoryExecutor
implements ExecutorInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryExecutor.class);
    private static final MemoryMultipleConditionStorage multipleConditionStorage = new MemoryMultipleConditionStorage();
    /** Current state of every known execution, keyed by execution id. */
    private static final ConcurrentHashMap<String, ExecutionState> EXECUTIONS = new ConcurrentHashMap<>();
    /** Pending {@link WorkerTaskExecution}s, keyed by the id of the execution they started. */
    private static final ConcurrentHashMap<String, WorkerTaskExecution> WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>();
    /** Last list of flows pushed by {@link #flowListeners}; assigned by the listener registered in {@link #run()}. */
    private List<Flow> allFlows;
    /** Single-threaded scheduler used to resume executions after an execution delay. */
    private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();
    @Inject
    private ApplicationContext applicationContext;
    @Inject
    private FlowRepositoryInterface flowRepository;
    @Inject
    @Named(value="executionQueue")
    private QueueInterface<Execution> executionQueue;
    @Inject
    @Named(value="workerTaskQueue")
    private QueueInterface<WorkerTask> workerTaskQueue;
    @Inject
    @Named(value="workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    @Inject
    @Named(value="workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;
    @Inject
    private FlowService flowService;
    @Inject
    private TaskDefaultService taskDefaultService;
    @Inject
    private Template.TemplateExecutorInterface templateExecutorInterface;
    @Inject
    private ExecutorService executorService;
    @Inject
    private ConditionService conditionService;
    @Inject
    private RunContextFactory runContextFactory;
    @Inject
    private MetricRegistry metricRegistry;
    @Inject
    private ExecutionService executionService;
    @Inject
    protected FlowListenersInterface flowListeners;

    /**
     * Starts the flow listeners, registers a {@link DefaultFlowExecutor} singleton and
     * subscribes to the execution and worker task result queues.
     */
    public void run() {
        this.flowListeners.run();
        this.flowListeners.listen(flows -> this.allFlows = flows);
        this.applicationContext.registerSingleton(new DefaultFlowExecutor(this.flowListeners, this.flowRepository));
        this.executionQueue.receive(MemoryExecutor.class, this::executionQueue);
        this.workerTaskResultQueue.receive(MemoryExecutor.class, this::workerTaskResultQueue);
    }

    /**
     * Execution queue consumer. Only executions that have no task run yet, or whose state
     * is "created", are saved and handled; every other message is ignored here.
     *
     * @param message the execution received from the queue
     */
    private void executionQueue(Execution message) {
        List<TaskRun> taskRuns = message.getTaskRunList();
        if (taskRuns == null || taskRuns.isEmpty() || message.getState().isCreated()) {
            this.handleExecution(this.saveExecution(message));
        }
    }

    /**
     * Injects templates, then task defaults, into the flow.
     *
     * @param flow the flow to transform
     * @param execution the execution the flow is transformed for
     * @return the flow returned by {@link TaskDefaultService#injectDefaults}; when template
     *         injection fails the failure is logged at debug level and the flow is used as-is
     */
    private Flow transform(Flow flow, Execution execution) {
        try {
            flow = Template.injectTemplate(flow, execution, (namespace, id) -> this.templateExecutorInterface.findById(namespace, id).orElse(null));
        }
        catch (InternalException e) {
            log.debug("Failed to inject template", e);
        }
        return this.taskDefaultService.injectDefaults(flow, execution);
    }

    /**
     * Processes one execution state and dispatches everything the processing produced.
     *
     * <p>Runs entirely under the monitor of this instance. The steps are, in order:
     * process the executor, apply de-duplicated "nexts", persist a failed or updated
     * execution, emit worker tasks and worker task results, schedule execution delays,
     * start sub-executions, and finally handle termination.
     *
     * @param state the state whose execution must be processed
     */
    private void handleExecution(ExecutionState state) {
        synchronized (this) {
            Execution execution = state.execution;
            Flow flow = this.transform(this.flowRepository.findByExecution(execution), execution);
            Executor executor = new Executor(execution, null).withFlow(flow);
            if (log.isDebugEnabled()) {
                this.executorService.log(log, true, executor);
            }
            executor = this.executorService.process(executor);
            if (!executor.getNexts().isEmpty() && this.deduplicateNexts(execution, executor.getNexts())) {
                executor.withExecution(this.executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()), "onNexts");
            }
            if (executor.getException() != null) {
                this.handleFailedExecutionFromExecutor(executor, executor.getException());
            } else if (executor.isExecutionUpdated()) {
                this.toExecution(executor);
            }
            this.emitWorkerTasks(execution, executor);
            executor.getWorkerTaskResults().forEach(this.workerTaskResultQueue::emit);
            this.scheduleExecutionDelays(executor);
            executor.getWorkerTaskExecutions().forEach(workerTaskExecution -> {
                // Remember the sub-execution so its termination can be reported back (see handleTermination).
                WORKERTASKEXECUTIONS_WATCHER.put(workerTaskExecution.getExecution().getId(), workerTaskExecution);
                this.executionQueue.emit(workerTaskExecution.getExecution());
            });
            // The check uses the execution as it was before processing, like the dispatch above.
            if (this.conditionService.isTerminatedWithListeners(flow, execution)) {
                this.handleTermination(flow, execution);
            }
        }
    }

    /**
     * Emits the de-duplicated worker tasks of the executor: tasks flagged
     * {@code isSendToWorkerTask} go to the worker task queue, tasks flagged
     * {@code isFlowable} are turned into a RUNNING {@link WorkerTaskResult}.
     */
    private void emitWorkerTasks(Execution execution, Executor executor) {
        List<WorkerTask> deduplicated = executor.getWorkerTasks().stream()
            .filter(workerTask -> this.deduplicateWorkerTask(execution, workerTask.getTaskRun()))
            .collect(Collectors.toList());
        deduplicated.stream()
            .filter(workerTask -> workerTask.getTask().isSendToWorkerTask())
            .forEach(this.workerTaskQueue::emit);
        deduplicated.stream()
            .filter(workerTask -> workerTask.getTask().isFlowable())
            .map(workerTask -> new WorkerTaskResult(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING))))
            .forEach(this.workerTaskResultQueue::emit);
    }

    /**
     * Schedules, for every execution delay of the executor, a task that marks the delayed
     * task run as RUNNING and re-emits the execution once the delay date is reached.
     */
    private void scheduleExecutionDelays(Executor executor) {
        executor.getExecutionDelays().forEach(executionDelay -> {
            // A date that is already past is scheduled with the smallest positive delay.
            long delay = Math.max(1L, ChronoUnit.MICROS.between(Instant.now(), executionDelay.getDate()));
            this.schedulerDelay.schedule(() -> {
                try {
                    ExecutionState delayedState = EXECUTIONS.get(executionDelay.getExecutionId());
                    if (delayedState == null) {
                        // toExecution() removes purgeable executions, so the state may be gone by now.
                        log.warn("Execution '{}' is no longer tracked, skipping delayed resume of taskRun '{}'", executionDelay.getExecutionId(), executionDelay.getTaskRunId());
                        return;
                    }
                    Execution markAsExecution = this.executionService.markAs(delayedState.execution, executionDelay.getTaskRunId(), State.Type.RUNNING);
                    this.executionQueue.emit(markAsExecution);
                }
                catch (Exception e) {
                    // The returned future is never inspected: an exception thrown from here
                    // would be lost, so log it instead of rethrowing.
                    log.error("Unable to resume delayed execution '{}'", executionDelay.getExecutionId(), e);
                }
            }, delay, TimeUnit.MICROSECONDS);
        });
    }

    /**
     * Handles an execution that is terminated (listeners included): re-emits it, evaluates
     * the flow triggers, and reports the result to the parent task when this execution was
     * started as a {@link WorkerTaskExecution}.
     */
    private void handleTermination(Flow flow, Execution execution) {
        this.executionQueue.emit(execution);

        multipleConditionStorage.save(this.flowService.multipleFlowTrigger(this.allFlows.stream(), flow, execution, (MultipleConditionStorageInterface)multipleConditionStorage));
        this.flowService.flowTriggerExecution(this.allFlows.stream(), execution, (MultipleConditionStorageInterface)multipleConditionStorage).forEach(this.executionQueue::emit);
        this.flowService.multipleFlowToDelete(this.allFlows.stream(), (MultipleConditionStorageInterface)multipleConditionStorage).forEach(multipleConditionStorage::delete);

        WorkerTaskExecution watched = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
        if (watched != null) {
            Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
            WorkerTaskResult workerTaskResult = watched.getTask().createWorkerTaskResult(this.runContextFactory, watched, workerTaskFlow, execution);
            this.workerTaskResultQueue.emit(workerTaskResult);
            WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
        }
    }

    /**
     * Converts an executor failure into a failed execution: emits the generated logs on
     * the log queue, then saves and handles the failed execution. Any exception raised
     * while doing so is logged and not propagated.
     *
     * @param executor the executor that failed
     * @param e the exception reported by the executor
     */
    private void handleFailedExecutionFromExecutor(Executor executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
        try {
            failedExecutionWithLog.getLogs().forEach(this.logQueue::emit);
            this.toExecution(executor.withExecution(failedExecutionWithLog.getExecution(), "exception"));
        }
        catch (Exception ex) {
            log.error("Failed to produce {}", e.getMessage(), ex);
        }
    }

    /**
     * Stores the execution in {@link #EXECUTIONS}, creating a new state or merging it into
     * the existing one.
     *
     * @param execution the execution to store
     * @return the state now associated with the execution id
     */
    private ExecutionState saveExecution(Execution execution) {
        return EXECUTIONS.compute(
            execution.getId(),
            (id, current) -> current == null ? new ExecutionState(execution) : current.from(execution)
        );
    }

    /**
     * Emits the executor's execution, saves and handles it again, then drops its state
     * when {@link ExecutorService#canBePurged} says so.
     *
     * @param executor the executor carrying the execution to publish
     */
    private void toExecution(Executor executor) {
        if (log.isDebugEnabled()) {
            this.executorService.log(log, false, executor);
        }
        this.executionQueue.emit(executor.getExecution());
        this.handleExecution(this.saveExecution(executor.getExecution()));
        if (this.executorService.canBePurged(executor)) {
            EXECUTIONS.remove(executor.getExecution().getId());
        }
    }

    /**
     * Worker task result queue consumer. Records metrics for terminated task runs, merges
     * the result into the execution state and publishes the resulting execution.
     *
     * @param message the worker task result received from the queue
     * @throws IllegalStateException when no state exists for the execution of the task run
     */
    private void workerTaskResultQueue(WorkerTaskResult message) {
        synchronized (this) {
            if (log.isDebugEnabled()) {
                this.executorService.log(log, true, message);
            }
            if (message.getTaskRun().getState().isTerninated()) {
                this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(message)).increment();
                this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(message)).record(message.getTaskRun().getState().getDuration());
            }
            // Use the value returned by compute() as a single snapshot instead of reading the
            // map again: the map is also written outside of this monitor (see saveExecution).
            ExecutionState updated = EXECUTIONS.compute(message.getTaskRun().getExecutionId(), (id, current) -> {
                if (current == null) {
                    throw new IllegalStateException("Execution state don't exist for " + id + ", receive " + message);
                }
                if (!current.execution.hasTaskRunJoinable(message.getTaskRun())) {
                    return current;
                }
                try {
                    return current.from(message, this.executorService, this.flowRepository);
                }
                catch (InternalException e) {
                    return new ExecutionState(current, current.execution.failedExecutionFromExecutor(e).getExecution());
                }
            });
            Execution execution = updated.execution;
            Flow flow = this.transform(this.flowRepository.findByExecution(execution), execution);
            this.toExecution(new Executor(execution, null).withFlow(flow));
        }
    }

    /**
     * Tells whether a worker task must be emitted, remembering the last state emitted for
     * each task run.
     *
     * @param execution the execution owning the task run
     * @param taskRun the task run of the worker task
     * @return {@code false} when the task run was already emitted with the same current state
     */
    private boolean deduplicateWorkerTask(Execution execution, TaskRun taskRun) {
        ExecutionState executionState = EXECUTIONS.get(execution.getId());
        String deduplicationKey = taskRun.getExecutionId() + "-" + taskRun.getId();
        State.Type emitted = executionState.workerTaskDeduplication.get(deduplicationKey);
        State.Type current = taskRun.getState().getCurrent();
        if (emitted == current) {
            log.trace("Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}, taskId '{}'", execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId());
            return false;
        }
        executionState.workerTaskDeduplication.put(deduplicationKey, current);
        return true;
    }

    /**
     * Tells whether at least one of the given task runs has not been seen yet for this
     * execution, recording the new key it finds.
     *
     * <p>{@code anyMatch} short-circuits: evaluation stops at the first unseen task run, so
     * task runs located after it in the list are not recorded by this call.
     *
     * @param execution the execution owning the task runs
     * @param taskRuns the candidate next task runs
     * @return {@code true} when at least one task run is new
     */
    private boolean deduplicateNexts(Execution execution, List<TaskRun> taskRuns) {
        ExecutionState executionState = EXECUTIONS.get(execution.getId());
        return taskRuns.stream().anyMatch(taskRun -> {
            String deduplicationKey = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue();
            if (executionState.childDeduplication.containsKey(deduplicationKey)) {
                log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), deduplicationKey);
                return false;
            }
            executionState.childDeduplication.put(deduplicationKey, taskRun.getId());
            return true;
        });
    }

    /**
     * Stops the delay scheduler, then closes every queue used by this executor.
     *
     * @throws IOException when closing one of the queues fails
     */
    public void close() throws IOException {
        // Cancel pending delayed resumes first: they would otherwise emit on closed queues,
        // and the scheduler thread would outlive this executor.
        this.schedulerDelay.shutdownNow();
        this.executionQueue.close();
        this.workerTaskQueue.close();
        this.workerTaskResultQueue.close();
        this.logQueue.close();
    }

    /**
     * Snapshot of an execution together with its bookkeeping maps. A new instance is created
     * for every new execution value; the three maps are shared (not copied) between an
     * instance and the instances derived from it.
     */
    private static class ExecutionState {
        private final Execution execution;
        /** Last task run received from a worker task result, keyed by {@link #taskRunKey}. */
        private final Map<String, TaskRun> taskRuns;
        /** Last state emitted per worker task, keyed by "executionId-taskRunId". */
        private final Map<String, State.Type> workerTaskDeduplication;
        /** Task run id per already-created child, keyed by "parentTaskRunId-taskId-value". */
        private final Map<String, String> childDeduplication;

        /** Creates the initial state of an execution, with empty bookkeeping maps. */
        public ExecutionState(Execution execution) {
            this.execution = execution;
            this.taskRuns = new ConcurrentHashMap<>();
            this.workerTaskDeduplication = new ConcurrentHashMap<>();
            this.childDeduplication = new ConcurrentHashMap<>();
        }

        /** Creates a state for a new execution value that shares the maps of {@code executionState}. */
        public ExecutionState(ExecutionState executionState, Execution execution) {
            this.execution = execution;
            this.taskRuns = executionState.taskRuns;
            this.workerTaskDeduplication = executionState.workerTaskDeduplication;
            this.childDeduplication = executionState.childDeduplication;
        }

        /** Builds the key of a task run: its id, a dash, then its value (or "null"). */
        private static String taskRunKey(TaskRun taskRun) {
            return taskRun.getId() + "-" + (taskRun.getValue() == null ? "null" : taskRun.getValue());
        }

        /**
         * Derives a state from a new execution value. Each task run of the execution is
         * replaced by the one stored in this state when such a task run exists and is
         * joinable with the execution; otherwise it is kept unchanged.
         *
         * @param execution the new execution value
         * @return a new state holding the merged execution
         */
        public ExecutionState from(Execution execution) {
            List<TaskRun> merged = execution.getTaskRunList().stream().map(taskRun -> {
                TaskRun stateTaskRun = this.taskRuns.get(ExecutionState.taskRunKey(taskRun));
                if (stateTaskRun != null && execution.hasTaskRunJoinable(stateTaskRun)) {
                    return stateTaskRun;
                }
                return taskRun;
            }).collect(Collectors.toList());
            return new ExecutionState(this, execution.withTaskRunList(merged));
        }

        /**
         * Records the task run of a worker task result and applies it to the execution.
         *
         * @param workerTaskResult the result to merge
         * @param executorService service used to add the (possibly dynamic) task run
         * @param flowRepository repository used to resolve the flow of the execution
         * @return a new state when {@code addDynamicTaskRun} returns an execution, this state otherwise
         * @throws InternalException when the task run cannot be added to the execution
         */
        public ExecutionState from(WorkerTaskResult workerTaskResult, ExecutorService executorService, FlowRepositoryInterface flowRepository) throws InternalException {
            this.taskRuns.put(ExecutionState.taskRunKey(workerTaskResult.getTaskRun()), workerTaskResult.getTaskRun());
            Execution execution = executorService.addDynamicTaskRun(this.execution, flowRepository.findByExecution(this.execution), workerTaskResult);
            if (execution != null) {
                return new ExecutionState(this, execution);
            }
            return this;
        }
    }
}

