/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.services;

import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.hierarchies.AbstractGraphTask;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Rethrow;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.http.multipart.CompletedPart;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.constraints.NotNull;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Singleton
public class ExecutionService {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutionService.class);
    // All collaborators below are field-injected (@Inject).
    // Looks up the flow definition an execution belongs to (restart, replay, forceRun).
    @Inject
    private FlowRepositoryInterface flowRepositoryInterface;
    // Used to delete the stored files of an execution (purge, delete).
    @Inject
    private StorageInterface storageInterface;
    // Reads and purges executions.
    @Inject
    private ExecutionRepositoryInterface executionRepository;
    // Purges the logs attached to executions.
    @Inject
    private LogRepositoryInterface logRepository;
    // Purges the metrics attached to executions.
    @Inject
    private MetricRepositoryInterface metricRepository;
    // Resolves, validates and reads the "onResume" inputs of a Pause task.
    @Inject
    private FlowInputOutput flowInputOutput;
    // Publishes an UPDATE CrudEvent when an execution is paused or resumed.
    @Inject
    private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
    // Used by forceRun to unqueue a QUEUED execution.
    @Inject
    private ConcurrencyLimitService concurrencyLimitService;
    // Finds valid listeners and builds condition contexts.
    @Inject
    private ConditionService conditionService;
    // Builds the RunContext needed by resetExecution.
    @Inject
    private RunContextFactory runContextFactory;
    // Injects plugin defaults into a flow before its tasks are looked up.
    @Inject
    private PluginDefaultService pluginDefaultService;
    // Wraps the outputs generated by a Pause task into Variables.
    @Inject
    private VariablesService variablesService;

    /**
     * Loads an execution and verifies that it is currently paused.
     *
     * @param tenant      the tenant passed through to {@link #getExecution(String, String, boolean)}
     * @param executionId the identifier of the execution to load
     * @param withACL     whether the ACL-aware repository lookup is used
     * @return the paused execution
     * @throws NoSuchElementException if the execution cannot be found
     * @throws IllegalStateException  if the execution is not paused
     */
    public Execution getExecutionIfPause(String tenant, @NotNull String executionId, boolean withACL) {
        final Execution found = this.getExecution(tenant, executionId, withACL);
        if (found.getState().isPaused()) {
            return found;
        }
        throw new IllegalStateException("Execution '" + executionId + "' is not paused, can't resume it");
    }

    /**
     * Loads an execution by identifier.
     *
     * @param tenant      the tenant passed to the repository
     * @param executionId the identifier of the execution to load
     * @param withACL     {@code true} to use {@code findById}, {@code false} to use {@code findByIdWithoutAcl}
     * @return the execution
     * @throws NoSuchElementException if the repository returns an empty result
     */
    public Execution getExecution(String tenant, @NotNull String executionId, boolean withACL) {
        final Optional<Execution> lookup;
        if (withACL) {
            lookup = this.executionRepository.findById(tenant, executionId);
        } else {
            lookup = this.executionRepository.findByIdWithoutAcl(tenant, executionId);
        }
        if (lookup.isEmpty()) {
            throw new NoSuchElementException("Execution '" + executionId + "' not found.");
        }
        return lookup.get();
    }

    /**
     * Puts a single task run back into {@code CREATED} and the execution into {@code RUNNING}.
     *
     * @param execution the execution owning the task run
     * @param taskRunId the identifier of the task run to retry
     * @return a copy of the execution with the updated task run list and state
     */
    public Execution retryTask(Execution execution, String taskRunId) {
        final List<TaskRun> rewritten = execution.getTaskRunList()
            .stream()
            // Only the targeted task run is reset; every other one is kept untouched.
            .map(candidate -> candidate.getId().equals(taskRunId) ? candidate.withState(State.Type.CREATED) : candidate)
            .toList();
        final Execution withRewrittenTaskRuns = execution.withTaskRunList(rewritten);
        return withRewrittenTaskRuns.withState(State.Type.RUNNING);
    }

    /**
     * Prepares a new iteration of a flowable task run.
     *
     * <p>The flowable task run itself gets its attempts reset and its iteration incremented,
     * its direct children (task runs whose parent is {@code flowableTaskRunId}) are dropped,
     * and the execution is put back into {@code RUNNING}.
     *
     * @param execution         the execution owning the task runs
     * @param flowableTaskRunId the identifier of the flowable task run to iterate again
     * @return a copy of the execution with the updated task run list and state
     */
    public Execution retryWaitFor(Execution execution, String flowableTaskRunId) {
        List<TaskRun> newTaskRuns = execution.getTaskRunList().stream().map(taskRun -> {
            if (taskRun.getId().equals(flowableTaskRunId)) {
                return taskRun.resetAttempts().incrementIteration();
            }
            if (flowableTaskRunId.equals(taskRun.getParentTaskRunId())) {
                // Direct children are mapped to null and filtered out right below.
                return null;
            }
            return taskRun;
        }).filter(Objects::nonNull).toList();
        return execution.withTaskRunList(newTaskRuns).withState(State.Type.RUNNING);
    }

    /**
     * Marks both the given flowable task run and the execution as {@code PAUSED}.
     *
     * @param execution             the execution to pause
     * @param updateFlowableTaskRun the flowable task run to pause
     * @return a copy of the execution with the paused task run and state
     * @throws InternalException propagated from {@code Execution#withTaskRun}
     */
    public Execution pauseFlowable(Execution execution, TaskRun updateFlowableTaskRun) throws InternalException {
        final TaskRun pausedTaskRun = updateFlowableTaskRun.withState(State.Type.PAUSED);
        final Execution withPausedTaskRun = execution.withTaskRun(pausedTaskRun);
        return withPausedTaskRun.withState(State.Type.PAUSED);
    }

    /**
     * Restarts an execution from its failed or paused task runs.
     *
     * <p>When {@code revision} is {@code null} the execution and task run identifiers are kept;
     * otherwise a new execution identifier and new task run identifiers are generated and the
     * returned execution targets the given flow revision.
     *
     * @param execution the execution to restart; it must be terminated or paused
     * @param revision  the flow revision to restart on, or {@code null} to keep the current one
     * @return the restarted execution, labelled {@code system.restarted=true}
     * @throws IllegalStateException    if the execution is neither terminated nor paused
     * @throws IllegalArgumentException if no failed or paused task run exists to restart from
     * @throws Exception                propagated from the flow lookup or task run mapping
     */
    public Execution restart(Execution execution, @Nullable Integer revision) throws Exception {
        if (!execution.getState().isTerminated() && !execution.getState().isPaused()) {
            throw new IllegalStateException("Execution must be terminated to be restarted, current state is '" + execution.getState().getCurrent() + "' !");
        }
        FlowWithSource flow = this.flowRepositoryInterface.findByExecutionWithoutAcl(execution);
        Set<String> taskRunToRestart = this.taskRunToRestart(execution, taskRun -> taskRun.getState().getCurrent().isFailed() || taskRun.getState().getCurrent().isPaused());
        Map<String, String> mappingTaskRunId = this.mapTaskRunId(execution, revision == null);
        String newExecutionId = revision != null ? IdUtils.create() : null;

        // Typed, mutable list: the removeIf calls below need both.
        List<TaskRun> newTaskRuns = execution.getTaskRunList()
            .stream()
            .map(Rethrow.throwFunction(originalTaskRun -> this.mapTaskRun(flow, originalTaskRun, mappingTaskRunId, newExecutionId, State.Type.RESTARTED, taskRunToRestart.contains(originalTaskRun.getId()))))
            .collect(Collectors.toCollection(ArrayList::new));

        // Drop the task runs that come after a restarted WorkingDirectory task.
        this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId).forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));
        // Drop the task runs of errors, finally and afterExecution tasks.
        flow.allErrorsWithChildren().forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
        flow.allFinallyWithChildren().forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
        ListUtils.emptyOnNull(flow.getAfterExecution()).forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));

        Execution newExecution = execution.childExecution(newExecutionId, newTaskRuns, execution.withState(State.Type.RESTARTED).getState());

        List<Label> newLabels = new ArrayList<>(ListUtils.emptyOnNull(execution.getLabels()));
        Label restartedLabel = new Label("system.restarted", "true");
        if (!newLabels.contains(restartedLabel)) {
            newLabels.add(restartedLabel);
        }
        newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt()).withLabels(newLabels);
        return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
    }

    /**
     * Collects the identifiers of the task runs matching {@code predicate}, together with
     * the identifiers of all their ancestors.
     *
     * @param execution the execution whose task runs are inspected
     * @param predicate selects the task runs to restart from
     * @return the non-empty set of task run identifiers
     * @throws IllegalArgumentException if no task run matches
     */
    private Set<String> taskRunToRestart(Execution execution, Predicate<TaskRun> predicate) {
        final List<TaskRun> matching = execution.getTaskRunList().stream().filter(predicate).toList();
        final Set<String> withAncestors = this.taskRunWithAncestors(execution, matching);
        if (!withAncestors.isEmpty()) {
            return withAncestors;
        }
        throw new IllegalArgumentException("No task found to restart execution from!");
    }

    /**
     * Creates a new execution that replays the given one.
     *
     * <p>Without a {@code taskRunId} the new execution starts from scratch with a fresh
     * {@link State} and no task run. With a {@code taskRunId}, the existing task runs are
     * copied under new identifiers, the targeted task run and its ancestors are marked to
     * restart, and the successors of the targeted task run are dropped.
     *
     * @param execution the execution to replay
     * @param taskRunId the task run to replay from, or {@code null} to replay everything
     * @param revision  the flow revision to replay on, or {@code null} to keep the current one
     * @return the new execution, labelled {@code system.replay=true}
     * @throws IllegalArgumentException if {@code taskRunId} matches no task run
     * @throws Exception                propagated from the flow lookup or task run mapping
     */
    public Execution replay(Execution execution, @Nullable String taskRunId, @Nullable Integer revision) throws Exception {
        final String replayExecutionId = IdUtils.create();
        final List<TaskRun> replayedTaskRuns = new ArrayList<>();
        final boolean fromTaskRun = taskRunId != null;

        if (fromTaskRun) {
            final FlowWithSource flow = this.flowRepositoryInterface.findByExecutionWithoutAcl(execution);
            final GraphCluster graph = GraphUtils.of(flow, execution);
            final Set<String> restartIds = this.taskRunToRestart(execution, candidate -> candidate.getId().equals(taskRunId));
            final Map<String, String> idMapping = this.mapTaskRunId(execution, false);

            replayedTaskRuns.addAll(
                execution.getTaskRunList()
                    .stream()
                    .map(Rethrow.throwFunction(original -> this.mapTaskRun(flow, original, idMapping, replayExecutionId, State.Type.RESTARTED, restartIds.contains(original.getId()))))
                    .toList()
            );

            // Successors of the replayed task run (other than itself and the restarted ones) are dropped.
            final Set<String> successorIds = GraphUtils.successors(graph, List.of(taskRunId))
                .stream()
                .filter(AbstractGraphTask.class::isInstance)
                .map(AbstractGraphTask.class::cast)
                .filter(node -> node.getTaskRun() != null)
                .filter(node -> !node.getTaskRun().getId().equals(taskRunId) && !restartIds.contains(node.getTaskRun().getId()))
                .map(node -> idMapping.get(node.getTaskRun().getId()))
                .collect(Collectors.toSet());
            for (String successorId : successorIds) {
                replayedTaskRuns.removeIf(candidate -> candidate.getId().equals(successorId));
            }

            for (String workerSuccessorId : this.removeWorkerTask(flow, execution, restartIds, idMapping)) {
                replayedTaskRuns.removeIf(candidate -> candidate.getId().equals(workerSuccessorId));
            }
        }

        final State initialState;
        if (fromTaskRun) {
            initialState = execution.withState(State.Type.RESTARTED).getState();
        } else {
            initialState = new State();
        }
        Execution replayed = execution.childExecution(replayExecutionId, replayedTaskRuns, initialState);

        final List<Label> labels = new ArrayList<>(ListUtils.emptyOnNull(execution.getLabels()));
        if (!labels.contains(new Label("system.replay", "true"))) {
            labels.add(new Label("system.replay", "true"));
        }
        replayed = replayed.withMetadata(execution.getMetadata().nextAttempt()).withLabels(labels);

        if (revision == null) {
            return replayed;
        }
        return replayed.withFlowRevision(revision);
    }

    /**
     * Changes the state of a task run and, when the original execution was terminated,
     * removes the task runs of the flow's errors, finally and afterExecution tasks.
     *
     * @param execution the execution owning the task run
     * @param flow      the flow of the execution
     * @param taskRunId the identifier of the task run to update
     * @param newState  the state to apply
     * @return the updated execution
     * @throws Exception propagated from {@link #markAs(Execution, FlowInterface, String, State.Type)}
     */
    public Execution changeTaskRunState(Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
        Execution newExecution = this.markAs(execution, flow, taskRunId, newState);
        if (!execution.getState().isTerminated()) {
            return newExecution;
        }

        // Work on a copy: removeIf must not mutate the list held by newExecution in place,
        // and that list is not guaranteed to be modifiable.
        List<TaskRun> newTaskRuns = new ArrayList<>(newExecution.getTaskRunList());
        flow.allErrorsWithChildren().forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
        flow.allFinallyWithChildren().forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
        ListUtils.emptyOnNull(flow.getAfterExecution()).forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
        return newExecution.withTaskRunList(newTaskRuns);
    }

    /**
     * Marks a task run with a new state, without resume inputs nor resume information.
     *
     * @param execution the execution owning the task run
     * @param flow      the flow of the execution
     * @param taskRunId the identifier of the task run to update
     * @param newState  the state to apply
     * @return the updated execution
     * @throws Exception propagated from the private {@code markAs} overload
     */
    public Execution markAs(Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
        final Map<String, Object> noResumeInputs = null;
        final Pause.Resumed noResumed = null;
        return this.markAs(execution, flow, taskRunId, newState, noResumeInputs, noResumed);
    }

    /**
     * Applies {@code newState} to a task run and puts its flowable ancestors back to {@code RUNNING}.
     *
     * <p>Pause tasks additionally receive the outputs generated from the resume inputs; a Pause
     * task with no child tasks, errors nor finally is given its final state directly, while one
     * that has any of them is set to {@code RUNNING}.
     *
     * @param execution      the execution owning the task run
     * @param flow           the flow of the execution
     * @param taskRunId      the identifier of the task run to update
     * @param newState       the state to apply
     * @param onResumeInputs the inputs passed to the Pause task when generating its outputs, may be {@code null}
     * @param resumed        the resume information, or {@code null} to create one with {@code Pause.Resumed.now}
     * @return the updated execution; it is returned without an execution-level state change
     *         if any task run is still {@code PAUSED}
     * @throws IllegalArgumentException if {@code taskRunId} matches no task run
     * @throws Exception                propagated from the flow, task or task run lookups
     */
    private Execution markAs(Execution execution, FlowInterface flow, String taskRunId, State.Type newState, @Nullable Map<String, Object> onResumeInputs, @Nullable Pause.Resumed resumed) throws Exception {
        final Set<String> affectedTaskRunIds = this.taskRunToRestart(execution, candidate -> candidate.getId().equals(taskRunId));
        Execution updated = execution.withMetadata(execution.getMetadata().nextAttempt());
        final FlowWithSource resolvedFlow = this.pluginDefaultService.injectVersionDefaults(flow, false);

        for (String affectedId : affectedTaskRunIds) {
            final TaskRun current = updated.findTaskRunByTaskRunId(affectedId);
            final Task task = resolvedFlow.findTaskByTaskId(current.getTaskId());

            // Flowable ancestors of the targeted task run simply go back to RUNNING.
            if (task.isFlowable() && !affectedId.equals(taskRunId)) {
                updated = updated.withTaskRun(current.withState(State.Type.RUNNING));
                continue;
            }

            TaskRun replacement;
            if (task instanceof Pause pauseTask) {
                final State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
                final Pause.Resumed effectiveResumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
                final Variables outputs = this.variablesService.of(StorageContext.forTask(current), pauseTask.generateOutputs(onResumeInputs, effectiveResumed));
                replacement = current.withOutputs(outputs);

                final boolean hasNoChildren = ListUtils.isEmpty(pauseTask.getTasks())
                    && ListUtils.isEmpty(pauseTask.getErrors())
                    && ListUtils.isEmpty(pauseTask.getFinally());
                final State.Type pauseState;
                if (!hasNoChildren) {
                    pauseState = State.Type.RUNNING;
                } else if (newState == State.Type.RUNNING) {
                    pauseState = State.Type.SUCCESS;
                } else if (newState == State.Type.KILLING) {
                    pauseState = State.Type.KILLED;
                } else {
                    pauseState = newState;
                }
                replacement = replacement.withState(pauseState);
            } else {
                replacement = current.withState(newState);
            }

            // The last attempt, when there is one, takes the requested state.
            final List<TaskRunAttempt> previousAttempts = current.getAttempts();
            if (previousAttempts != null && !previousAttempts.isEmpty()) {
                final List<TaskRunAttempt> attempts = new ArrayList<>(previousAttempts);
                final int lastIndex = attempts.size() - 1;
                attempts.set(lastIndex, attempts.get(lastIndex).withState(newState));
                replacement = replacement.withAttempts(attempts);
            }
            updated = updated.withTaskRun(replacement);
        }

        final boolean stillPaused = updated.getTaskRunList()
            .stream()
            .anyMatch(candidate -> candidate.getState().getCurrent() == State.Type.PAUSED);
        if (stillPaused) {
            return updated;
        }
        final State.Type executionState = newState == State.Type.CANCELLED ? State.Type.CANCELLED : State.Type.RESTARTED;
        return updated.withState(executionState);
    }

    /**
     * Applies {@code newState} to a task run, optionally to all of its ancestors, and finally
     * to the execution itself.
     *
     * @param execution   the execution owning the task run
     * @param taskRunId   the identifier of the task run to update
     * @param newState    the state to apply
     * @param markParents whether the parent task runs are updated as well; {@code null} is treated as {@code false}
     * @return the updated execution
     * @throws Exception propagated from the task run lookup or update
     */
    public Execution markWithTaskRunAs(Execution execution, String taskRunId, State.Type newState, Boolean markParents) throws Exception {
        TaskRun taskRun = execution.findTaskRunByTaskRunId(taskRunId);
        Execution updatedExecution = execution.withTaskRun(taskRun.withState(newState));
        // Boolean.TRUE.equals avoids a NullPointerException when markParents is null.
        if (Boolean.TRUE.equals(markParents) && taskRun.getParentTaskRunId() != null) {
            return this.markWithTaskRunAs(updatedExecution, taskRun.getParentTaskRunId(), newState, true);
        }
        return updatedExecution.withState(newState);
    }

    /**
     * Purges the executions matching the given filters, by batches of {@code batchSize}, along
     * with their logs, metrics and stored files when requested.
     *
     * @param purgeExecution whether the executions themselves are purged
     * @param purgeLog       whether the logs of the executions are purged
     * @param purgeMetric    whether the metrics of the executions are purged
     * @param purgeStorage   whether the stored files of the executions are deleted
     * @param tenantId       tenant filter, may be {@code null}
     * @param namespace      namespace filter, may be {@code null}
     * @param flowId         flow filter, may be {@code null}
     * @param startDate      start date filter, may be {@code null}
     * @param endDate        end date filter, may be {@code null}
     * @param state          state filter, may be {@code null}
     * @param batchSize      number of executions handled per batch
     * @return the counters summed over all batches; all zeros when nothing matched
     * @throws IOException declared for storage deletion failures
     */
    public PurgeResult purge(Boolean purgeExecution, Boolean purgeLog, Boolean purgeMetric, Boolean purgeStorage, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> state, int batchSize) throws IOException {
        PurgeResult purgeResult = this.executionRepository
            .find(null, tenantId, null, namespace, flowId, startDate, endDate, state, null, null, null, true)
            .buffer(batchSize)
            .map(Rethrow.throwFunction(executions -> {
                PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
                if (purgeExecution) {
                    builder.executionsCount(this.executionRepository.purge(executions));
                }
                if (purgeLog) {
                    builder.logsCount(this.logRepository.purge(executions));
                }
                if (purgeMetric) {
                    builder.metricsCount(this.metricRepository.purge(executions));
                }
                if (purgeStorage) {
                    // Sum the deleted files over the whole batch: setting the builder value once
                    // per execution would keep only the count of the last execution.
                    int[] deletedFiles = {0};
                    executions.forEach(Rethrow.throwConsumer(execution -> {
                        URI uri = StorageContext.forExecution(execution).getExecutionStorageURI("kestra");
                        deletedFiles[0] += this.storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size();
                    }));
                    builder.storagesCount(deletedFiles[0]);
                }
                return builder.build();
            }))
            // Merge the per-batch counters into a single result.
            .reduce((a, b) -> a.toBuilder()
                .executionsCount(a.getExecutionsCount() + b.getExecutionsCount())
                .logsCount(a.getLogsCount() + b.getLogsCount())
                .storagesCount(a.getStoragesCount() + b.getStoragesCount())
                .metricsCount(a.getMetricsCount() + b.getMetricsCount())
                .build())
            .block();

        // block() yields null when no batch was emitted.
        if (purgeResult != null) {
            return purgeResult;
        }
        return PurgeResult.builder().build();
    }

    /**
     * Purges a single execution and, optionally, its logs, metrics and stored files.
     *
     * @param execution     the execution to purge
     * @param deleteLogs    whether the logs of the execution are purged
     * @param deleteMetrics whether the metrics of the execution are purged
     * @param deleteStorage whether the stored files of the execution are deleted
     * @throws IOException declared for storage deletion failures
     */
    public void delete(Execution execution, boolean deleteLogs, boolean deleteMetrics, boolean deleteStorage) throws IOException {
        // The execution itself is always purged first.
        this.executionRepository.purge(execution);

        if (deleteLogs) {
            this.logRepository.purge(execution);
        }

        if (deleteMetrics) {
            this.metricRepository.purge(execution);
        }

        if (!deleteStorage) {
            return;
        }
        final URI executionStorageUri = StorageContext.forExecution(execution).getExecutionStorageURI("kestra");
        this.storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), executionStorageUri);
    }

    /**
     * Resumes a paused execution without any resume input.
     *
     * @param execution the paused execution
     * @param flow      the flow of the execution
     * @param newState  the state to apply to the paused task run
     * @param resumed   the resume information, may be {@code null}
     * @return the resumed execution
     * @throws Exception propagated from the input-aware {@code resume} overload
     */
    public Execution resume(Execution execution, FlowInterface flow, State.Type newState, Pause.Resumed resumed) throws Exception {
        // A typed null selects the Map-based overload rather than the Publisher-based one.
        final Map<String, Object> noInputs = null;
        return this.resume(execution, flow, newState, noInputs, resumed);
    }

    /**
     * Resolves the {@code onResume} inputs of the first paused task, using no input values.
     *
     * @param execution the paused execution
     * @param flow      the flow of the execution
     * @return the resolved inputs, or an empty list when the first paused task run does not
     *         belong to a {@link Pause} task (or when there is none)
     */
    public Mono<List<InputAndValue>> validateForResume(Execution execution, FlowInterface flow) {
        return this.getFirstPausedTaskOr(execution, flow).flatMap(pausedTask -> {
            if (pausedTask.isEmpty() || !(pausedTask.get() instanceof Pause pause)) {
                return Mono.just(Collections.emptyList());
            }
            return Mono.just(this.flowInputOutput.resolveInputs(pause.getOnResume(), flow, execution, Map.of()));
        });
    }

    /**
     * Validates the given multipart inputs against the {@code onResume} inputs of the first
     * paused task.
     *
     * @param execution the paused execution
     * @param flow      the flow of the execution
     * @param inputs    the submitted inputs, may be {@code null}
     * @return the validated inputs, or an empty list when the first paused task run does not
     *         belong to a {@link Pause} task (or when there is none)
     */
    public Mono<List<InputAndValue>> validateForResume(Execution execution, Flow flow, @Nullable Publisher<CompletedPart> inputs) {
        return this.getFirstPausedTaskOr(execution, flow).flatMap(pausedTask -> {
            if (pausedTask.isEmpty() || !(pausedTask.get() instanceof Pause pause)) {
                return Mono.just(Collections.emptyList());
            }
            return this.flowInputOutput.validateExecutionInputs(pause.getOnResume(), flow, execution, inputs);
        });
    }

    /**
     * Reads the submitted {@code onResume} inputs of the first paused task, then resumes the
     * execution with them.
     *
     * @param execution the paused execution
     * @param flow      the flow of the execution
     * @param newState  the state to apply to the paused task run
     * @param inputs    the submitted inputs, may be {@code null}
     * @param resumed   the resume information, may be {@code null}
     * @return a {@link Mono} emitting the resumed execution, or the error raised while resuming
     */
    public Mono<Execution> resume(Execution execution, FlowInterface flow, State.Type newState, @Nullable Publisher<CompletedPart> inputs, @Nullable Pause.Resumed resumed) {
        return this.getFirstPausedTaskOr(execution, flow)
            .flatMap(pausedTask -> {
                if (pausedTask.isPresent() && pausedTask.get() instanceof Pause pause) {
                    return this.flowInputOutput.readExecutionInputs(pause.getOnResume(), flow, execution, inputs);
                }
                // Not a Pause task: there is no input to read.
                return Mono.just(Collections.emptyMap());
            })
            .handle((resumeInputs, sink) -> {
                try {
                    sink.next(this.resume(execution, flow, newState, (Map<String, Object>) resumeInputs, resumed));
                } catch (Exception e) {
                    // Checked exceptions are turned into an error signal.
                    sink.error(e);
                }
            });
    }

    /**
     * Finds the task behind the first {@code PAUSED} task run of the execution.
     *
     * @param execution the execution to inspect
     * @param flow      the flow of the execution; version defaults are injected before the lookup
     * @return a {@link Mono} emitting the task, or an empty {@link Optional} when no task run is
     *         paused; flow processing and internal errors are emitted as error signals
     */
    private Mono<Optional<Task>> getFirstPausedTaskOr(Execution execution, FlowInterface flow) {
        return Mono.create(sink -> {
            try {
                final FlowWithSource resolvedFlow = this.pluginDefaultService.injectVersionDefaults(flow, false);
                final Optional<Task> pausedTask = execution
                    .findFirstByState(State.Type.PAUSED)
                    .map(Rethrow.throwFunction(pausedTaskRun -> resolvedFlow.findTaskByTaskId(pausedTaskRun.getTaskId())));
                sink.success(pausedTask);
            } catch (FlowProcessingException | InternalException e) {
                sink.error(e);
            }
        });
    }

    /**
     * Resumes a paused execution and publishes an {@code UPDATE} event for it.
     *
     * <p>When a task run is {@code PAUSED}, it is marked with {@code newState}; otherwise the
     * execution itself must be paused and simply takes {@code newState}.
     *
     * @param execution the paused execution
     * @param flow      the flow of the execution
     * @param newState  the state to apply
     * @param inputs    the resume inputs, may be {@code null}
     * @param resumed   the resume information, may be {@code null}
     * @return the resumed execution
     * @throws IllegalArgumentException if neither a task run nor the execution is paused
     * @throws Exception                propagated from the task run update
     */
    public Execution resume(Execution execution, FlowInterface flow, State.Type newState, @Nullable Map<String, Object> inputs, @Nullable Pause.Resumed resumed) throws Exception {
        final Optional<TaskRun> firstPaused = execution.findFirstByState(State.Type.PAUSED);
        if (firstPaused.isEmpty() && !execution.getState().isPaused()) {
            throw new IllegalArgumentException("The execution is not paused");
        }

        final Execution resumedExecution = firstPaused.isPresent()
            ? this.markAs(execution, flow, firstPaused.get().getId(), newState, inputs, resumed)
            : execution.withState(newState);

        this.eventPublisher.publishEvent(new CrudEvent<Execution>(resumedExecution, execution, CrudEventType.UPDATE));
        return resumedExecution;
    }

    /**
     * Pauses a running execution and publishes an {@code UPDATE} event for it.
     *
     * @param execution the running execution
     * @return the paused execution
     * @throws IllegalArgumentException if the execution is not running
     * @throws Exception                declared by the signature
     */
    public Execution pause(Execution execution) throws Exception {
        final boolean running = execution.getState().isRunning();
        if (running) {
            final Execution paused = execution.withState(State.Type.PAUSED);
            this.eventPublisher.publishEvent(new CrudEvent<Execution>(paused, execution, CrudEventType.UPDATE));
            return paused;
        }
        throw new IllegalArgumentException("The execution is not running");
    }

    /**
     * Builds a kill request for every execution triggered by the given execution that is
     * neither {@code KILLING} nor {@code KILLED}.
     *
     * @param tenantId    the tenant of the executions
     * @param executionId the identifier of the triggering execution
     * @return the kill requests, in {@code REQUESTED} state and flagged as kill cascade
     */
    public Flux<ExecutionKilledExecution> killSubflowExecutions(String tenantId, String executionId) {
        return this.executionRepository
            .findAllByTriggerExecutionId(tenantId, executionId)
            .filter(child -> {
                final State.Type current = child.getState().getCurrent();
                return !(current == State.Type.KILLING || child.getState().getCurrent() == State.Type.KILLED);
            })
            .map(child -> ExecutionKilledExecution.builder()
                .executionId(child.getId())
                .isOnKillCascade(true)
                .state(ExecutionKilled.State.REQUESTED)
                .tenantId(tenantId)
                .build());
    }

    /**
     * Moves an execution to {@code KILLING}, or to {@code afterKillState} when provided.
     *
     * <p>The execution is returned untouched when it is terminated, or when it is already
     * {@code KILLING} and no after-kill state is requested. A paused execution is first
     * resumed with {@code KILLING}; if that fails, a warning is logged and the state is
     * applied directly.
     *
     * @param execution      the execution to kill
     * @param flow           the flow of the execution
     * @param afterKillState the state to apply instead of {@code KILLING}, if present
     * @return the updated execution
     */
    public Execution kill(Execution execution, FlowInterface flow, Optional<State.Type> afterKillState) {
        final boolean alreadyKilling = execution.getState().getCurrent() == State.Type.KILLING && afterKillState.isEmpty();
        if (alreadyKilling || execution.getState().isTerminated()) {
            return execution;
        }

        final State.Type fallbackState = afterKillState.orElse(State.Type.KILLING);
        if (!execution.getState().isPaused()) {
            return execution.withState(fallbackState);
        }

        try {
            final Execution resumedExecution = this.resume(execution, flow, State.Type.KILLING, null);
            return resumedExecution.withState(afterKillState.orElse(resumedExecution.getState().getCurrent()));
        } catch (Exception e) {
            log.warn("Unable to resume a paused execution before killing it", (Throwable) e);
            return execution.withState(fallbackState);
        }
    }

    /**
     * Moves an execution to {@code KILLING}, without any after-kill state.
     *
     * @param execution the execution to kill
     * @param flow      the flow of the execution
     * @return the updated execution
     */
    public Execution kill(Execution execution, FlowInterface flow) {
        final Optional<State.Type> noAfterKillState = Optional.empty();
        return this.kill(execution, flow, noAfterKillState);
    }

    /**
     * Marks every ancestor of the given task run as {@code KILLED}.
     *
     * @param taskRun   the task run whose ancestors are killed
     * @param execution the execution owning the task runs
     * @return the execution with all ancestors of {@code taskRun} in {@code KILLED} state
     * @throws InternalException propagated from the task run lookup or update
     */
    public Execution killParentTaskruns(TaskRun taskRun, Execution execution) throws InternalException {
        Execution updated = execution;
        TaskRun cursor = taskRun;
        // Walk up the parent chain, one ancestor per iteration.
        do {
            final TaskRun parent = updated.findTaskRunByTaskRunId(cursor.getParentTaskRunId());
            if (parent.getState().getCurrent() != State.Type.KILLED) {
                updated = updated.withTaskRun(parent.withState(State.Type.KILLED));
            }
            cursor = parent;
        } while (cursor.getParentTaskRunId() != null);
        return updated;
    }

    /**
     * Computes the (mapped) identifiers of the task runs that come after the restarted
     * {@link WorkingDirectory} task runs.
     *
     * @param flow             the flow of the execution
     * @param execution        the execution owning the task runs
     * @param taskRunToRestart the identifiers of the task runs being restarted
     * @param mappingTaskRunId the mapping from original to new task run identifiers
     * @return the mapped identifiers of the successors of the WorkingDirectory task runs
     * @throws InternalException propagated from the task or task run lookups
     */
    private Set<String> removeWorkerTask(Flow flow, Execution execution, Set<String> taskRunToRestart, Map<String, String> mappingTaskRunId) throws InternalException {
        final Set<String> workingDirectoryIds = taskRunToRestart
            .stream()
            .filter(Rethrow.throwPredicate(id -> {
                final TaskRun candidate = execution.findTaskRunByTaskRunId(id);
                return flow.findTaskByTaskId(candidate.getTaskId()) instanceof WorkingDirectory;
            }))
            .collect(Collectors.toSet());

        final GraphCluster graph = GraphUtils.of(flow, execution);
        return GraphUtils.successors(graph, new ArrayList<String>(workingDirectoryIds))
            .stream()
            .filter(AbstractGraphTask.class::isInstance)
            .map(AbstractGraphTask.class::cast)
            // Keep only nodes that have a task run and are not the WorkingDirectory task runs themselves.
            .filter(node -> node.getTaskRun() != null)
            .filter(node -> !workingDirectoryIds.contains(node.getTaskRun().getId()))
            .map(node -> mappingTaskRunId.get(node.getTaskRun().getId()))
            .collect(Collectors.toSet());
    }

    /**
     * Returns the identifiers of the given task run and of all its parents.
     *
     * @param execution the execution owning the task run
     * @param taskRun   the task run to start from
     * @return the identifiers of the parents found by {@code Execution#findParents} plus the task run's own
     */
    private Set<String> getAncestors(Execution execution, TaskRun taskRun) {
        final Stream<TaskRun> parents = execution.findParents(taskRun).stream();
        final Stream<TaskRun> parentsAndSelf = Stream.concat(parents, Stream.of(taskRun));
        return parentsAndSelf.map(TaskRun::getId).collect(Collectors.toSet());
    }

    /**
     * Builds the mapping from each task run identifier to the identifier it will have in the
     * resulting execution.
     *
     * @param execution the execution whose task runs are mapped
     * @param keep      {@code true} to map every identifier to itself, {@code false} to map it
     *                  to a newly created identifier
     * @return the identifier mapping
     */
    private Map<String, String> mapTaskRunId(Execution execution, boolean keep) {
        return execution.getTaskRunList()
            .stream()
            .collect(Collectors.toMap(
                TaskRun::getId,
                taskRun -> keep ? taskRun.getId() : IdUtils.create()
            ));
    }

    /**
     * Copies a task run for a child execution, optionally altering its state.
     *
     * <p>The altered state is {@code newStateType} for dynamic task runs, non-flowable tasks
     * and {@link WorkingDirectory} tasks, and {@code RUNNING} for the other flowable tasks.
     * It is only applied when {@code toRestart} is {@code true}.
     *
     * @param flow             the flow used to look up the task of the task run
     * @param originalTaskRun  the task run to copy
     * @param mappingTaskRunId the mapping from original to new task run identifiers
     * @param newExecutionId   the identifier of the child execution
     * @param newStateType     the state applied to restarted task runs
     * @param toRestart        whether this task run is restarted
     * @return the task run for the child execution
     * @throws InternalException propagated from the task lookup
     */
    private TaskRun mapTaskRun(Flow flow, TaskRun originalTaskRun, Map<String, String> mappingTaskRunId, String newExecutionId, State.Type newStateType, Boolean toRestart) throws InternalException {
        final State.Type targetType;
        if (Boolean.TRUE.equals(originalTaskRun.getDynamic())) {
            targetType = newStateType;
        } else {
            final Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId());
            final boolean restartable = !task.isFlowable() || task instanceof WorkingDirectory;
            targetType = restartable ? newStateType : State.Type.RUNNING;
        }
        // The altered state is always computed, even when it ends up unused.
        final State alteredState = originalTaskRun.withState(targetType).getState();
        return originalTaskRun.forChildExecution(mappingTaskRunId, newExecutionId, toRestart ? alteredState : null);
    }

    /**
     * Returns the identifiers of the given task runs and of all their ancestors.
     *
     * @param execution the execution owning the task runs
     * @param taskRuns  the task runs to start from
     * @return the union of the identifiers returned by {@code getAncestors} for each task run
     */
    private Set<String> taskRunWithAncestors(Execution execution, List<TaskRun> taskRuns) {
        final Stream<String> ancestorIds = taskRuns
            .stream()
            .flatMap(Rethrow.throwFunction(start -> this.getAncestors(execution, start).stream()));
        return ancestorIds.collect(Collectors.toSet());
    }

    /**
     * Computes when an execution should next be retried.
     *
     * @param retry     the retry policy
     * @param execution the execution to retry
     * @return the next retry date, or {@code null} when the maximum number of attempts is
     *         reached or when the date falls after the original creation date plus the
     *         maximum duration
     */
    public Instant nextRetryDate(AbstractRetry retry, Execution execution) {
        final boolean attemptsExhausted = retry.getMaxAttempts() != null
            && execution.getMetadata().getAttemptNumber() >= retry.getMaxAttempts();
        if (attemptsExhausted) {
            return null;
        }

        final Instant lastStateDate = execution.getState().maxDate();
        final Instant firstCreatedDate = execution.getMetadata().getOriginalCreatedDate();
        final Instant candidate = retry.nextRetryDate(execution.getMetadata().getAttemptNumber(), lastStateDate);

        if (retry.getMaxDuration() == null) {
            return candidate;
        }
        final Instant deadline = firstCreatedDate.plus(retry.getMaxDuration());
        return candidate.isAfter(deadline) ? null : candidate;
    }

    /**
     * Forces a non-terminated execution to run.
     *
     * <p>A created execution is set to {@code RUNNING}, a {@code QUEUED} one is unqueued to
     * {@code RUNNING}, a {@code PAUSED} one is resumed with {@code RUNNING}; any other
     * execution is returned as is.
     *
     * @param execution the execution to force
     * @return the updated execution
     * @throws IllegalArgumentException if the execution is terminated
     * @throws Exception                propagated from unqueueing or resuming
     */
    public Execution forceRun(Execution execution) throws Exception {
        final State state = execution.getState();
        if (state.isTerminated()) {
            throw new IllegalArgumentException("Only non terminated executions can be forced run.");
        }
        if (state.isCreated()) {
            return execution.withState(State.Type.RUNNING);
        }

        final State.Type current = state.getCurrent();
        if (current == State.Type.QUEUED) {
            return this.concurrencyLimitService.unqueue(execution, State.Type.RUNNING);
        }
        if (current != State.Type.PAUSED) {
            return execution;
        }
        final Flow flow = this.flowRepositoryInterface.findByExecution(execution);
        return this.resume(execution, flow, State.Type.RUNNING, null);
    }

    /**
     * Tells whether an execution is fully terminated, including its valid listeners and its
     * afterExecution tasks.
     *
     * @param flow      the flow of the execution
     * @param execution the execution to check
     * @return {@code true} when the execution state is terminated and the execution reports
     *         itself terminated for both its valid listeners and its afterExecution tasks
     */
    public boolean isTerminated(Flow flow, Execution execution) {
        final boolean stateTerminated = execution.getState().isTerminated();
        if (!stateTerminated) {
            return false;
        }
        final List<ResolvedTask> listeners = this.conditionService.findValidListeners(flow, execution);
        final List<ResolvedTask> afterExecutionTasks = this.resolveAfterExecutionTasks(flow);
        if (!execution.isTerminated(listeners)) {
            return false;
        }
        return execution.isTerminated(afterExecutionTasks);
    }

    /**
     * Resolves the afterExecution tasks of a flow.
     *
     * @param flow the flow, may be {@code null}
     * @return the resolved tasks, or an empty list when the flow is {@code null} or declares
     *         no afterExecution tasks
     */
    public List<ResolvedTask> resolveAfterExecutionTasks(Flow flow) {
        final boolean nothingToResolve = flow == null || flow.getAfterExecution() == null;
        if (nothingToResolve) {
            return Collections.emptyList();
        }
        return flow.getAfterExecution()
            .stream()
            .map(ResolvedTask::of)
            .toList();
    }

    /**
     * Resets a trigger after one of its executions has terminated.
     *
     * @param flow      the flow owning the trigger
     * @param execution the terminated execution
     * @param trigger   the trigger to reset
     * @return the trigger returned by {@code Trigger#resetExecution}
     * @throws IllegalArgumentException if the execution is not terminated
     */
    public Trigger resetExecution(FlowWithSource flow, Execution execution, Trigger trigger) {
        final boolean terminated = execution.getState().isTerminated();
        if (!terminated) {
            throw new IllegalArgumentException("Only terminated executions can be reset.");
        }

        // Plugin defaults are injected before the trigger is looked up in the flow.
        final FlowWithSource resolvedFlow = this.pluginDefaultService.injectDefaults((FlowInterface) flow, execution);
        final RunContext triggerRunContext = this.runContextFactory.of(
            (Flow) resolvedFlow,
            resolvedFlow.findTriggerByTriggerId(trigger.getTriggerId())
        );
        final ConditionContext context = this.conditionService.conditionContext(triggerRunContext, resolvedFlow, null);
        return trigger.resetExecution(resolvedFlow, execution, context);
    }

    /**
     * Counters describing what a purge removed: executions, logs, stored files and metrics.
     *
     * <p>Instances are created through {@link #builder()} or derived from an existing
     * instance through {@link #toBuilder()}. Every counter defaults to {@code 0} when it is
     * not set on the builder. The members are marked {@code @Generated}.
     */
    public static class PurgeResult {
        private int executionsCount;
        private int logsCount;
        private int storagesCount;
        private int metricsCount;

        // Default value used when executionsCount is not set on the builder.
        @Generated
        private static int $default$executionsCount() {
            return 0;
        }

        // Default value used when logsCount is not set on the builder.
        @Generated
        private static int $default$logsCount() {
            return 0;
        }

        // Default value used when storagesCount is not set on the builder.
        @Generated
        private static int $default$storagesCount() {
            return 0;
        }

        // Default value used when metricsCount is not set on the builder.
        @Generated
        private static int $default$metricsCount() {
            return 0;
        }

        // Copies each counter from the builder, falling back to its default when unset.
        @Generated
        protected PurgeResult(PurgeResultBuilder<?, ?> b) {
            this.executionsCount = b.executionsCount$set ? b.executionsCount$value : PurgeResult.$default$executionsCount();
            this.logsCount = b.logsCount$set ? b.logsCount$value : PurgeResult.$default$logsCount();
            this.storagesCount = b.storagesCount$set ? b.storagesCount$value : PurgeResult.$default$storagesCount();
            this.metricsCount = b.metricsCount$set ? b.metricsCount$value : PurgeResult.$default$metricsCount();
        }

        /** Returns a new builder with no counter set. */
        @Generated
        public static PurgeResultBuilder<?, ?> builder() {
            return new PurgeResultBuilderImpl();
        }

        /** Returns a new builder pre-filled with the counters of this instance. */
        @Generated
        public PurgeResultBuilder<?, ?> toBuilder() {
            return new PurgeResultBuilderImpl().$fillValuesFrom(this);
        }

        /** Returns the number of purged executions. */
        @Generated
        public int getExecutionsCount() {
            return this.executionsCount;
        }

        /** Returns the number of purged logs. */
        @Generated
        public int getLogsCount() {
            return this.logsCount;
        }

        /** Returns the number of deleted stored files. */
        @Generated
        public int getStoragesCount() {
            return this.storagesCount;
        }

        /** Returns the number of purged metrics. */
        @Generated
        public int getMetricsCount() {
            return this.metricsCount;
        }

        /**
         * Self-typed builder for {@link PurgeResult}. For each counter it keeps the value
         * together with a flag recording whether the value was explicitly set.
         *
         * @param <C> the type built by {@link #build()}
         * @param <B> the concrete builder type returned by the chained setters
         */
        @Generated
        public static abstract class PurgeResultBuilder<C extends PurgeResult, B extends PurgeResultBuilder<C, B>> {
            @Generated
            private boolean executionsCount$set;
            @Generated
            private int executionsCount$value;
            @Generated
            private boolean logsCount$set;
            @Generated
            private int logsCount$value;
            @Generated
            private boolean storagesCount$set;
            @Generated
            private int storagesCount$value;
            @Generated
            private boolean metricsCount$set;
            @Generated
            private int metricsCount$value;

            // Copies every counter of the given instance into this builder.
            @Generated
            protected B $fillValuesFrom(C instance) {
                PurgeResultBuilder.$fillValuesFromInstanceIntoBuilder(instance, this);
                return this.self();
            }

            // Goes through the setters, so every counter ends up flagged as set.
            @Generated
            private static void $fillValuesFromInstanceIntoBuilder(PurgeResult instance, PurgeResultBuilder<?, ?> b) {
                b.executionsCount(instance.executionsCount);
                b.logsCount(instance.logsCount);
                b.storagesCount(instance.storagesCount);
                b.metricsCount(instance.metricsCount);
            }

            /** Sets the number of purged executions. */
            @Generated
            public B executionsCount(int executionsCount) {
                this.executionsCount$value = executionsCount;
                this.executionsCount$set = true;
                return this.self();
            }

            /** Sets the number of purged logs. */
            @Generated
            public B logsCount(int logsCount) {
                this.logsCount$value = logsCount;
                this.logsCount$set = true;
                return this.self();
            }

            /** Sets the number of deleted stored files. */
            @Generated
            public B storagesCount(int storagesCount) {
                this.storagesCount$value = storagesCount;
                this.storagesCount$set = true;
                return this.self();
            }

            /** Sets the number of purged metrics. */
            @Generated
            public B metricsCount(int metricsCount) {
                this.metricsCount$value = metricsCount;
                this.metricsCount$set = true;
                return this.self();
            }

            /** Returns this builder typed as {@code B}. */
            @Generated
            protected abstract B self();

            /** Builds the result from the values held by this builder. */
            @Generated
            public abstract C build();

            @Generated
            public String toString() {
                return "ExecutionService.PurgeResult.PurgeResultBuilder(executionsCount$value=" + this.executionsCount$value + ", logsCount$value=" + this.logsCount$value + ", storagesCount$value=" + this.storagesCount$value + ", metricsCount$value=" + this.metricsCount$value + ")";
            }
        }

        // Concrete builder returned by builder() and toBuilder().
        @Generated
        private static final class PurgeResultBuilderImpl
        extends PurgeResultBuilder<PurgeResult, PurgeResultBuilderImpl> {
            @Generated
            private PurgeResultBuilderImpl() {
            }

            @Override
            @Generated
            protected PurgeResultBuilderImpl self() {
                return this;
            }

            @Override
            @Generated
            public PurgeResult build() {
                return new PurgeResult(this);
            }
        }
    }
}

