/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.plugin.core.flow;

import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ForEachItemCaseTest {
    // SLF4J logger for this class. The @Generated marker suggests it was emitted by an annotation
    // processor (presumably Lombok) — it is not referenced by the methods visible in this file.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ForEachItemCaseTest.class);
    // Namespace passed to every flow run and execution lookup performed by this class.
    static final String TEST_NAMESPACE = "io.kestra.tests";
    // Storage used to upload the input files and to read back the merged subflow outputs.
    @Inject
    private StorageInterface storageInterface;
    // Helper used to run flows, await a number of subflow executions and restart executions.
    @Inject
    protected TestRunnerUtils runnerUtils;
    // Converts the raw input map into execution inputs when a flow is started.
    @Inject
    private FlowInputOutput flowIO;
    // Used to build the restarted execution in restartForEachItem.
    @Inject
    private ExecutionService executionService;
    // Used to query subflow executions by flow id in forEachItemNoWait.
    @Inject
    private ExecutionRepositoryInterface executionRepository;

    /**
     * Runs the {@code for-each-item} flow for the {@code main} tenant and verifies both the parent
     * execution and the {@code for-each-item-subflow} executions it triggers.
     *
     * <p>The input file comes from {@link #storageUpload(String)} and the {@code batch} input is 4;
     * 26 subflow executions are expected (presumably ceil(102 lines / 4) — confirm against the flow).
     *
     * @throws TimeoutException   presumably when running or awaiting the executions times out
     * @throws URISyntaxException propagated from {@link #storageUpload(String)}
     * @throws IOException        propagated from {@link #storageUpload(String)}
     * @throws QueueException     propagated from the runner utility calls
     */
    public void forEachItem() throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI file = this.storageUpload("main");
        // The values are a String and an Integer, so the map must be declared with Object values.
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution execution = this.runnerUtils.runOne(
            "main",
            TEST_NAMESPACE,
            "for-each-item",
            null,
            (flow, execution1) -> this.flowIO.readExecutionInputs(flow, execution1, inputs),
            Duration.ofSeconds(30L)
        );

        // Typed list: the lambdas below need Execution elements, a raw List would only expose Object.
        List<Execution> triggeredExecs = this.runnerUtils.awaitFlowExecutionNumber(26, "main", TEST_NAMESPACE, "for-each-item-subflow");

        // Locate the subflow execution that was triggered for iteration 0.
        Execution firstTriggered = triggeredExecs.stream()
            .filter(e -> e.getTrigger() != null && e.getTrigger().getVariables().get("taskRunIteration") != null)
            .filter(e -> (Integer) e.getTrigger().getVariables().get("taskRunIteration") == 0)
            .findFirst()
            .orElse(null);
        Assertions.assertThat(firstTriggered).isNotNull();
        Assertions.assertThat(firstTriggered.getTrigger().getVariables().get("taskRunIteration")).isEqualTo(0);

        // Parent execution: 4 task runs, the third one carries the iteration outputs.
        Assertions.assertThat(execution.getTaskRunList()).hasSize(4);
        TaskRun itemsTaskRun = execution.getTaskRunList().get(2);
        Assertions.assertThat(itemsTaskRun.getAttempts()).hasSize(1);
        Assertions.assertThat(itemsTaskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        Variables outputs = itemsTaskRun.getOutputs();
        Assertions.assertThat(outputs.get("numberOfBatches")).isEqualTo(26);
        Assertions.assertThat(outputs.get("iterations")).isNotNull();
        Map<?, ?> iterations = (Map<?, ?>) outputs.get("iterations");
        Assertions.assertThat((Integer) iterations.get("CREATED")).isZero();
        Assertions.assertThat((Integer) iterations.get("RUNNING")).isZero();
        Assertions.assertThat((Integer) iterations.get("SUCCESS")).isEqualTo(26);

        // Last triggered subflow execution.
        Execution triggered = triggeredExecs.getLast();
        Assertions.assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow");
        Assertions.assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/each-split/.*\\.txt");
        Assertions.assertThat(triggered.getTaskRunList()).hasSize(1);

        // The subflow must carry the parent execution id as its correlation id label.
        Optional<Label> correlationId = triggered.getLabels().stream()
            .filter(label -> label.key().equals("system.correlationId"))
            .findAny();
        Assertions.assertThat(correlationId.isPresent()).isTrue();
        Assertions.assertThat(correlationId.get().value()).isEqualTo(execution.getId());
    }

    /**
     * Runs the {@code for-each-item} flow with an empty input file and verifies that the execution
     * still succeeds with 4 task runs and that the third task run has no outputs.
     *
     * @param tenantId tenant used for the upload and for the flow run
     * @throws TimeoutException   presumably when running the flow times out
     * @throws URISyntaxException propagated from {@link #emptyItems(String)}
     * @throws IOException        propagated from {@link #emptyItems(String)}
     * @throws QueueException     propagated from the runner utility call
     */
    public void forEachItemEmptyItems(String tenantId) throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI file = this.emptyItems(tenantId);
        // The values are a String and an Integer, so the map must be declared with Object values.
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution execution = this.runnerUtils.runOne(
            tenantId,
            TEST_NAMESPACE,
            "for-each-item",
            null,
            (flow, execution1) -> this.flowIO.readExecutionInputs(flow, execution1, inputs),
            Duration.ofSeconds(30L)
        );

        Assertions.assertThat(execution.getTaskRunList()).hasSize(4);
        Assertions.assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        // With no items to iterate over, the third task run is expected to expose no outputs at all.
        Variables outputs = execution.getTaskRunList().get(2).getOutputs();
        Assertions.assertThat((Map<?, ?>) outputs).isNull();
    }

    /**
     * Runs the {@code for-each-item-no-wait} flow for the {@code main} tenant, checks the parent
     * execution first and only then awaits the {@code for-each-item-subflow-sleep} executions.
     *
     * <p>In this variant the {@code CREATED} iteration counter is expected to be absent.
     *
     * @throws TimeoutException   presumably when running or awaiting the executions times out
     * @throws URISyntaxException propagated from {@link #storageUpload(String)}
     * @throws IOException        propagated from {@link #storageUpload(String)}
     * @throws QueueException     propagated from the runner utility calls
     */
    public void forEachItemNoWait() throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI uploaded = this.storageUpload("main");
        Map<String, String> inputs = Map.of("file", uploaded.toString());
        Execution parent = this.runnerUtils.runOne(
            "main",
            TEST_NAMESPACE,
            "for-each-item-no-wait",
            null,
            (flow, exec) -> this.flowIO.readExecutionInputs(flow, exec, inputs),
            Duration.ofSeconds(30L)
        );

        // Queried before awaiting the subflows, so anything up to 26 executions may exist already.
        int subflowCount = this.executionRepository
            .findByFlowId("main", TEST_NAMESPACE, "for-each-item-subflow-sleep", Pageable.UNPAGED)
            .size();
        Assertions.assertThat(subflowCount).isLessThanOrEqualTo(26);

        // Parent execution checks.
        Assertions.assertThat(parent.getTaskRunList()).hasSize(4);
        TaskRun itemsTaskRun = parent.getTaskRunList().get(2);
        Assertions.assertThat(itemsTaskRun.getAttempts()).hasSize(1);
        Assertions.assertThat(itemsTaskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(parent.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        Variables taskOutputs = itemsTaskRun.getOutputs();
        Assertions.assertThat(taskOutputs.get("numberOfBatches")).isEqualTo(26);
        Assertions.assertThat(taskOutputs.get("iterations")).isNotNull();
        Map<?, ?> counters = (Map<?, ?>) taskOutputs.get("iterations");
        Assertions.assertThat((Integer) counters.get("CREATED")).isNull();
        Assertions.assertThat((Integer) counters.get("RUNNING")).isZero();
        Assertions.assertThat((Integer) counters.get("SUCCESS")).isEqualTo(26);

        // Now wait for all subflow executions and inspect the last one.
        List<Execution> subflows = this.runnerUtils.awaitFlowExecutionNumber(26, "main", TEST_NAMESPACE, "for-each-item-subflow-sleep");
        Execution lastSubflow = subflows.getLast();
        Assertions.assertThat(lastSubflow.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(lastSubflow.getFlowId()).isEqualTo("for-each-item-subflow-sleep");
        Assertions.assertThat((String) lastSubflow.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-no-wait/executions/.*/tasks/each-split/.*\\.txt");
        Assertions.assertThat(lastSubflow.getTaskRunList()).hasSize(2);
    }

    /**
     * Runs the {@code for-each-item-failed} flow for the {@code main} tenant and verifies that the
     * parent execution and the {@code for-each-item-subflow-failed} executions end up FAILED.
     *
     * @throws TimeoutException   presumably when running or awaiting the executions times out
     * @throws URISyntaxException propagated from {@link #storageUpload(String)}
     * @throws IOException        propagated from {@link #storageUpload(String)}
     * @throws QueueException     propagated from the runner utility calls
     */
    public void forEachItemFailed() throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI uploaded = this.storageUpload("main");
        Map<String, String> inputs = Map.of("file", uploaded.toString());
        Execution parent = this.runnerUtils.runOne(
            "main",
            TEST_NAMESPACE,
            "for-each-item-failed",
            null,
            (flow, exec) -> this.flowIO.readExecutionInputs(flow, exec, inputs),
            Duration.ofSeconds(60L)
        );
        List<Execution> subflows = this.runnerUtils.awaitFlowExecutionNumber(26, "main", TEST_NAMESPACE, "for-each-item-subflow-failed");
        Execution lastSubflow = subflows.getLast();

        // Parent execution: 3 task runs, the third one failed after a single attempt.
        Assertions.assertThat(parent.getTaskRunList()).hasSize(3);
        TaskRun itemsTaskRun = parent.getTaskRunList().get(2);
        Assertions.assertThat(itemsTaskRun.getAttempts()).hasSize(1);
        Assertions.assertThat(itemsTaskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
        Assertions.assertThat(parent.getState().getCurrent()).isEqualTo(State.Type.FAILED);

        Variables taskOutputs = itemsTaskRun.getOutputs();
        Assertions.assertThat(taskOutputs.get("numberOfBatches")).isEqualTo(26);
        Assertions.assertThat(taskOutputs.get("iterations")).isNotNull();
        Map<?, ?> counters = (Map<?, ?>) taskOutputs.get("iterations");
        Assertions.assertThat((Integer) counters.get("CREATED")).isZero();
        Assertions.assertThat((Integer) counters.get("RUNNING")).isZero();
        Assertions.assertThat((Integer) counters.get("FAILED")).isEqualTo(26);

        // Last triggered subflow execution.
        Assertions.assertThat(lastSubflow.getState().getCurrent()).isEqualTo(State.Type.FAILED);
        Assertions.assertThat(lastSubflow.getFlowId()).isEqualTo("for-each-item-subflow-failed");
        Assertions.assertThat((String) lastSubflow.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-failed/executions/.*/tasks/each-split/.*\\.txt");
        Assertions.assertThat(lastSubflow.getTaskRunList()).hasSize(1);
    }

    /**
     * Runs the {@code for-each-item-outputs} flow for the {@code main} tenant, verifies the parent
     * and subflow executions, then reads the {@code subflowOutputs} file of the fourth task run
     * from storage and expects it to contain 26 lines.
     *
     * @throws TimeoutException   presumably when running or awaiting the executions times out
     * @throws URISyntaxException propagated from {@link #storageUpload(String)}
     * @throws IOException        propagated from {@link #storageUpload(String)} or from reading the outputs file
     * @throws QueueException     propagated from the runner utility calls
     */
    public void forEachItemWithSubflowOutputs() throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI uploaded = this.storageUpload("main");
        Map<String, String> inputs = Map.of("file", uploaded.toString());
        Execution parent = this.runnerUtils.runOne(
            "main",
            TEST_NAMESPACE,
            "for-each-item-outputs",
            null,
            (flow, exec) -> this.flowIO.readExecutionInputs(flow, exec, inputs),
            Duration.ofSeconds(30L)
        );
        List<Execution> subflows = this.runnerUtils.awaitFlowExecutionNumber(26, "main", TEST_NAMESPACE, "for-each-item-outputs-subflow");
        Execution lastSubflow = subflows.getLast();

        // Parent execution: 5 task runs, the third one carries the iteration outputs.
        Assertions.assertThat(parent.getTaskRunList()).hasSize(5);
        TaskRun itemsTaskRun = parent.getTaskRunList().get(2);
        Assertions.assertThat(itemsTaskRun.getAttempts()).hasSize(1);
        Assertions.assertThat(itemsTaskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(parent.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        Variables taskOutputs = itemsTaskRun.getOutputs();
        Assertions.assertThat(taskOutputs.get("numberOfBatches")).isEqualTo(26);
        Assertions.assertThat(taskOutputs.get("iterations")).isNotNull();
        Map<?, ?> counters = (Map<?, ?>) taskOutputs.get("iterations");
        Assertions.assertThat((Integer) counters.get("CREATED")).isZero();
        Assertions.assertThat((Integer) counters.get("RUNNING")).isZero();
        Assertions.assertThat((Integer) counters.get("SUCCESS")).isEqualTo(26);

        // Last triggered subflow execution.
        Assertions.assertThat(lastSubflow.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(lastSubflow.getFlowId()).isEqualTo("for-each-item-outputs-subflow");
        Assertions.assertThat((String) lastSubflow.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-outputs/executions/.*/tasks/each-split/.*\\.txt");
        Assertions.assertThat(lastSubflow.getTaskRunList()).hasSize(1);

        // The fourth task run exposes a storage URI; the file behind it must hold one line per batch.
        Variables mergeOutputs = parent.getTaskRunList().get(3).getOutputs();
        Assertions.assertThat(mergeOutputs.get("subflowOutputs")).isNotNull();
        URI mergedUri = URI.create((String) mergeOutputs.get("subflowOutputs"));
        InputStream merged = this.storageInterface.get("main", parent.getNamespace(), mergedUri);
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(merged))) {
            Assertions.assertThat(reader.lines().count()).isEqualTo(26L);
        }
    }

    /**
     * Runs the {@code restart-for-each-item} flow, expects it and its 6 {@code restart-child}
     * executions to fail, then restarts the failed execution and expects the parent and all
     * 6 child executions to end in SUCCESS.
     *
     * <p>The {@code batch} input is 20; 6 children are presumably ceil(102 lines / 20) — confirm
     * against the flow definition.
     *
     * @param tenantId tenant used for the upload, the flow run and the execution lookups
     * @throws Exception propagated from the storage, runner or execution service calls
     */
    public void restartForEachItem(String tenantId) throws Exception {
        URI file = this.storageUpload(tenantId);
        // The values are a String and an Integer, so the map must be declared with Object values.
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 20);
        Execution failedExecution = this.runnerUtils.runOne(
            tenantId,
            TEST_NAMESPACE,
            "restart-for-each-item",
            null,
            (flow, execution1) -> this.flowIO.readExecutionInputs(flow, execution1, inputs),
            Duration.ofSeconds(30L)
        );
        Assertions.assertThat(failedExecution.getTaskRunList()).hasSize(3);
        Assertions.assertThat(failedExecution.getState().getCurrent()).isEqualTo(State.Type.FAILED);

        // Typed list: the extracting lambda needs Execution elements, a raw List would only expose Object.
        List<Execution> triggeredExecs = this.runnerUtils.awaitFlowExecutionNumber(6, tenantId, TEST_NAMESPACE, "restart-child");
        Assertions.assertThat(triggeredExecs)
            .extracting(e -> e.getState().getCurrent())
            .containsOnly(State.Type.FAILED);

        // Restart the failed parent and wait until it reaches SUCCESS.
        Execution restarted = this.executionService.restart(failedExecution, null);
        Execution successExecution = this.runnerUtils.restartExecution(
            e -> e.getState().getCurrent() == State.Type.SUCCESS && e.getFlowId().equals("restart-for-each-item"),
            restarted
        );
        Assertions.assertThat(successExecution.getTaskRunList()).hasSize(4);

        triggeredExecs = this.runnerUtils.awaitFlowExecutionNumber(6, tenantId, TEST_NAMESPACE, "restart-child");
        Assertions.assertThat(triggeredExecs)
            .extracting(e -> e.getState().getCurrent())
            .containsOnly(State.Type.SUCCESS);
    }

    /**
     * Runs the {@code for-each-item-in-if} flow and verifies the parent execution (5 task runs,
     * iteration outputs on the fourth one) and the triggered {@code for-each-item-subflow}
     * executions, including the correlation id label.
     *
     * @param tenantId tenant used for the upload, the flow run and the execution lookup
     * @throws TimeoutException   presumably when running or awaiting the executions times out
     * @throws URISyntaxException propagated from {@link #storageUpload(String)}
     * @throws IOException        propagated from {@link #storageUpload(String)}
     * @throws QueueException     propagated from the runner utility calls
     */
    public void forEachItemInIf(String tenantId) throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI file = this.storageUpload(tenantId);
        // The values are a String and an Integer, so the map must be declared with Object values.
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution execution = this.runnerUtils.runOne(
            tenantId,
            TEST_NAMESPACE,
            "for-each-item-in-if",
            null,
            (flow, execution1) -> this.flowIO.readExecutionInputs(flow, execution1, inputs),
            Duration.ofSeconds(30L)
        );
        List<Execution> triggeredExecs = this.runnerUtils.awaitFlowExecutionNumber(26, tenantId, TEST_NAMESPACE, "for-each-item-subflow");
        Execution triggered = triggeredExecs.getLast();

        Assertions.assertThat(execution.getTaskRunList()).hasSize(5);
        Assertions.assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        // Here the iteration outputs live on the fourth task run (index 3), not the third.
        Variables outputs = execution.getTaskRunList().get(3).getOutputs();
        Assertions.assertThat(outputs.get("numberOfBatches")).isEqualTo(26);
        Assertions.assertThat(outputs.get("iterations")).isNotNull();
        Map<?, ?> iterations = (Map<?, ?>) outputs.get("iterations");
        Assertions.assertThat((Integer) iterations.get("CREATED")).isZero();
        Assertions.assertThat((Integer) iterations.get("RUNNING")).isZero();
        Assertions.assertThat((Integer) iterations.get("SUCCESS")).isEqualTo(26);

        // Last triggered subflow execution.
        Assertions.assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow");
        Assertions.assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-in-if/executions/.*/tasks/each-split/.*\\.txt");
        Assertions.assertThat(triggered.getTaskRunList()).hasSize(1);

        // The subflow must carry the parent execution id as its correlation id label.
        Optional<Label> correlationId = triggered.getLabels().stream()
            .filter(label -> label.key().equals("system.correlationId"))
            .findAny();
        Assertions.assertThat(correlationId.isPresent()).isTrue();
        Assertions.assertThat(correlationId.get().value()).isEqualTo(execution.getId());
    }

    /**
     * Runs the {@code for-each-item-after-execution} flow for the {@code main} tenant and verifies
     * the parent execution (5 task runs) and the triggered
     * {@code for-each-item-subflow-after-execution} executions (2 task runs each for the last one),
     * including the correlation id label.
     *
     * @throws TimeoutException   presumably when running or awaiting the executions times out
     * @throws URISyntaxException propagated from {@link #storageUpload(String)}
     * @throws IOException        propagated from {@link #storageUpload(String)}
     * @throws QueueException     propagated from the runner utility calls
     */
    public void forEachItemWithAfterExecution() throws TimeoutException, URISyntaxException, IOException, QueueException {
        URI file = this.storageUpload("main");
        // The values are a String and an Integer, so the map must be declared with Object values.
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution execution = this.runnerUtils.runOne(
            "main",
            TEST_NAMESPACE,
            "for-each-item-after-execution",
            null,
            (flow, execution1) -> this.flowIO.readExecutionInputs(flow, execution1, inputs),
            Duration.ofSeconds(30L)
        );
        List<Execution> triggeredExecs = this.runnerUtils.awaitFlowExecutionNumber(26, "main", TEST_NAMESPACE, "for-each-item-subflow-after-execution");
        Execution triggered = triggeredExecs.getLast();

        // Parent execution: 5 task runs, the third one carries the iteration outputs.
        Assertions.assertThat(execution.getTaskRunList()).hasSize(5);
        TaskRun itemsTaskRun = execution.getTaskRunList().get(2);
        Assertions.assertThat(itemsTaskRun.getAttempts()).hasSize(1);
        Assertions.assertThat(itemsTaskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        Variables outputs = itemsTaskRun.getOutputs();
        Assertions.assertThat(outputs.get("numberOfBatches")).isEqualTo(26);
        Assertions.assertThat(outputs.get("iterations")).isNotNull();
        Map<?, ?> iterations = (Map<?, ?>) outputs.get("iterations");
        Assertions.assertThat((Integer) iterations.get("CREATED")).isZero();
        Assertions.assertThat((Integer) iterations.get("RUNNING")).isZero();
        Assertions.assertThat((Integer) iterations.get("SUCCESS")).isEqualTo(26);

        // Last triggered subflow execution.
        Assertions.assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Assertions.assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow-after-execution");
        Assertions.assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-after-execution/executions/.*/tasks/each-split/.*\\.txt");
        Assertions.assertThat(triggered.getTaskRunList()).hasSize(2);

        // The subflow must carry the parent execution id as its correlation id label.
        Optional<Label> correlationId = triggered.getLabels().stream()
            .filter(label -> label.key().equals("system.correlationId"))
            .findAny();
        Assertions.assertThat(correlationId.isPresent()).isTrue();
        Assertions.assertThat(correlationId.get().value()).isEqualTo(execution.getId());
    }

    /**
     * Writes the lines produced by {@link #content()} to a temporary file and uploads it to the
     * storage under {@code /file/storage/file.txt} for the given tenant.
     *
     * <p>The upload stream is closed and the temporary file is deleted once the upload call has
     * returned (or failed), so repeated test runs do not leak file handles or temp files.
     *
     * @param tenantId tenant the file is stored for
     * @return the URI returned by the storage for the uploaded file
     * @throws URISyntaxException declared by the {@link URI} constructor
     * @throws IOException        if the temporary file cannot be created, written, read or deleted,
     *                            or if the storage call fails with an I/O error
     */
    private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");
        try {
            Files.write(tempFile.toPath(), this.content());
            // assumes put() consumes the stream before returning — TODO confirm for the storage in use
            try (InputStream in = new FileInputStream(tempFile)) {
                return this.storageInterface.put(tenantId, null, new URI("/file/storage/file.txt"), in);
            }
        } finally {
            Files.deleteIfExists(tempFile.toPath());
        }
    }

    /**
     * Uploads an empty temporary file to the storage under {@code /file/storage/file.txt} for the
     * given tenant.
     *
     * <p>The upload stream is closed and the temporary file is deleted once the upload call has
     * returned (or failed), so repeated test runs do not leak file handles or temp files.
     *
     * @param tenantId tenant the file is stored for
     * @return the URI returned by the storage for the uploaded file
     * @throws URISyntaxException declared by the {@link URI} constructor
     * @throws IOException        if the temporary file cannot be created, read or deleted,
     *                            or if the storage call fails with an I/O error
     */
    private URI emptyItems(String tenantId) throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");
        try {
            // assumes put() consumes the stream before returning — TODO confirm for the storage in use
            try (InputStream in = new FileInputStream(tempFile)) {
                return this.storageInterface.put(tenantId, null, new URI("/file/storage/file.txt"), in);
            }
        } finally {
            Files.deleteIfExists(tempFile.toPath());
        }
    }

    /**
     * Builds the lines of the test input file: the numbers 0 to 101, each left-padded with
     * spaces to a width of 20 characters.
     *
     * @return an unmodifiable list of 102 padded number strings
     */
    private List<String> content() {
        return IntStream.range(0, 102)
            .mapToObj(Integer::toString)
            .map(number -> StringUtils.leftPad(number, 20))
            .toList();
    }
}

