/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Concurrency;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutionDelay;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.services.ConditionService;
import io.kestra.core.tasks.flows.Pause;
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.kestra.core.utils.Rethrow;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ExecutorService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutorService.class);
    @Inject
    protected ApplicationContext applicationContext;
    @Inject
    protected RunContextFactory runContextFactory;
    @Inject
    protected MetricRegistry metricRegistry;
    @Inject
    protected ConditionService conditionService;
    protected FlowExecutorInterface flowExecutorInterface;

    protected FlowExecutorInterface flowExecutorInterface() {
        if (this.flowExecutorInterface == null) {
            this.flowExecutorInterface = (FlowExecutorInterface)this.applicationContext.getBean(FlowExecutorInterface.class);
        }
        return this.flowExecutorInterface;
    }

    public Executor checkConcurrencyLimit(Executor executor, Flow flow, Execution execution, long count) {
        if (count >= (long)flow.getConcurrency().getLimit().intValue()) {
            return switch (flow.getConcurrency().getBehavior()) {
                default -> throw new IncompatibleClassChangeError();
                case Concurrency.Behavior.QUEUE -> {
                    Execution newExecution = execution.withState(State.Type.QUEUED);
                    ExecutionQueued executionQueued = ExecutionQueued.builder().tenantId(flow.getTenantId()).namespace(flow.getNamespace()).flowId(flow.getId()).date(Instant.now()).execution(newExecution).build();
                    flow.logger().info("[namespace: {}] [flow: {}] [execution: {}] Flow is queued due to concurrency limit exceeded, {} running(s)", new Object[]{newExecution.getNamespace(), newExecution.getFlowId(), newExecution.getId(), count});
                    yield executor.withExecutionQueued(executionQueued).withExecution(newExecution, "checkConcurrencyLimit");
                }
                case Concurrency.Behavior.CANCEL -> executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
                case Concurrency.Behavior.FAIL -> executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
            };
        }
        return executor;
    }

    public Executor process(Executor executor) {
        if (!executor.canBeProcessed().booleanValue() || this.conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution())) {
            return executor;
        }
        try {
            executor = this.handleRestart(executor);
            executor = this.handleEnd(executor);
            executor = this.handleCreatedKilling(executor);
            executor = this.handleKilling(executor);
            if (executor.getExecution().getState().getCurrent() != State.Type.KILLING && executor.getExecution().getState().getCurrent() != State.Type.KILLED && executor.getExecution().getState().getCurrent() != State.Type.QUEUED) {
                executor = this.handleNext(executor);
                executor = this.handleChildNext(executor);
            }
            executor = this.handleListeners(executor);
            executor = this.handleWorkerTask(executor);
            executor = this.handleChildWorkerTaskResult(executor);
            executor = this.handleExecutableTask(executor);
        }
        catch (Exception e) {
            return executor.withException(e, "process");
        }
        return executor;
    }

    public Execution onNexts(Flow flow, Execution execution, List<TaskRun> nexts) {
        List<TaskRun> executionTasksRun;
        if (log.isTraceEnabled()) {
            log.trace("[namespace: {}] [flow: {}] [execution: {}] Found {} next(s) {}", new Object[]{execution.getNamespace(), execution.getFlowId(), execution.getId(), nexts.size(), nexts});
        }
        if (execution.getTaskRunList() == null) {
            executionTasksRun = nexts;
        } else {
            executionTasksRun = new ArrayList<TaskRun>(execution.getTaskRunList());
            executionTasksRun.addAll(nexts);
        }
        Execution newExecution = execution.withTaskRunList(executionTasksRun);
        if (execution.getState().getCurrent() == State.Type.CREATED) {
            this.metricRegistry.counter("executor.execution.started.count", this.metricRegistry.tags(execution)).increment();
            flow.logger().info("[namespace: {}] [flow: {}] [execution: {}] Flow started", new Object[]{execution.getNamespace(), execution.getFlowId(), execution.getId()});
            newExecution = newExecution.withState(State.Type.RUNNING);
        }
        this.metricRegistry.counter("executor.taskrun.next.count", this.metricRegistry.tags(execution)).increment((double)nexts.size());
        return newExecution;
    }

    private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
        Task parent = flow.findTaskByTaskId(parentTaskRun.getTaskId());
        if (parent instanceof FlowableTask) {
            Optional<State.Type> state;
            FlowableTask flowableParent = (FlowableTask)((Object)parent);
            RunContext runContext = this.runContextFactory.of(flow, parent, execution, parentTaskRun);
            try {
                state = flowableParent.resolveState(runContext, execution, parentTaskRun);
            }
            catch (Exception e) {
                runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), (Throwable)e);
                state = Optional.of(State.Type.FAILED);
            }
            Optional<WorkerTaskResult> endedTask = this.childWorkerTaskTypeToWorkerTask(state, parentTaskRun);
            if (endedTask.isPresent()) {
                return endedTask;
            }
            if (execution.getState().getCurrent() == State.Type.KILLING) {
                if (parentTaskRun.getState().getCurrent() != State.Type.KILLING) {
                    return this.childWorkerTaskTypeToWorkerTask(Optional.of(State.Type.KILLING), parentTaskRun);
                }
                List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(flowableParent.childTasks(runContext, parentTaskRun), FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun));
                List<TaskRun> taskRunByTasks = execution.findTaskRunByTasks(currentTasks, parentTaskRun);
                if (taskRunByTasks.stream().filter(t -> t.getState().isTerminated()).count() == (long)taskRunByTasks.size()) {
                    return this.childWorkerTaskTypeToWorkerTask(Optional.of(State.Type.KILLED), parentTaskRun);
                }
            }
        }
        return Optional.empty();
    }

    private Optional<WorkerTaskResult> childWorkerTaskTypeToWorkerTask(Optional<State.Type> findState, TaskRun taskRun) {
        return findState.map(Rethrow.throwFunction(type -> new WorkerTaskResult(taskRun.withState((State.Type)((Object)type))))).stream().peek(workerTaskResult -> this.metricRegistry.counter("executor.workertaskresult.count", this.metricRegistry.tags((WorkerTaskResult)workerTaskResult, new String[0])).increment()).findFirst();
    }

    private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
        Task parent = executor.getFlow().findTaskByTaskId(parentTaskRun.getTaskId());
        if (parent instanceof FlowableTask) {
            FlowableTask flowableParent = (FlowableTask)((Object)parent);
            try {
                List<NextTaskRun> nexts = flowableParent.resolveNexts(this.runContextFactory.of(executor.getFlow(), parent, executor.getExecution(), parentTaskRun), executor.getExecution(), parentTaskRun);
                if (nexts.size() > 0) {
                    return this.saveFlowableOutput(nexts, executor, parentTaskRun);
                }
            }
            catch (Exception e) {
                log.warn("Unable to resolve the next tasks to run", (Throwable)e);
            }
        }
        return Collections.emptyList();
    }

    private List<TaskRun> saveFlowableOutput(List<NextTaskRun> nextTaskRuns, Executor executor, TaskRun parentTaskRun) {
        return nextTaskRuns.stream().map(Rethrow.throwFunction(t -> {
            TaskRun taskRun = t.getTaskRun();
            if (!(t.getTask() instanceof FlowableTask)) {
                return taskRun;
            }
            FlowableTask flowableTask = (FlowableTask)((Object)t.getTask());
            try {
                RunContext runContext = this.runContextFactory.of(executor.getFlow(), t.getTask(), executor.getExecution(), t.getTaskRun());
                Object outputs = flowableTask.outputs(runContext, executor.getExecution(), parentTaskRun);
                taskRun = taskRun.withOutputs(outputs != null ? outputs.toMap() : ImmutableMap.of());
            }
            catch (Exception e) {
                executor.getFlow().logger().warn("Unable to save output on taskRun '{}'", (Object)taskRun, (Object)e);
            }
            return taskRun;
        })).collect(Collectors.toList());
    }

    private Executor onEnd(Executor executor) {
        Execution newExecution = executor.getExecution().withState(executor.getExecution().guessFinalState(executor.getFlow()));
        Logger logger = executor.getFlow().logger();
        logger.info("[namespace: {}] [flow: {}] [execution: {}] Flow completed with state {} in {}", new Object[]{newExecution.getNamespace(), newExecution.getFlowId(), newExecution.getId(), newExecution.getState().getCurrent(), newExecution.getState().humanDuration()});
        if (logger.isTraceEnabled()) {
            logger.debug(newExecution.toString(true));
        }
        this.metricRegistry.counter("executor.execution.end.count", this.metricRegistry.tags(newExecution)).increment();
        this.metricRegistry.timer("executor.execution.duration", this.metricRegistry.tags(newExecution)).record(newExecution.getState().getDuration());
        return executor.withExecution(newExecution, "onEnd");
    }

    private Executor handleNext(Executor executor) {
        List<NextTaskRun> nextTaskRuns = FlowableUtils.resolveSequentialNexts(executor.getExecution(), ResolvedTask.of(executor.getFlow().getTasks()), ResolvedTask.of(executor.getFlow().getErrors()));
        if (nextTaskRuns.isEmpty()) {
            return executor;
        }
        return executor.withTaskRun(this.saveFlowableOutput(nextTaskRuns, executor, null), "handleNext");
    }

    private Executor handleChildNext(Executor executor) throws InternalException {
        if (executor.getExecution().getTaskRunList() == null) {
            return executor;
        }
        List<TaskRun> running = executor.getExecution().getTaskRunList().stream().filter(taskRun -> taskRun.getState().isRunning()).toList();
        ArrayList<TaskRun> result = new ArrayList<TaskRun>();
        for (TaskRun taskRun2 : running) {
            result.addAll(this.childNextsTaskRun(executor, taskRun2));
        }
        if (result.size() == 0) {
            return executor;
        }
        return executor.withTaskRun(result, "handleChildNext");
    }

    private Executor handleChildWorkerTaskResult(Executor executor) throws InternalException {
        if (executor.getExecution().getTaskRunList() == null) {
            return executor;
        }
        ArrayList<WorkerTaskResult> list = new ArrayList<WorkerTaskResult>();
        for (TaskRun taskRun : executor.getExecution().getTaskRunList()) {
            if (!taskRun.getState().isRunning()) continue;
            Optional<WorkerTaskResult> workerTaskResult = this.childWorkerTaskResult(executor.getFlow(), executor.getExecution(), taskRun);
            workerTaskResult.ifPresent(list::add);
        }
        if (list.size() == 0) {
            return executor;
        }
        executor = this.handlePausedDelay(executor, list);
        return executor.withWorkerTaskResults(list, "handleChildWorkerTaskResult");
    }

    private Executor handlePausedDelay(Executor executor, List<WorkerTaskResult> workerTaskResults) throws InternalException {
        if (workerTaskResults.stream().noneMatch(workerTaskResult -> workerTaskResult.getTaskRun().getState().getCurrent() == State.Type.PAUSED)) {
            return executor;
        }
        List<ExecutionDelay> list = workerTaskResults.stream().filter(workerTaskResult -> workerTaskResult.getTaskRun().getState().getCurrent() == State.Type.PAUSED).map(Rethrow.throwFunction(workerTaskResult -> {
            Pause pauseTask;
            Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());
            if (task instanceof Pause && ((pauseTask = (Pause)task).getDelay() != null || pauseTask.getTimeout() != null)) {
                return ExecutionDelay.builder().taskRunId(workerTaskResult.getTaskRun().getId()).executionId(executor.getExecution().getId()).date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay() != null ? pauseTask.getDelay() : pauseTask.getTimeout())).state(pauseTask.getDelay() != null ? State.Type.RUNNING : State.Type.FAILED).build();
            }
            return null;
        })).filter(Objects::nonNull).collect(Collectors.toList());
        if (executor.getExecution().getState().getCurrent() != State.Type.PAUSED) {
            return executor.withExecution(executor.getExecution().withState(State.Type.PAUSED), "handlePausedDelay").withWorkerTaskDelays(list, "handlePausedDelay");
        }
        return executor.withWorkerTaskDelays(list, "handlePausedDelay");
    }

    private Executor handleCreatedKilling(Executor executor) throws InternalException {
        if (executor.getExecution().getTaskRunList() == null || executor.getExecution().getState().getCurrent() != State.Type.KILLING) {
            return executor;
        }
        List<WorkerTaskResult> workerTaskResults = executor.getExecution().getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent().isCreated()).map(Rethrow.throwFunction(t -> {
            Task task = executor.getFlow().findTaskByTaskId(t.getTaskId());
            return this.childWorkerTaskTypeToWorkerTask(Optional.of(State.Type.KILLED), (TaskRun)t);
        })).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
        return executor.withWorkerTaskResults(workerTaskResults, "handleChildWorkerCreatedKilling");
    }

    private Executor handleListeners(Executor executor) {
        if (!executor.getExecution().getState().isTerminated()) {
            return executor;
        }
        List<ResolvedTask> currentTasks = this.conditionService.findValidListeners(executor.getFlow(), executor.getExecution());
        List<TaskRun> nexts = this.saveFlowableOutput(FlowableUtils.resolveSequentialNexts(executor.getExecution(), currentTasks), executor, null);
        if (nexts.size() == 0) {
            return executor;
        }
        return executor.withTaskRun(nexts, "handleListeners");
    }

    private Executor handleEnd(Executor executor) {
        if (executor.getExecution().getState().isTerminated() || executor.getExecution().getState().isPaused()) {
            return executor;
        }
        List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(ResolvedTask.of(executor.getFlow().getTasks()), ResolvedTask.of(executor.getFlow().getErrors()));
        if (!executor.getExecution().isTerminated(currentTasks)) {
            return executor;
        }
        return this.onEnd(executor);
    }

    private Executor handleRestart(Executor executor) {
        if (executor.getExecution().getState().getCurrent() != State.Type.RESTARTED) {
            return executor;
        }
        this.metricRegistry.counter("executor.execution.started.count", this.metricRegistry.tags(executor.getExecution())).increment();
        executor.getFlow().logger().info("[namespace: {}] [flow: {}] [execution: {}] Flow restarted", new Object[]{executor.getExecution().getNamespace(), executor.getExecution().getFlowId(), executor.getExecution().getId()});
        return executor.withExecution(executor.getExecution().withState(State.Type.RUNNING), "handleRestart");
    }

    private Executor handleKilling(Executor executor) {
        if (executor.getExecution().getState().getCurrent() != State.Type.KILLING) {
            return executor;
        }
        List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(ResolvedTask.of(executor.getFlow().getTasks()), ResolvedTask.of(executor.getFlow().getErrors()));
        if (executor.getExecution().hasRunning(currentTasks) || executor.getExecution().hasCreated()) {
            return executor;
        }
        Execution newExecution = executor.getExecution().withState(State.Type.KILLED);
        return executor.withExecution(newExecution, "handleKilling");
    }

    private Executor handleWorkerTask(Executor executor) throws InternalException {
        if (executor.getExecution().getTaskRunList() == null || executor.getExecution().getState().getCurrent() == State.Type.KILLING) {
            return executor;
        }
        List<WorkerTask> workerTasks = executor.getExecution().getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent().isCreated()).map(Rethrow.throwFunction(taskRun -> {
            Task task = executor.getFlow().findTaskByTaskId(taskRun.getTaskId());
            RunContext runContext = this.runContextFactory.of(executor.getFlow(), task, executor.getExecution(), (TaskRun)taskRun);
            return WorkerTask.builder().runContext(runContext).taskRun((TaskRun)taskRun).task(task).build();
        })).collect(Collectors.toList());
        if (workerTasks.isEmpty()) {
            return executor;
        }
        return executor.withWorkerTasks(workerTasks, "handleWorkerTask");
    }

    private Executor handleExecutableTask(Executor executor) {
        ArrayList executions = new ArrayList();
        ArrayList<WorkerTaskResult> workerTaskResults = new ArrayList<WorkerTaskResult>();
        boolean haveFlows = executor.getWorkerTasks().removeIf(workerTask -> {
            if (!(workerTask.getTask() instanceof ExecutableTask)) {
                return false;
            }
            Task executableTask = (Task)((Object)((ExecutableTask)((Object)workerTask.getTask())));
            try {
                TaskRun executableTaskRun = executor.getExecution().findTaskRunByTaskRunId(workerTask.getTaskRun().getId());
                executor.withExecution(executor.getExecution().withTaskRun(executableTaskRun.withState(State.Type.RUNNING)), "handleExecutableTaskRunning");
                RunContext runContext = this.runContextFactory.of(executor.getFlow(), executableTask, executor.getExecution(), executableTaskRun);
                List<WorkerTaskExecution<?>> workerTaskExecutions = ((ExecutableTask)((Object)executableTask)).createWorkerTaskExecutions(runContext, this.flowExecutorInterface(), executor.getFlow(), executor.getExecution(), executableTaskRun);
                if (workerTaskExecutions.isEmpty()) {
                    executor.withExecution(executor.getExecution().withTaskRun(executableTaskRun.withState(State.Type.SUCCESS)), "handleExecutableTaskRunning.noExecution");
                } else {
                    executions.addAll(workerTaskExecutions);
                    if (!((ExecutableTask)((Object)executableTask)).waitForExecution()) {
                        for (WorkerTaskExecution<?> workerTaskExecution : workerTaskExecutions) {
                            Optional<WorkerTaskResult> workerTaskResult = ((ExecutableTask)((Object)executableTask)).createWorkerTaskResult(runContext, workerTaskExecution.getTaskRun().withState(State.Type.SUCCESS), executor.getFlow(), workerTaskExecution.getExecution());
                            workerTaskResult.ifPresent(result -> workerTaskResults.add((WorkerTaskResult)result));
                        }
                    }
                }
            }
            catch (Exception e) {
                workerTaskResults.add(WorkerTaskResult.builder().taskRun(workerTask.getTaskRun().withState(State.Type.FAILED).withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))).build());
                executor.withException(e, "handleExecutableTask");
            }
            return true;
        });
        if (!haveFlows) {
            return executor;
        }
        Executor resultExecutor = executor.withWorkerTaskExecutions(executions, "handleExecutableTask");
        if (!workerTaskResults.isEmpty()) {
            resultExecutor = executor.withWorkerTaskResults(workerTaskResults, "handleExecutableTaskWorkerTaskResults");
        }
        return resultExecutor;
    }

    public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskResult workerTaskResult) throws InternalException {
        ArrayList<TaskRun> taskRuns;
        block4: {
            taskRuns = new ArrayList<TaskRun>(execution.getTaskRunList());
            if (workerTaskResult.getDynamicTaskRuns() != null) {
                taskRuns.addAll(workerTaskResult.getDynamicTaskRuns());
            }
            if (workerTaskResult.getTaskRun().getParentTaskRunId() != null) {
                try {
                    execution.findTaskRunByTaskRunId(workerTaskResult.getTaskRun().getId());
                }
                catch (InternalException e) {
                    TaskRun parentTaskRun = execution.findTaskRunByTaskRunId(workerTaskResult.getTaskRun().getParentTaskRunId());
                    Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId());
                    if (!(parentTask instanceof WorkingDirectory)) break block4;
                    taskRuns.add(workerTaskResult.getTaskRun());
                }
            }
        }
        return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
    }

    public boolean canBePurged(Executor executor) {
        return executor.getExecution().isDeleted() || executor.getFlow() != null && this.conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution()) && executor.getExecution().getState().getCurrent() != State.Type.PAUSED && executor.getExecution().getState().getCurrent() != State.Type.KILLED;
    }

    public void log(Logger log, Boolean in, WorkerJob value) {
        if (value instanceof WorkerTask) {
            WorkerTask workerTask = (WorkerTask)value;
            log.debug("{} {} : {}", new Object[]{in != false ? "<< IN " : ">> OUT", workerTask.getClass().getSimpleName(), workerTask.getTaskRun().toStringState()});
        } else if (value instanceof WorkerTrigger) {
            WorkerTrigger workerTrigger = (WorkerTrigger)value;
            log.debug("{} {} : {}", new Object[]{in != false ? "<< IN " : ">> OUT", workerTrigger.getClass().getSimpleName(), workerTrigger.getTriggerContext().uid()});
        }
    }

    public void log(Logger log, Boolean in, WorkerTaskResult value) {
        log.debug("{} {} : {}", new Object[]{in != false ? "<< IN " : ">> OUT", value.getClass().getSimpleName(), value.getTaskRun().toStringState()});
    }

    public void log(Logger log, Boolean in, Execution value) {
        log.debug("{} {} [key='{}']\n{}", new Object[]{in != false ? "<< IN " : ">> OUT", value.getClass().getSimpleName(), value.getId(), value.toStringState()});
    }

    public void log(Logger log, Boolean in, Executor value) {
        log.debug("{} {} [key='{}', from='{}', offset='{}', crc32='{}']\n{}", new Object[]{in != false ? "<< IN " : ">> OUT", value.getClass().getSimpleName(), value.getExecution().getId(), value.getFrom(), value.getOffset(), value.getExecution().toCrc32State(), value.getExecution().toStringState()});
    }
}

