/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.plugin.core.flow;

import com.google.common.io.CharStreams;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.flow.Pause;
import io.micronaut.http.multipart.CompletedPart;
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.MicronautHttpData;
import io.micronaut.http.server.netty.multipart.NettyCompletedAttribute;
import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.MemoryAttribute;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@KestraTest(startRunner=true)
public class PauseTest {
    @Inject
    TestRunnerUtils runnerUtils;
    @Inject
    Suite suite;

    @Test
    @LoadFlows(value={"flows/valids/pause-test.yaml"})
    void run() throws Exception {
        this.suite.run(this.runnerUtils);
    }

    @FlakyTest(description="This test is too flaky and it always pass in JDBC and Kafka")
    @Test
    @LoadFlows(value={"flows/valids/pause-delay.yaml"})
    void delay() throws Exception {
        this.suite.runDelay(this.runnerUtils);
    }

    @FlakyTest(description="This test is too flaky and it always pass in JDBC and Kafka")
    @Test
    @LoadFlows(value={"flows/valids/pause-duration-from-input.yaml"})
    void delayFromInput() throws Exception {
        this.suite.runDurationFromInput(this.runnerUtils);
    }

    @FlakyTest(description="This test is too flaky and it always pass in JDBC and Kafka")
    @Test
    @LoadFlows(value={"flows/valids/each-parallel-pause.yml"})
    void parallelDelay() throws Exception {
        this.suite.runParallelDelay(this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause-timeout.yaml"})
    void timeout() throws Exception {
        this.suite.runTimeout(this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause-timeout-allow-failure.yaml"})
    void timeoutAllowFailure() throws Exception {
        this.suite.runTimeoutAllowFailure(this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause_no_tasks.yaml"})
    void runEmptyTasks() throws Exception {
        this.suite.runEmptyTasks(this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause_on_resume.yaml"})
    void runOnResume() throws Exception {
        this.suite.runOnResume(this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause_on_resume.yaml"}, tenantId="tenant1")
    void runOnResumeMissingInputs() throws Exception {
        this.suite.runOnResumeMissingInputs("tenant1", this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause_on_resume_optional.yaml"})
    void runOnResumeOptionalInputs() throws Exception {
        this.suite.runOnResumeOptionalInputs(this.runnerUtils);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause-behavior.yaml"}, tenantId="resume")
    void runDurationWithCONTINUEBehavior() throws Exception {
        this.suite.runDurationWithBehavior("resume", this.runnerUtils, Pause.Behavior.RESUME);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause-behavior.yaml"}, tenantId="fail")
    void runDurationWithFAILBehavior() throws Exception {
        this.suite.runDurationWithBehavior("fail", this.runnerUtils, Pause.Behavior.FAIL);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause-behavior.yaml"}, tenantId="warn")
    void runDurationWithWARNBehavior() throws Exception {
        this.suite.runDurationWithBehavior("warn", this.runnerUtils, Pause.Behavior.WARN);
    }

    @Test
    @LoadFlows(value={"flows/valids/pause-behavior.yaml"}, tenantId="cancel")
    void runDurationWithCANCELBehavior() throws Exception {
        this.suite.runDurationWithBehavior("cancel", this.runnerUtils, Pause.Behavior.CANCEL);
    }

    @Test
    @ExecuteFlow(value="flows/valids/pause_on_pause.yaml")
    void shouldExecuteOnPauseTask(Execution execution) throws Exception {
        this.suite.shouldExecuteOnPauseTask(execution);
    }

    @Test
    @ExecuteFlow(value="flows/valids/pause-errors-finally-after-execution.yaml")
    void shouldExecuteErrorsFinallyAndAfterExecution(Execution execution) throws Exception {
        this.suite.shouldExecuteErrorsFinallyAndAfterExecution(execution);
    }

    @Singleton
    public static class Suite {
        @Inject
        ExecutionService executionService;
        @Inject
        FlowRepositoryInterface flowRepository;
        @Inject
        StorageInterface storageInterface;

        public void run(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause-test", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Flow flow = this.flowRepository.findByExecution(execution);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((Comparable)((TaskRun)execution.getTaskRunList().getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            Execution restarted = this.executionService.markAs(execution, (FlowInterface)flow, execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(), State.Type.RUNNING);
            execution = runnerUtils.emitAndAwaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS, restarted);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        }

        public void runDelay(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause-delay", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            execution = runnerUtils.awaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS, execution);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).isEqualTo(1L);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count()).isEqualTo(2L);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(3);
        }

        public void runDurationFromInput(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause-duration-from-input", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            execution = runnerUtils.awaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS, execution);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).isEqualTo(1L);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count()).isEqualTo(2L);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(3);
        }

        public void runParallelDelay(TestRunnerUtils runnerUtils) throws TimeoutException, QueueException {
            Execution execution = runnerUtils.runOne("main", "io.kestra.tests", "each-parallel-pause", Duration.ofSeconds(30L));
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(7);
        }

        public void runTimeout(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause-timeout", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            execution = runnerUtils.awaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.FAILED, execution);
            ((AbstractLongAssert)Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).as("Task runs were: " + execution.getTaskRunList().toString(), new Object[0])).isEqualTo(1L);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count()).isEqualTo(2L);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count()).isEqualTo(1L);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
        }

        public void runTimeoutAllowFailure(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause-timeout-allow-failure", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            execution = runnerUtils.awaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.WARNING, execution);
            ((AbstractLongAssert)Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).as("Task runs were: " + execution.getTaskRunList().toString(), new Object[0])).isEqualTo(1L);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count()).isEqualTo(2L);
            Assertions.assertThat((long)((TaskRun)execution.getTaskRunList().getFirst()).getState().getHistories().stream().filter(history -> history.getState() == State.Type.WARNING).count()).isEqualTo(1L);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(3);
        }

        public void runEmptyTasks(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause_no_tasks", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Flow flow = this.flowRepository.findByExecution(execution);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((Comparable)((TaskRun)execution.getTaskRunList().getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            Execution restarted = this.executionService.markAs(execution, (FlowInterface)flow, execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(), State.Type.RUNNING);
            execution = runnerUtils.emitAndAwaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS, restarted);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        }

        public void runOnResume(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause_on_resume", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Flow flow = this.flowRepository.findByExecution(execution);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((Comparable)((TaskRun)execution.getTaskRunList().getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            NettyCompletedAttribute part1 = new NettyCompletedAttribute((Attribute)new MemoryAttribute("asked", "restarted"));
            byte[] data = executionId.getBytes();
            MicronautHttpData.Factory httpDataFactory = new MicronautHttpData.Factory(new HttpServerConfiguration.MultipartConfiguration(), null);
            FileUpload fileUpload = httpDataFactory.createFileUpload(null, "files", "data", "text/plain", null, Charset.defaultCharset(), (long)data.length);
            fileUpload.addContent(Unpooled.copiedBuffer((byte[])data), true);
            NettyCompletedFileUpload part2 = new NettyCompletedFileUpload(fileUpload);
            Execution restarted = (Execution)this.executionService.resume(execution, (FlowInterface)flow, State.Type.RUNNING, (Publisher)Flux.just((Object[])new CompletedPart[]{part1, part2}), null).block();
            execution = runnerUtils.emitAndAwaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS, restarted);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
            Map outputs = (Map)((TaskRun)execution.findTaskRunsByTaskId("last").getFirst()).getOutputs().get((Object)"values");
            Assertions.assertThat(outputs.get("asked")).isEqualTo((Object)"restarted");
            Assertions.assertThat((String)((String)outputs.get("data"))).startsWith((CharSequence)"kestra://");
            Assertions.assertThat((String)CharStreams.toString((Readable)new InputStreamReader(this.storageInterface.get("main", null, URI.create((String)outputs.get("data")))))).isEqualTo(executionId);
        }

        public void runOnResumeMissingInputs(String tenantId, TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused(tenantId, "io.kestra.tests", "pause_on_resume", null, null, Duration.ofSeconds(30L));
            Flow flow = this.flowRepository.findByExecution(execution);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            ConstraintViolationException e = (ConstraintViolationException)org.junit.jupiter.api.Assertions.assertThrows(ConstraintViolationException.class, () -> this.executionService.resume(execution, (FlowInterface)flow, State.Type.RUNNING, (Publisher)Mono.empty(), Pause.Resumed.now()).block());
            Assertions.assertThat((String)e.getMessage()).contains(new CharSequence[]{"Invalid input for `asked`, missing required input, but received `null`"});
        }

        public void runOnResumeOptionalInputs(TestRunnerUtils runnerUtils) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused("main", "io.kestra.tests", "pause_on_resume_optional", null, null, Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Flow flow = this.flowRepository.findByExecution(execution);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Execution restarted = this.executionService.resume(execution, (FlowInterface)flow, State.Type.RUNNING, Pause.Resumed.now());
            execution = runnerUtils.emitAndAwaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS, restarted);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
            Map outputs = (Map)((TaskRun)execution.findTaskRunsByTaskId("last").getFirst()).getOutputs().get((Object)"values");
            Assertions.assertThat(outputs.get("asked")).isEqualTo((Object)"MISSING");
        }

        public void runDurationWithBehavior(String tenantId, TestRunnerUtils runnerUtils, Pause.Behavior behavior) throws Exception {
            Execution execution = runnerUtils.runOneUntilPaused(tenantId, "io.kestra.tests", "pause-behavior", null, (unused, _unused) -> Map.of("behavior", behavior), Duration.ofSeconds(30L));
            String executionId = execution.getId();
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.PAUSED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(1);
            execution = runnerUtils.awaitExecution(e -> e.getId().equals(executionId) && e.getState().getCurrent().isTerminated(), execution);
            State.Type finalState = behavior == Pause.Behavior.RESUME ? State.Type.SUCCESS : behavior.mapToState();
            boolean terminateAfterPause = behavior == Pause.Behavior.CANCEL || behavior == Pause.Behavior.FAIL;
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(terminateAfterPause ? 1 : 2);
            Assertions.assertThat((Comparable)((TaskRun)execution.getTaskRunList().getFirst()).getState().getCurrent()).isEqualTo((Object)finalState);
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)finalState);
        }

        public void shouldExecuteOnPauseTask(Execution execution) throws Exception {
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(2);
            Assertions.assertThat((String)((TaskRun)execution.getTaskRunList().getLast()).getTaskId()).isEqualTo("hello");
            Assertions.assertThat((Comparable)((TaskRun)execution.getTaskRunList().getLast()).getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        }

        public void shouldExecuteErrorsFinallyAndAfterExecution(Execution execution) throws Exception {
            Assertions.assertThat((Comparable)execution.getState().getCurrent()).isEqualTo((Object)State.Type.FAILED);
            Assertions.assertThat((List)execution.getTaskRunList()).hasSize(4);
            Assertions.assertThat((List)execution.findTaskRunsByTaskId("pause")).hasSize(1);
            Assertions.assertThat((Comparable)((TaskRun)execution.findTaskRunsByTaskId("pause").getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.FAILED);
            Assertions.assertThat((List)execution.findTaskRunsByTaskId("logError")).hasSize(1);
            Assertions.assertThat((Comparable)((TaskRun)execution.findTaskRunsByTaskId("logError").getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
            Assertions.assertThat((List)execution.findTaskRunsByTaskId("logFinally")).hasSize(1);
            Assertions.assertThat((Comparable)((TaskRun)execution.findTaskRunsByTaskId("logFinally").getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
            Assertions.assertThat((List)execution.findTaskRunsByTaskId("logAfter")).hasSize(1);
            Assertions.assertThat((Comparable)((TaskRun)execution.findTaskRunsByTaskId("logAfter").getFirst()).getState().getCurrent()).isEqualTo((Object)State.Type.SUCCESS);
        }
    }
}

