/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.services;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraphTask;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.services.GraphService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.flows.Worker;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Rethrow;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;

@Singleton
public class ExecutionService {
    // Injected Micronaut context; not referenced by any method in this class body.
    @Inject
    private ApplicationContext applicationContext;
    // Resolves the Flow an execution belongs to (restart, replay and markAs call findByExecution).
    @Inject
    private FlowRepositoryInterface flowRepositoryInterface;
    // Used by purge to compute an execution's storage prefix and delete everything under it.
    @Inject
    private StorageInterface storageInterface;
    // Used by purge both to look up the executions to purge and to purge each of them.
    @Inject
    private ExecutionRepositoryInterface executionRepository;
    // Used by purge to remove the logs of an execution.
    @Inject
    private LogRepositoryInterface logRepository;
    // Used by purge to remove the metrics of an execution.
    @Inject
    private MetricRepositoryInterface metricRepository;

    /**
     * Builds the execution to resubmit when restarting from failed or paused task runs.
     *
     * <p>Every task run of {@code execution} is re-mapped through {@code mapTaskRun}; the ones that
     * are failed or paused (plus the related task runs added by {@code taskRunToRestart}) are
     * flagged as "to restart". Task runs returned by {@code removeWorkerTask} and task runs whose
     * task id belongs to {@code flow.allErrorsWithChilds()} are then dropped from the result.
     *
     * <p>When {@code revision} is {@code null}, task run ids are kept and a {@code null} execution id
     * is handed to {@code childExecution}. Otherwise new task run ids and a new execution id are
     * generated and the result is returned with that flow revision.
     *
     * @param execution the execution to restart
     * @param revision the flow revision to apply, or {@code null}
     * @return the execution produced by {@code childExecution}, optionally with the given revision
     * @throws IllegalStateException if the execution is neither terminated nor paused
     * @throws IllegalArgumentException if no task run qualifies for a restart
     * @throws Exception if resolving the flow or one of its tasks fails
     */
    public Execution restart(Execution execution, @Nullable Integer revision) throws Exception {
        boolean restartable = execution.getState().isTerminated() || execution.getState().isPaused();
        if (!restartable) {
            throw new IllegalStateException("Execution must be terminated to be restarted, current state is '" + execution.getState().getCurrent() + "' !");
        }

        Flow flow = this.flowRepositoryInterface.findByExecution(execution);

        Set<String> restartIds = this.taskRunToRestart(
            execution,
            candidate -> candidate.getState().getCurrent().isFailed() || candidate.getState().getCurrent().isPaused()
        );

        // Without an explicit revision the restart reuses the current ids.
        boolean keepIds = revision == null;
        Map<String, String> idMapping = this.mapTaskRunId(execution, keepIds);
        String newExecutionId = keepIds ? null : IdUtils.create();

        List<TaskRun> newTaskRuns = execution.getTaskRunList()
            .stream()
            .map(Rethrow.throwFunction(source -> this.mapTaskRun(
                flow,
                source,
                idMapping,
                newExecutionId,
                State.Type.RESTARTED,
                restartIds.contains(source.getId())
            )))
            .collect(Collectors.toList());

        // Drop the task runs that follow a restarted Worker task.
        for (String droppedId : this.removeWorkerTask(flow, execution, restartIds, idMapping)) {
            newTaskRuns.removeIf(candidate -> candidate.getId().equals(droppedId));
        }

        // Drop the task runs produced by the flow's error tasks.
        flow.allErrorsWithChilds().forEach(errorTask ->
            newTaskRuns.removeIf(candidate -> candidate.getTaskId().equals(errorTask.getId()))
        );

        State restartedState = execution.withState(State.Type.RESTARTED).getState();
        Execution restarted = execution.childExecution(newExecutionId, newTaskRuns, restartedState);

        if (revision == null) {
            return restarted;
        }
        return restarted.withFlowRevision(revision);
    }

    /**
     * Selects the ids of the task runs matching {@code predicate}, expanded with the related task
     * runs added by {@code taskRunWithAncestors}.
     *
     * @param execution the execution whose task runs are inspected
     * @param predicate selects the task runs to start from
     * @return the non-empty set of task run ids
     * @throws IllegalArgumentException if the resulting set is empty
     */
    private Set<String> taskRunToRestart(Execution execution, Predicate<TaskRun> predicate) {
        List<TaskRun> matching = execution.getTaskRunList()
            .stream()
            .filter(predicate)
            .collect(Collectors.toList());

        Set<String> selectedIds = this.taskRunWithAncestors(execution, matching);
        if (selectedIds.isEmpty()) {
            throw new IllegalArgumentException("No task found to restart execution from!");
        }
        return selectedIds;
    }

    /**
     * Builds a new execution that replays {@code execution} starting from one of its task runs.
     *
     * <p>A new execution id and new task run ids are always generated. The task run identified by
     * {@code taskRunId} (plus the related task runs added by {@code taskRunToRestart}) is flagged as
     * "to restart" when re-mapped through {@code mapTaskRun}. Graph successors of the replayed task
     * run that already have a task run, and the task runs returned by {@code removeWorkerTask}, are
     * dropped from the result.
     *
     * @param execution the execution to replay
     * @param taskRunId the id of the task run to replay from
     * @param revision the flow revision to apply to the new execution, or {@code null}
     * @return the execution produced by {@code childExecution}, optionally with the given revision
     * @throws IllegalStateException if the execution is neither terminated nor paused
     * @throws IllegalArgumentException if {@code taskRunId} matches no task run
     * @throws Exception if resolving the flow or one of its tasks fails
     */
    public Execution replay(Execution execution, String taskRunId, @Nullable Integer revision) throws Exception {
        // Same precondition as restart() and markAs(). The former condition,
        // `!isTerminated() && isTerminated()`, could never be true, so the guard never fired.
        if (!execution.getState().isTerminated() && !execution.getState().isPaused()) {
            throw new IllegalStateException("Execution must be terminated to be restarted, current state is '" + execution.getState().getCurrent() + "' !");
        }

        Flow flow = this.flowRepositoryInterface.findByExecution(execution);
        GraphCluster graphCluster = GraphService.of(flow, execution);

        Set<String> taskRunToRestart = this.taskRunToRestart(execution, taskRun -> taskRun.getId().equals(taskRunId));

        // A replay always produces a distinct execution, so every id is regenerated.
        Map<String, String> mappingTaskRunId = this.mapTaskRunId(execution, false);
        String newExecutionId = IdUtils.create();

        List<TaskRun> newTaskRuns = execution.getTaskRunList()
            .stream()
            .map(Rethrow.throwFunction(originalTaskRun -> this.mapTaskRun(
                flow,
                originalTaskRun,
                mappingTaskRunId,
                newExecutionId,
                State.Type.RESTARTED,
                taskRunToRestart.contains(originalTaskRun.getId())
            )))
            .collect(Collectors.toList());

        // New ids of the task runs that come after the replayed one in the graph and are not
        // themselves part of the restart set.
        Set<String> taskRunToRemove = GraphService.successors(graphCluster, List.of(taskRunId))
            .stream()
            .filter(task -> task instanceof AbstractGraphTask)
            .map(task -> (AbstractGraphTask) task)
            .filter(task -> task.getTaskRun() != null)
            .filter(task -> !task.getTaskRun().getId().equals(taskRunId))
            .filter(task -> !taskRunToRestart.contains(task.getTaskRun().getId()))
            .map(task -> mappingTaskRunId.get(task.getTaskRun().getId()))
            .collect(Collectors.toSet());

        taskRunToRemove.forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

        // Drop the task runs that follow a restarted Worker task.
        this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId)
            .forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

        Execution newExecution = execution.childExecution(
            newExecutionId,
            newTaskRuns,
            execution.withState(State.Type.RESTARTED).getState()
        );

        return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
    }

    /**
     * Forces the state of one task run and returns the execution in the {@code RESTARTED} state.
     *
     * <p>The task run identified by {@code taskRunId}, and every non-flowable task run selected
     * along with it by {@code taskRunToRestart}, is moved to {@code newState}; when such a task run
     * has attempts, its last attempt is moved to {@code newState} as well. The other selected task
     * runs (flowable ones that are not the target) are moved to {@code RUNNING}.
     *
     * @param execution the execution to modify
     * @param taskRunId the id of the task run whose state is forced
     * @param newState the state to apply
     * @return the updated execution, with state {@code RESTARTED}
     * @throws IllegalStateException if the execution is neither terminated nor paused
     * @throws IllegalArgumentException if {@code taskRunId} matches no task run
     * @throws Exception if resolving the flow or one of its tasks fails
     */
    public Execution markAs(Execution execution, String taskRunId, State.Type newState) throws Exception {
        boolean editable = execution.getState().isTerminated() || execution.getState().isPaused();
        if (!editable) {
            throw new IllegalStateException("Execution must be terminated to be restarted, current state is '" + execution.getState().getCurrent() + "' !");
        }

        Flow flow = this.flowRepositoryInterface.findByExecution(execution);
        Set<String> affectedIds = this.taskRunToRestart(execution, candidate -> candidate.getId().equals(taskRunId));

        Execution updated = execution;
        for (String affectedId : affectedIds) {
            TaskRun current = updated.findTaskRunByTaskRunId(affectedId);
            boolean flowable = flow.findTaskByTaskId(current.getTaskId()).isFlowable();

            if (flowable && !affectedId.equals(taskRunId)) {
                // Flowable task runs other than the target are set back to RUNNING.
                updated = updated.withTaskRun(current.withState(State.Type.RUNNING));
                continue;
            }

            TaskRun relabelled = current.withState(newState);
            List<TaskRunAttempt> previousAttempts = current.getAttempts();
            if (previousAttempts != null && !previousAttempts.isEmpty()) {
                // Only the most recent attempt is aligned with the forced state.
                List<TaskRunAttempt> attempts = new ArrayList<>(previousAttempts);
                int lastIndex = attempts.size() - 1;
                attempts.set(lastIndex, attempts.get(lastIndex).withState(newState));
                relabelled = relabelled.withAttempts(attempts);
            }
            updated = updated.withTaskRun(relabelled);
        }

        return updated.withState(State.Type.RESTARTED);
    }

    /**
     * Purges the executions matching the given filters, together with the selected related data,
     * and reports how much was removed.
     *
     * <p>Executions are looked up through the execution repository with {@code namespace},
     * {@code flowId}, {@code endDate} and {@code state}. For each one, the parts enabled by the four
     * flags are purged and the per-execution counts are summed. Metrics are purged when requested
     * but are not counted in the result.
     *
     * @param purgeExecution whether to purge the execution records
     * @param purgeLog whether to purge the logs
     * @param purgeMetric whether to purge the metrics
     * @param purgeStorage whether to delete the storage files under the execution prefix
     * @param namespace optional namespace filter
     * @param flowId optional flow id filter
     * @param endDate optional end date filter
     * @param state optional state filter
     * @return the summed counts, or an all-default result when no execution matched
     * @throws IOException declared by this method's signature
     */
    public PurgeResult purge(Boolean purgeExecution, Boolean purgeLog, Boolean purgeMetric, Boolean purgeStorage, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> state) throws IOException {
        PurgeResult total = this.executionRepository
            .find(null, namespace, flowId, null, endDate, state)
            .map(execution -> this.purgeSingleExecution(execution, purgeExecution, purgeLog, purgeMetric, purgeStorage))
            .reduce((left, right) -> this.sumPurgeResults(left, right))
            .blockingGet();

        // blockingGet() yields null when nothing matched the filters.
        if (total == null) {
            return PurgeResult.builder().build();
        }
        return total;
    }

    /** Purges the requested parts of a single execution and returns the resulting counts. */
    private PurgeResult purgeSingleExecution(Execution execution, Boolean purgeExecution, Boolean purgeLog, Boolean purgeMetric, Boolean purgeStorage) throws IOException {
        PurgeResult.PurgeResultBuilder<?, ?> counts = PurgeResult.builder();

        if (purgeExecution) {
            counts.executionsCount(this.executionRepository.purge(execution));
        }
        if (purgeLog) {
            counts.logsCount(this.logRepository.purge(execution));
        }
        if (purgeMetric) {
            this.metricRepository.purge(execution);
        }
        if (purgeStorage) {
            URI prefix = URI.create("/" + this.storageInterface.executionPrefix(execution));
            counts.storagesCount(this.storageInterface.deleteByPrefix(prefix).size());
        }

        return counts.build();
    }

    /** Adds the counts of {@code right} to those of {@code left} and returns the sum as a new result. */
    private PurgeResult sumPurgeResults(PurgeResult left, PurgeResult right) {
        PurgeResult.PurgeResultBuilder<?, ?> summed = left.toBuilder();
        summed.executionsCount(left.getExecutionsCount() + right.getExecutionsCount());
        summed.logsCount(left.getLogsCount() + right.getLogsCount());
        summed.storagesCount(left.getStoragesCount() + right.getStoragesCount());
        return summed.build();
    }

    /**
     * Computes the (re-mapped) ids of the task runs that come after a restarted {@link Worker} task
     * in the execution graph.
     *
     * <p>Among {@code taskRunToRestart}, the task runs whose task is a {@link Worker} are selected.
     * Their graph successors that already have a task run, and are not one of those Worker task
     * runs, are translated through {@code mappingTaskRunId} and returned.
     *
     * @param flow the flow used to resolve tasks and build the graph
     * @param execution the execution used to resolve task runs and build the graph
     * @param taskRunToRestart the ids of the task runs being restarted
     * @param mappingTaskRunId mapping from original task run id to new task run id
     * @return the new ids of the task runs to drop; may contain {@code null} if an id has no mapping
     * @throws InternalException if a task cannot be resolved from the flow
     */
    private Set<String> removeWorkerTask(Flow flow, Execution execution, Set<String> taskRunToRestart, Map<String, String> mappingTaskRunId) throws InternalException {
        // Typed as Set<String> (instead of a raw Set) so the uses below are checked by the compiler.
        Set<String> workerTaskRunId = taskRunToRestart
            .stream()
            .filter(Rethrow.throwPredicate(s -> {
                TaskRun taskRun = execution.findTaskRunByTaskRunId(s);
                Task task = flow.findTaskByTaskId(taskRun.getTaskId());
                return task instanceof Worker;
            }))
            .collect(Collectors.toSet());

        GraphCluster graphCluster = GraphService.of(flow, execution);

        return GraphService.successors(graphCluster, new ArrayList<>(workerTaskRunId))
            .stream()
            .filter(task -> task instanceof AbstractGraphTask)
            .map(task -> (AbstractGraphTask) task)
            .filter(task -> task.getTaskRun() != null)
            .filter(task -> !workerTaskRunId.contains(task.getTaskRun().getId()))
            .map(task -> mappingTaskRunId.get(task.getTaskRun().getId()))
            .collect(Collectors.toSet());
    }

    /**
     * Returns the id of {@code taskRun} together with the ids of the task runs returned by
     * {@code execution.findChilds(taskRun)}.
     *
     * <p>NOTE(review): the method name says "ancestors" while the lookup is named
     * {@code findChilds}; confirm against {@code Execution} which direction is actually walked.
     *
     * @param execution the execution holding the task runs
     * @param taskRun the task run to start from
     * @return the set of collected task run ids, always including {@code taskRun}'s own id
     */
    private Set<String> getAncestors(Execution execution, TaskRun taskRun) {
        Stream<TaskRun> related = execution.findChilds(taskRun).stream();
        Stream<TaskRun> self = Stream.of(taskRun);

        return Stream.concat(related, self)
            .map(candidate -> candidate.getId())
            .collect(Collectors.toSet());
    }

    /**
     * Builds the mapping from each task run id of {@code execution} to the id it should carry in
     * the resulting execution.
     *
     * @param execution the execution whose task runs are mapped
     * @param keep when {@code true} every id maps to itself; otherwise each id maps to a newly
     *     created one
     * @return a map keyed by original task run id
     */
    private Map<String, String> mapTaskRunId(Execution execution, boolean keep) {
        return execution.getTaskRunList()
            .stream()
            .collect(Collectors.toMap(
                TaskRun::getId,
                source -> keep ? source.getId() : IdUtils.create()
            ));
    }

    /**
     * Converts a task run of the original execution into its counterpart for the child execution.
     *
     * <p>The candidate state is {@code newStateType} when the task is not flowable or is a
     * {@link Worker}, and {@code RUNNING} for any other flowable task. That state is only handed to
     * {@code forChildExecution} when {@code toRestart} is true; otherwise {@code null} is passed
     * (presumably leaving the state untouched; confirm in {@code TaskRun.forChildExecution}).
     *
     * @param flow the flow used to resolve the task of the task run
     * @param originalTaskRun the task run to convert
     * @param mappingTaskRunId mapping from original task run id to new task run id
     * @param newExecutionId the id of the child execution, possibly {@code null}
     * @param newStateType the state applied to restarted non-flowable or Worker task runs
     * @param toRestart whether this task run is part of the restart set
     * @return the task run for the child execution
     * @throws InternalException if the task cannot be resolved from the flow
     */
    private TaskRun mapTaskRun(Flow flow, TaskRun originalTaskRun, Map<String, String> mappingTaskRunId, String newExecutionId, State.Type newStateType, Boolean toRestart) throws InternalException {
        Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId());

        State.Type targetType;
        if (!task.isFlowable() || task instanceof Worker) {
            targetType = newStateType;
        } else {
            targetType = State.Type.RUNNING;
        }
        State alterState = originalTaskRun.withState(targetType).getState();

        State childState = null;
        if (toRestart) {
            childState = alterState;
        }
        return originalTaskRun.forChildExecution(mappingTaskRunId, newExecutionId, childState);
    }

    /**
     * Collects, for every given task run, the ids returned by {@code getAncestors} into one set.
     *
     * @param execution the execution holding the task runs
     * @param taskRuns the task runs to expand
     * @return the union of the id sets; empty when {@code taskRuns} is empty
     */
    private Set<String> taskRunWithAncestors(Execution execution, List<TaskRun> taskRuns) {
        return taskRuns
            .stream()
            .map(Rethrow.throwFunction(taskRun -> this.getAncestors(execution, taskRun)))
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    }

    /**
     * Counts reported by a purge: executions, logs and storage files removed.
     *
     * <p>Instances are created through {@link #builder()} or {@link #toBuilder()}; any count that is
     * never set on the builder falls back to {@code 0}. Only getters are exposed.
     */
    public static class PurgeResult {
        private int executionsCount;
        private int logsCount;
        private int storagesCount;

        /** Default used when the builder never received an executions count. */
        @Generated
        private static int $default$executionsCount() {
            return 0;
        }

        /** Default used when the builder never received a logs count. */
        @Generated
        private static int $default$logsCount() {
            return 0;
        }

        /** Default used when the builder never received a storages count. */
        @Generated
        private static int $default$storagesCount() {
            return 0;
        }

        /**
         * Copies the counts out of the builder, using the default for each count that was not set.
         *
         * @param b the builder holding the values
         */
        @Generated
        protected PurgeResult(PurgeResultBuilder<?, ?> b) {
            if (b.executionsCount$set) {
                this.executionsCount = b.executionsCount$value;
            } else {
                this.executionsCount = PurgeResult.$default$executionsCount();
            }

            if (b.logsCount$set) {
                this.logsCount = b.logsCount$value;
            } else {
                this.logsCount = PurgeResult.$default$logsCount();
            }

            if (b.storagesCount$set) {
                this.storagesCount = b.storagesCount$value;
            } else {
                this.storagesCount = PurgeResult.$default$storagesCount();
            }
        }

        /** Returns a new builder with no count set. */
        @Generated
        public static PurgeResultBuilder<?, ?> builder() {
            return new PurgeResultBuilderImpl();
        }

        /** Returns a new builder pre-filled with this instance's counts. */
        @Generated
        public PurgeResultBuilder<?, ?> toBuilder() {
            PurgeResultBuilderImpl prefilled = new PurgeResultBuilderImpl();
            return prefilled.$fillValuesFrom(this);
        }

        /** Returns the number of executions purged. */
        @Generated
        public int getExecutionsCount() {
            return this.executionsCount;
        }

        /** Returns the number of logs purged. */
        @Generated
        public int getLogsCount() {
            return this.logsCount;
        }

        /** Returns the number of storage files purged. */
        @Generated
        public int getStoragesCount() {
            return this.storagesCount;
        }

        /**
         * Self-typed builder for {@link PurgeResult}. Each setter records the value, remembers that
         * it was set, and returns the builder itself for chaining.
         *
         * @param <C> the type built
         * @param <B> the concrete builder type
         */
        @Generated
        public static abstract class PurgeResultBuilder<C extends PurgeResult, B extends PurgeResultBuilder<C, B>> {
            @Generated
            private boolean executionsCount$set;
            @Generated
            private int executionsCount$value;
            @Generated
            private boolean logsCount$set;
            @Generated
            private int logsCount$value;
            @Generated
            private boolean storagesCount$set;
            @Generated
            private int storagesCount$value;

            /** Copies every count of {@code instance} into this builder and returns it. */
            @Generated
            protected B $fillValuesFrom(C instance) {
                PurgeResultBuilder.$fillValuesFromInstanceIntoBuilder(instance, this);
                return this.self();
            }

            // Static and typed on PurgeResult so the private fields can be read directly.
            @Generated
            private static void $fillValuesFromInstanceIntoBuilder(PurgeResult instance, PurgeResultBuilder<?, ?> b) {
                b.executionsCount(instance.executionsCount);
                b.logsCount(instance.logsCount);
                b.storagesCount(instance.storagesCount);
            }

            /** Sets the executions count. */
            @Generated
            public B executionsCount(int executionsCount) {
                this.executionsCount$set = true;
                this.executionsCount$value = executionsCount;
                return this.self();
            }

            /** Sets the logs count. */
            @Generated
            public B logsCount(int logsCount) {
                this.logsCount$set = true;
                this.logsCount$value = logsCount;
                return this.self();
            }

            /** Sets the storages count. */
            @Generated
            public B storagesCount(int storagesCount) {
                this.storagesCount$set = true;
                this.storagesCount$value = storagesCount;
                return this.self();
            }

            /** Returns this builder as its concrete type. */
            @Generated
            protected abstract B self();

            /** Creates the result from the values recorded so far. */
            @Generated
            public abstract C build();

            @Generated
            public String toString() {
                return "ExecutionService.PurgeResult.PurgeResultBuilder(executionsCount$value=" + this.executionsCount$value
                    + ", logsCount$value=" + this.logsCount$value
                    + ", storagesCount$value=" + this.storagesCount$value
                    + ")";
            }
        }

        /** The only concrete builder: builds plain {@link PurgeResult} instances. */
        @Generated
        private static final class PurgeResultBuilderImpl
        extends PurgeResultBuilder<PurgeResult, PurgeResultBuilderImpl> {
            @Generated
            private PurgeResultBuilderImpl() {
            }

            @Override
            @Generated
            protected PurgeResultBuilderImpl self() {
                return this;
            }

            @Override
            @Generated
            public PurgeResult build() {
                return new PurgeResult(this);
            }
        }
    }
}

