/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;

@Singleton
public class FlowConcurrencyCaseTest {
    public static final String NAMESPACE = "io.kestra.tests";
    @Inject
    private StorageInterface storageInterface;
    @Inject
    protected TestRunnerUtils runnerUtils;
    @Inject
    private FlowInputOutput flowIO;
    @Inject
    private FlowRepositoryInterface flowRepository;
    @Inject
    private ExecutionService executionService;
    @Inject
    @Named(value="executionKilledQueue")
    protected QueueInterface<ExecutionKilled> killQueue;

    public void flowConcurrencyCancel() throws TimeoutException, QueueException {
        Execution execution1 = this.runnerUtils.runOneUntilRunning("main", NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30L));
        try {
            List<Execution> shouldFailExecutions = List.of(this.runnerUtils.runOne("main", NAMESPACE, "flow-concurrency-cancel"), this.runnerUtils.runOne("main", NAMESPACE, "flow-concurrency-cancel"));
            Assertions.assertThat((boolean)execution1.getState().isRunning()).isTrue();
            Assertions.assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(arg_0 -> State.Type.CANCELLED.equals(arg_0));
        }
        finally {
            this.runnerUtils.killExecution(execution1);
        }
    }

    public void flowConcurrencyFail() throws TimeoutException, QueueException {
        Execution execution1 = this.runnerUtils.runOneUntilRunning("main", NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30L));
        try {
            List<Execution> shouldFailExecutions = List.of(this.runnerUtils.runOne("main", NAMESPACE, "flow-concurrency-fail"), this.runnerUtils.runOne("main", NAMESPACE, "flow-concurrency-fail"));
            Assertions.assertThat((boolean)execution1.getState().isRunning()).isTrue();
            Assertions.assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(arg_0 -> State.Type.FAILED.equals(arg_0));
        }
        finally {
            this.runnerUtils.killExecution(execution1);
        }
    }

    public void flowConcurrencyQueue() throws QueueException {
        Execution execution1 = this.runnerUtils.runOneUntilRunning("main", NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30L));
        Flow flow = (Flow)this.flowRepository.findById("main", NAMESPACE, "flow-concurrency-queue", Optional.empty()).orElseThrow();
        Execution execution2 = Execution.newExecution((FlowInterface)flow, null, null, Optional.empty());
        Execution executionResult2 = this.runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution2);
        Execution executionResult1 = this.runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution1);
        Assertions.assertThat((boolean)execution1.getState().isRunning()).isTrue();
        Assertions.assertThat((Comparable)execution2.getState().getCurrent()).isEqualTo((Object)State.Type.CREATED);
        Assertions.assertThat((Comparable)executionResult1.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((Comparable)executionResult2.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().getFirst()).getState()).isEqualTo((Object)State.Type.CREATED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().get(1)).getState()).isEqualTo((Object)State.Type.QUEUED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().get(2)).getState()).isEqualTo((Object)State.Type.RUNNING);
    }

    public void flowConcurrencyQueuePause() throws QueueException {
        Execution execution1 = this.runnerUtils.runOneUntilPaused("main", NAMESPACE, "flow-concurrency-queue-pause");
        Flow flow = (Flow)this.flowRepository.findById("main", NAMESPACE, "flow-concurrency-queue-pause", Optional.empty()).orElseThrow();
        Execution execution2 = Execution.newExecution((FlowInterface)flow, null, null, Optional.empty());
        Execution secondExecutionResult = this.runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution2);
        Execution firstExecutionResult = this.runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution1);
        Assertions.assertThat((String)firstExecutionResult.getId()).isEqualTo(execution1.getId());
        Assertions.assertThat((Comparable)firstExecutionResult.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((String)secondExecutionResult.getId()).isEqualTo(execution2.getId());
        Assertions.assertThat((Comparable)secondExecutionResult.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((Comparable)((State.History)secondExecutionResult.getState().getHistories().getFirst()).getState()).isEqualTo((Object)State.Type.CREATED);
        Assertions.assertThat((Comparable)((State.History)secondExecutionResult.getState().getHistories().get(1)).getState()).isEqualTo((Object)State.Type.QUEUED);
        Assertions.assertThat((Comparable)((State.History)secondExecutionResult.getState().getHistories().get(2)).getState()).isEqualTo((Object)State.Type.RUNNING);
    }

    public void flowConcurrencyCancelPause() throws QueueException {
        Execution execution1 = this.runnerUtils.runOneUntilPaused("main", NAMESPACE, "flow-concurrency-cancel-pause");
        Flow flow = (Flow)this.flowRepository.findById("main", NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty()).orElseThrow();
        Execution execution2 = Execution.newExecution((FlowInterface)flow, null, null, Optional.empty());
        Execution secondExecutionResult = this.runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.CANCELLED), execution2);
        Execution firstExecutionResult = this.runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution1);
        Assertions.assertThat((String)firstExecutionResult.getId()).isEqualTo(execution1.getId());
        Assertions.assertThat((Comparable)firstExecutionResult.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((String)secondExecutionResult.getId()).isEqualTo(execution2.getId());
        Assertions.assertThat((Comparable)secondExecutionResult.getState().getCurrent()).isEqualTo((Object)State.Type.CANCELLED);
        Assertions.assertThat((Comparable)((State.History)secondExecutionResult.getState().getHistories().getFirst()).getState()).isEqualTo((Object)State.Type.CREATED);
        Assertions.assertThat((Comparable)((State.History)secondExecutionResult.getState().getHistories().get(1)).getState()).isEqualTo((Object)State.Type.CANCELLED);
    }

    public void flowConcurrencyWithForEachItem(String tenantId) throws QueueException, URISyntaxException, IOException {
        URI file = this.storageUpload(tenantId);
        Map<String, Integer> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution forEachItem = this.runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-for-each-item", null, (flow, execution1) -> this.flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5L));
        Assertions.assertThat((Comparable)forEachItem.getState().getCurrent()).isEqualTo((Object)State.Type.RUNNING);
        Execution terminated = this.runnerUtils.awaitExecution(e -> e.getState().isTerminated(), forEachItem);
        Assertions.assertThat((Comparable)terminated.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        List executions = this.runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-queue");
        Assertions.assertThat((List)executions).extracting(e -> e.getState().getCurrent()).containsOnly((Object[])new State.Type[]{State.Type.SUCCESS});
        Assertions.assertThat(executions.stream().map(e -> e.getState().getHistories()).flatMap(Collection::stream).map(State.History::getState).toList()).contains((Object[])new State.Type[]{State.Type.QUEUED});
    }

    public void flowConcurrencyQueueRestarted() throws Exception {
        Execution execution1 = this.runnerUtils.runOneUntilRunning("main", NAMESPACE, "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30L));
        Flow flow = (Flow)this.flowRepository.findById("main", NAMESPACE, "flow-concurrency-queue-fail", Optional.empty()).orElseThrow();
        Execution execution2 = Execution.newExecution((FlowInterface)flow, null, null, Optional.empty());
        this.runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.RUNNING), execution2);
        Execution failedExecution = this.runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.FAILED), execution1);
        Execution restarted = this.executionService.restart(failedExecution, null);
        Execution executionResult1 = this.runnerUtils.restartExecution(e -> e.getState().getCurrent().equals((Object)State.Type.FAILED), restarted);
        Execution executionResult2 = this.runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.FAILED), execution2);
        Assertions.assertThat((Comparable)executionResult1.getState().getCurrent()).isEqualTo((Object)State.Type.FAILED);
        Assertions.assertThat((boolean)executionResult1.getState().getHistories().stream().anyMatch(history -> history.getState() == State.Type.RESTARTED)).isTrue();
        Assertions.assertThat((boolean)executionResult1.getState().getHistories().stream().anyMatch(history -> history.getState() == State.Type.QUEUED)).isTrue();
        Assertions.assertThat((Comparable)executionResult2.getState().getCurrent()).isEqualTo((Object)State.Type.FAILED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().getFirst()).getState()).isEqualTo((Object)State.Type.CREATED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().get(1)).getState()).isEqualTo((Object)State.Type.QUEUED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().get(2)).getState()).isEqualTo((Object)State.Type.RUNNING);
    }

    public void flowConcurrencyQueueAfterExecution() throws QueueException {
        Execution execution1 = this.runnerUtils.runOneUntilRunning("main", NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30L));
        Flow flow = (Flow)this.flowRepository.findById("main", NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty()).orElseThrow();
        Execution execution2 = Execution.newExecution((FlowInterface)flow, null, null, Optional.empty());
        Execution executionResult2 = this.runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution2);
        Execution executionResult1 = this.runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), execution1);
        Assertions.assertThat((Comparable)executionResult1.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((Comparable)executionResult2.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().getFirst()).getState()).isEqualTo((Object)State.Type.CREATED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().get(1)).getState()).isEqualTo((Object)State.Type.QUEUED);
        Assertions.assertThat((Comparable)((State.History)executionResult2.getState().getHistories().get(2)).getState()).isEqualTo((Object)State.Type.RUNNING);
    }

    public void flowConcurrencySubflow(String tenantId) throws TimeoutException, QueueException {
        this.runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-subflow", null, null, Duration.ofSeconds(30L));
        this.runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
        List subFlowExecs = this.runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
        Assertions.assertThat((List)subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder((Object[])new State.Type[]{State.Type.SUCCESS, State.Type.CANCELLED});
        Execution execution3 = this.runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
        Assertions.assertThat((Comparable)execution3.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        this.runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals((Object)State.Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
    }

    public void flowConcurrencyParallelSubflowKill() throws QueueException {
        Execution parent = this.runnerUtils.runOneUntilRunning("main", NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30L));
        Execution queued = this.runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), "main", NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
        this.killQueue.emit((Object)((ExecutionKilledExecution.ExecutionKilledExecutionBuilder)((ExecutionKilledExecution.ExecutionKilledExecutionBuilder)ExecutionKilledExecution.builder().state(ExecutionKilled.State.REQUESTED)).executionId(parent.getId()).isOnKillCascade(Boolean.valueOf(true)).tenantId("main")).build());
        Execution terminated = this.runnerUtils.awaitExecution(e -> e.getState().isTerminated(), queued);
        Assertions.assertThat((Comparable)terminated.getState().getCurrent()).isEqualTo((Object)State.Type.KILLED);
        Assertions.assertThat((boolean)terminated.getState().getHistories().stream().noneMatch(h -> h.getState() == State.Type.RUNNING)).isTrue();
        Assertions.assertThat((List)terminated.getTaskRunList()).isNull();
    }

    private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");
        Files.write(tempFile.toPath(), this.content(), new OpenOption[0]);
        return this.storageInterface.put(tenantId, null, new URI("/file/storage/file.txt"), (InputStream)new FileInputStream(tempFile));
    }

    private List<String> content() {
        return IntStream.range(0, 7).mapToObj(value -> StringUtils.leftPad((String)("" + value), (int)20)).toList();
    }
}

