/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.plugin.core.flow;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.StorageService;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageSplitInterface;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.Rethrow;
import io.kestra.core.validations.NoSystemLabelValidation;
import io.kestra.plugin.core.flow.ChildFlowInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.beans.ConstructorProperties;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;

@Schema(title="Execute a subflow for each batch of items", description="The `items` value must be Kestra's internal storage URI (e.g. an output file from a previous task, or a file from inputs of FILE type).\nTwo special variables are available to pass as inputs to the subflow:\n- `taskrun.items` which is the URI of internal storage file containing the batch of items to process\n- `taskrun.iteration` which is the iteration or batch number\n\nRestarting a parent flow will restart any subflows that has previously been executed.")
@Plugin(examples={@Example(title="Execute a subflow for each batch of items. The subflow `orders` is called from the parent flow `orders_parallel` using the `ForEachItem` task in order to start one subflow execution for each batch of items.\n```yaml\nid: orders\nnamespace: company.team\n\ninputs:\n  - id: order\n    type: STRING\n\ntasks:\n  - id: read_file\n    type: io.kestra.plugin.scripts.shell.Commands\n    taskRunner:\n      type: io.kestra.plugin.core.runner.Process\n    commands:\n      - cat \"{{ inputs.order }}\"\n\n  - id: read_file_content\n    type: io.kestra.plugin.core.log.Log\n    message: \"{{ read(inputs.order) }}\"\n```\n", full=true, code={"id: orders_parallel\nnamespace: company.team\n\ntasks:\n  - id: extract\n    type: io.kestra.plugin.jdbc.duckdb.Query\n    sql: |\n      INSTALL httpfs;\n      LOAD httpfs;\n      SELECT *\n      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);\n    store: true\n\n  - id: each\n    type: io.kestra.plugin.core.flow.ForEachItem\n    items: \"{{ outputs.extract.uri }}\"\n    batch:\n      rows: 1\n    namespace: company.team\n    flowId: orders\n    wait: true # wait for the subflow execution\n    transmitFailed: true # fail the task run if the subflow execution fails\n    inputs:\n      order: \"{{ taskrun.items }}\" # special variable that contains the items of the batch\n"}), @Example(title="Execute a subflow for each JSON item fetched from a REST API. The subflow `mysubflow` is called from the parent flow `iterate_over_json` using the `ForEachItem` task; this creates one subflow execution for each JSON object.\n\nNote how we first need to convert the JSON array to JSON-L format using the `JsonWriter` task. This is because the `items` attribute of the `ForEachItem` task expects a file where each line represents a single item. Suitable file types include Amazon ION (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For other formats, you can use the conversion tasks available in the `io.kestra.plugin.serdes` module.\n\nIn this example, the subflow `mysubflow` expects a JSON object as input. The `JsonReader` task first reads the JSON array from the REST API and converts it to ION. Then, the `JsonWriter` task converts that ION file to JSON-L format, suitable for the `ForEachItem` task.\n\n```yaml\nid: mysubflow\nnamespace: company.team\n\ninputs:\n  - id: json\n    type: JSON\n\ntasks:\n  - id: debug\n    type: io.kestra.plugin.core.log.Log\n    message: \"{{ inputs.json }}\"\n```\n", full=true, code={"id: iterate_over_json\nnamespace: company.team\n\ntasks:\n  - id: download\n    type: io.kestra.plugin.core.http.Download\n    uri: \"https://api.restful-api.dev/objects\"\n    contentType: application/json\n    method: GET\n    failOnEmptyResponse: true\n    timeout: PT15S\n\n  - id: json_to_ion\n    type: io.kestra.plugin.serdes.json.JsonToIon\n    from: \"{{ outputs.download.uri }}\"\n    newLine: false # regular json\n\n  - id: ion_to_jsonl\n    type: io.kestra.plugin.serdes.json.IonToJson\n    from: \"{{ outputs.json_to_ion.uri }}\"\n    newLine: true # JSON-L\n\n  - id: for_each_item\n    type: io.kestra.plugin.core.flow.ForEachItem\n    items: \"{{ outputs.ion_to_jsonl.uri }}\"\n    batch:\n      rows: 1\n    namespace: company.team\n    flowId: mysubflow\n    wait: true\n    transmitFailed: true\n    inputs:\n      json: \"{{ json(read(taskrun.items)) }}\"\n"}), @Example(title="This example shows how to use the combination of `ForEach` and `ForEachItem` tasks to process files from an S3 bucket. The `ForEach` iterates over files from the S3 trigger, and the `ForEachItem` task is used to split each file into batches. The `process_batch` subflow is then called with the `data` input parameter set to the URI of the batch to process.\n\n```yaml\nid: process_batch\nnamespace: company.team\n\ninputs:\n  - id: data\n    type: FILE\n\ntasks:\n  - id: debug\n    type: io.kestra.plugin.core.log.Log\n    message: \"{{ read(inputs.data) }}\"\n```\n", full=true, code={"id: process_files\nnamespace: company.team\n\ntasks:\n  - id: loop_over_files\n    type: io.kestra.plugin.core.flow.ForEach\n    values: \"{{ trigger.objects | jq('.[].uri') }}\"\n    tasks:\n      - id: subflow_per_batch\n        type: io.kestra.plugin.core.flow.ForEachItem\n        items: \"{{ trigger.uris[parent.taskrun.value] }}\"\n        batch:\n          rows: 1\n        flowId: process_batch\n        namespace: company.team\n        wait: true\n        transmitFailed: true\n        inputs:\n          data: \"{{ taskrun.items }}\"\n\ntriggers:\n  - id: s3\n    type: io.kestra.plugin.aws.s3.Trigger\n    interval: \"PT1S\"\n    accessKeyId: \"<access-key>\"\n    secretKeyId: \"<secret-key>\"\n    region: \"us-east-1\"\n    bucket: \"my_bucket\"\n    prefix: \"sub-dir\"\n    action: NONE\n"})}, aliases={"io.kestra.core.tasks.flows.ForEachItem"})
public class ForEachItem
extends Task
implements FlowableTask<VoidOutput>,
ChildFlowInterface {
    @NotEmpty
    @PluginProperty(dynamic=true)
    @Schema(title="The items to be split into batches and processed \u2013 make sure to set it to Kestra's internal storage URI. This can be either the output from a previous task, formatted as `{{ outputs.task_id.uri }}`, or a FILE type input parameter, like `{{ inputs.myfile }}`. This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION-type files (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For files in other formats such as Excel, CSV, Avro, Parquet, XML, or JSON, it's recommended to first convert them to the ION format. This can be done using the conversion tasks available in the `io.kestra.plugin.serdes` module, which will transform files from their original format to ION.")
    private String items;
    @NotNull
    @PluginProperty
    @Schema(title="How to split the items into batches")
    private Batch batch;
    @NotEmpty
    @Schema(title="The namespace of the subflow to be executed")
    @PluginProperty(dynamic=true)
    private String namespace;
    @NotEmpty
    @Schema(title="The identifier of the subflow to be executed")
    @PluginProperty(dynamic=true)
    private String flowId;
    @Schema(title="The revision of the subflow to be executed", description="By default, the last, i.e. the most recent, revision of the subflow is executed.")
    @PluginProperty
    private Integer revision;
    @Schema(title="The inputs to pass to the subflow to be executed")
    @PluginProperty(dynamic=true)
    private Map<String, Object> inputs;
    @Schema(title="The labels to pass to the subflow to be executed", implementation=Object.class, oneOf={List.class, Map.class})
    @PluginProperty(dynamic=true)
    @JsonSerialize(using=ListOrMapOfLabelSerializer.class)
    @JsonDeserialize(using=ListOrMapOfLabelDeserializer.class)
    private List<@NoSystemLabelValidation Label> labels;
    @Schema(title="Flag specifying whether to wait for the subflows execution to finish before continuing the current execution.")
    @PluginProperty
    private final Boolean wait;
    @Schema(title="Flag specifying whether to fail the current execution if the subflow execution fails or is killed.", description="Note that this option only works if `wait` is set to `true`.")
    @PluginProperty
    private final Boolean transmitFailed;
    @Schema(title="Flag specifying whether the subflow should inherit labels from the parent execution that triggered it.", description="By default, labels are not passed to the subflow execution. If you set this option to `true`, the child flow execution will inherit all labels from the parent execution.")
    @PluginProperty
    private final Boolean inheritLabels;
    @Schema(title="Don't trigger the subflow now but schedule it on a specific date.")
    @PluginProperty
    private Property<ZonedDateTime> scheduleDate;
    @Valid
    private List<Task> errors;
    @Valid
    @JsonProperty(value="finally")
    protected List<Task> _finally;
    @Schema(title="What action to take when a failed execution is restarting", description="- RETRY_FAILED (default): will restart the each subflow executions that are failed.\n- NEW_EXECUTION: will create a new subflow execution for each batch of items.\"\"\n")
    @NotNull
    private ExecutableTask.RestartBehavior restartBehavior;

    @Override
    public List<Task> getFinally() {
        return this._finally;
    }

    @Override
    public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
        GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
        GraphUtils.sequential(subGraph, this.getTasks(), this.errors, this._finally, taskRun, execution);
        return subGraph;
    }

    @Override
    public List<Task> allChildTasks() {
        return Stream.concat(this.getTasks() != null ? this.getTasks().stream() : Stream.empty(), Stream.concat(this.errors != null ? this.errors.stream() : Stream.empty(), this._finally != null ? this._finally.stream() : Stream.empty())).toList();
    }

    @Override
    public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        return FlowableUtils.resolveTasks(this.getTasks(), parentTaskRun);
    }

    @Override
    public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        return FlowableUtils.resolveSequentialNexts(execution, this.childTasks(runContext, parentTaskRun), FlowableUtils.resolveTasks(this.errors, parentTaskRun), FlowableUtils.resolveTasks(this._finally, parentTaskRun), parentTaskRun);
    }

    public List<Task> getTasks() {
        return List.of(new ForEachItemSplit(this.getId(), this.items, this.batch), new ForEachItemExecutable(this.getId(), this.inputs, this.inheritLabels, this.labels, this.wait, this.transmitFailed, this.scheduleDate, new ExecutableTask.SubflowId(this.namespace, this.flowId, Optional.ofNullable(this.revision)), this.restartBehavior), new ForEachItemMergeOutputs(this.getId()));
    }

    public void setTasks(List<Task> tasks) {
    }

    private static Map<String, Object> extractOutput(RunContext runContext, String taskId) {
        Map parent;
        Map outputVariables = (Map)runContext.getVariables().get("outputs");
        Map splitTaskOutput = (Map)outputVariables.get(taskId);
        if (runContext.getVariables().containsKey("parent") && (parent = (Map)runContext.getVariables().get("parent")).containsKey("taskrun")) {
            String value = (String)((Map)parent.get("taskrun")).get("value");
            splitTaskOutput = (Map)splitTaskOutput.get(value);
        }
        return splitTaskOutput;
    }

    @Generated
    private static Batch $default$batch() {
        return Batch.builder().build();
    }

    @Generated
    private static Boolean $default$wait() {
        return true;
    }

    @Generated
    private static Boolean $default$transmitFailed() {
        return true;
    }

    @Generated
    private static Boolean $default$inheritLabels() {
        return false;
    }

    @Generated
    private static ExecutableTask.RestartBehavior $default$restartBehavior() {
        return ExecutableTask.RestartBehavior.RETRY_FAILED;
    }

    @Generated
    protected ForEachItem(ForEachItemBuilder<?, ?> b) {
        super(b);
        this.items = b.items;
        this.batch = b.batch$set ? b.batch$value : ForEachItem.$default$batch();
        this.namespace = b.namespace;
        this.flowId = b.flowId;
        this.revision = b.revision;
        this.inputs = b.inputs;
        this.labels = b.labels;
        this.wait = b.wait$set ? b.wait$value : ForEachItem.$default$wait();
        this.transmitFailed = b.transmitFailed$set ? b.transmitFailed$value : ForEachItem.$default$transmitFailed();
        this.inheritLabels = b.inheritLabels$set ? b.inheritLabels$value : ForEachItem.$default$inheritLabels();
        this.scheduleDate = b.scheduleDate;
        this.errors = b.errors;
        this._finally = b._finally;
        this.restartBehavior = b.restartBehavior$set ? b.restartBehavior$value : ForEachItem.$default$restartBehavior();
    }

    @Generated
    public static ForEachItemBuilder<?, ?> builder() {
        return new ForEachItemBuilderImpl();
    }

    @Generated
    public ForEachItemBuilder<?, ?> toBuilder() {
        return new ForEachItemBuilderImpl().$fillValuesFrom(this);
    }

    @Generated
    public String toString() {
        return "ForEachItem(super=" + super.toString() + ", items=" + this.getItems() + ", batch=" + String.valueOf(this.getBatch()) + ", namespace=" + this.getNamespace() + ", flowId=" + this.getFlowId() + ", revision=" + this.getRevision() + ", inputs=" + String.valueOf(this.getInputs()) + ", labels=" + String.valueOf(this.getLabels()) + ", wait=" + this.getWait() + ", transmitFailed=" + this.getTransmitFailed() + ", inheritLabels=" + this.getInheritLabels() + ", scheduleDate=" + String.valueOf(this.getScheduleDate()) + ", errors=" + String.valueOf(this.getErrors()) + ", _finally=" + String.valueOf(this._finally) + ", restartBehavior=" + String.valueOf((Object)this.getRestartBehavior()) + ")";
    }

    @Generated
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof ForEachItem)) {
            return false;
        }
        ForEachItem other = (ForEachItem)o;
        if (!other.canEqual(this)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        Integer this$revision = this.getRevision();
        Integer other$revision = other.getRevision();
        if (this$revision == null ? other$revision != null : !((Object)this$revision).equals(other$revision)) {
            return false;
        }
        Boolean this$wait = this.getWait();
        Boolean other$wait = other.getWait();
        if (this$wait == null ? other$wait != null : !((Object)this$wait).equals(other$wait)) {
            return false;
        }
        Boolean this$transmitFailed = this.getTransmitFailed();
        Boolean other$transmitFailed = other.getTransmitFailed();
        if (this$transmitFailed == null ? other$transmitFailed != null : !((Object)this$transmitFailed).equals(other$transmitFailed)) {
            return false;
        }
        Boolean this$inheritLabels = this.getInheritLabels();
        Boolean other$inheritLabels = other.getInheritLabels();
        if (this$inheritLabels == null ? other$inheritLabels != null : !((Object)this$inheritLabels).equals(other$inheritLabels)) {
            return false;
        }
        String this$items = this.getItems();
        String other$items = other.getItems();
        if (this$items == null ? other$items != null : !this$items.equals(other$items)) {
            return false;
        }
        Batch this$batch = this.getBatch();
        Batch other$batch = other.getBatch();
        if (this$batch == null ? other$batch != null : !((Object)this$batch).equals(other$batch)) {
            return false;
        }
        String this$namespace = this.getNamespace();
        String other$namespace = other.getNamespace();
        if (this$namespace == null ? other$namespace != null : !this$namespace.equals(other$namespace)) {
            return false;
        }
        String this$flowId = this.getFlowId();
        String other$flowId = other.getFlowId();
        if (this$flowId == null ? other$flowId != null : !this$flowId.equals(other$flowId)) {
            return false;
        }
        Map<String, Object> this$inputs = this.getInputs();
        Map<String, Object> other$inputs = other.getInputs();
        if (this$inputs == null ? other$inputs != null : !((Object)this$inputs).equals(other$inputs)) {
            return false;
        }
        List<Label> this$labels = this.getLabels();
        List<Label> other$labels = other.getLabels();
        if (this$labels == null ? other$labels != null : !((Object)this$labels).equals(other$labels)) {
            return false;
        }
        Property<ZonedDateTime> this$scheduleDate = this.getScheduleDate();
        Property<ZonedDateTime> other$scheduleDate = other.getScheduleDate();
        if (this$scheduleDate == null ? other$scheduleDate != null : !((Object)this$scheduleDate).equals(other$scheduleDate)) {
            return false;
        }
        List<Task> this$errors = this.getErrors();
        List<Task> other$errors = other.getErrors();
        if (this$errors == null ? other$errors != null : !((Object)this$errors).equals(other$errors)) {
            return false;
        }
        List<Task> this$_finally = this._finally;
        List<Task> other$_finally = other._finally;
        if (this$_finally == null ? other$_finally != null : !((Object)this$_finally).equals(other$_finally)) {
            return false;
        }
        ExecutableTask.RestartBehavior this$restartBehavior = this.getRestartBehavior();
        ExecutableTask.RestartBehavior other$restartBehavior = other.getRestartBehavior();
        return !(this$restartBehavior == null ? other$restartBehavior != null : !((Object)((Object)this$restartBehavior)).equals((Object)other$restartBehavior));
    }

    @Generated
    protected boolean canEqual(Object other) {
        return other instanceof ForEachItem;
    }

    @Generated
    public int hashCode() {
        int PRIME = 59;
        int result = super.hashCode();
        Integer $revision = this.getRevision();
        result = result * 59 + ($revision == null ? 43 : ((Object)$revision).hashCode());
        Boolean $wait = this.getWait();
        result = result * 59 + ($wait == null ? 43 : ((Object)$wait).hashCode());
        Boolean $transmitFailed = this.getTransmitFailed();
        result = result * 59 + ($transmitFailed == null ? 43 : ((Object)$transmitFailed).hashCode());
        Boolean $inheritLabels = this.getInheritLabels();
        result = result * 59 + ($inheritLabels == null ? 43 : ((Object)$inheritLabels).hashCode());
        String $items = this.getItems();
        result = result * 59 + ($items == null ? 43 : $items.hashCode());
        Batch $batch = this.getBatch();
        result = result * 59 + ($batch == null ? 43 : ((Object)$batch).hashCode());
        String $namespace = this.getNamespace();
        result = result * 59 + ($namespace == null ? 43 : $namespace.hashCode());
        String $flowId = this.getFlowId();
        result = result * 59 + ($flowId == null ? 43 : $flowId.hashCode());
        Map<String, Object> $inputs = this.getInputs();
        result = result * 59 + ($inputs == null ? 43 : ((Object)$inputs).hashCode());
        List<Label> $labels = this.getLabels();
        result = result * 59 + ($labels == null ? 43 : ((Object)$labels).hashCode());
        Property<ZonedDateTime> $scheduleDate = this.getScheduleDate();
        result = result * 59 + ($scheduleDate == null ? 43 : ((Object)$scheduleDate).hashCode());
        List<Task> $errors = this.getErrors();
        result = result * 59 + ($errors == null ? 43 : ((Object)$errors).hashCode());
        List<Task> $_finally = this._finally;
        result = result * 59 + ($_finally == null ? 43 : ((Object)$_finally).hashCode());
        ExecutableTask.RestartBehavior $restartBehavior = this.getRestartBehavior();
        result = result * 59 + ($restartBehavior == null ? 43 : ((Object)((Object)$restartBehavior)).hashCode());
        return result;
    }

    @Generated
    public String getItems() {
        return this.items;
    }

    @Generated
    public Batch getBatch() {
        return this.batch;
    }

    @Override
    @Generated
    public String getNamespace() {
        return this.namespace;
    }

    @Override
    @Generated
    public String getFlowId() {
        return this.flowId;
    }

    @Generated
    public Integer getRevision() {
        return this.revision;
    }

    @Generated
    public Map<String, Object> getInputs() {
        return this.inputs;
    }

    @Generated
    public List<@NoSystemLabelValidation Label> getLabels() {
        return this.labels;
    }

    @Generated
    public Boolean getWait() {
        return this.wait;
    }

    @Generated
    public Boolean getTransmitFailed() {
        return this.transmitFailed;
    }

    @Generated
    public Boolean getInheritLabels() {
        return this.inheritLabels;
    }

    @Generated
    public Property<ZonedDateTime> getScheduleDate() {
        return this.scheduleDate;
    }

    @Override
    @Generated
    public List<Task> getErrors() {
        return this.errors;
    }

    @Generated
    public ExecutableTask.RestartBehavior getRestartBehavior() {
        return this.restartBehavior;
    }

    @Generated
    public ForEachItem() {
        this.batch = ForEachItem.$default$batch();
        this.wait = ForEachItem.$default$wait();
        this.transmitFailed = ForEachItem.$default$transmitFailed();
        this.inheritLabels = ForEachItem.$default$inheritLabels();
        this.restartBehavior = ForEachItem.$default$restartBehavior();
    }

    @Plugin(internal=true)
    public static class ForEachItemSplit
    extends Task
    implements RunnableTask<Output> {
        static final String SUFFIX = "_split";
        private String items;
        private Batch batch;

        private ForEachItemSplit(String parentId, String items, Batch batch) {
            this.items = items;
            this.batch = batch;
            this.id = parentId + SUFFIX;
            this.type = ForEachItemSplit.class.getName();
        }

        @Override
        public Output run(RunContext runContext) throws Exception {
            String renderedUri = runContext.render(this.items);
            if (!renderedUri.startsWith("kestra://")) {
                String errorMessage = "Unable to split the items from " + renderedUri + ", this is not an internal storage URI!";
                runContext.logger().error(errorMessage);
                throw new IllegalArgumentException(errorMessage);
            }
            List<URI> splits = StorageService.split(runContext, this.batch, URI.create(renderedUri));
            String fileContent = splits.stream().map(uri -> uri.toString()).collect(Collectors.joining(System.lineSeparator()));
            try (ByteArrayInputStream bis = new ByteArrayInputStream(fileContent.getBytes());){
                URI splitsFile = runContext.storage().putFile((InputStream)bis, "splits.txt");
                Output output = Output.builder().splits(splitsFile).build();
                return output;
            }
        }

        @Generated
        public String getItems() {
            return this.items;
        }

        @Generated
        public Batch getBatch() {
            return this.batch;
        }

        @Generated
        public ForEachItemSplit() {
        }

        public static class Output
        implements io.kestra.core.models.tasks.Output {
            private URI splits;

            @ConstructorProperties(value={"splits"})
            @Generated
            Output(URI splits) {
                this.splits = splits;
            }

            @Generated
            public static OutputBuilder builder() {
                return new OutputBuilder();
            }

            @Generated
            public URI getSplits() {
                return this.splits;
            }

            @Generated
            public static class OutputBuilder {
                @Generated
                private URI splits;

                @Generated
                OutputBuilder() {
                }

                @Generated
                public OutputBuilder splits(URI splits) {
                    this.splits = splits;
                    return this;
                }

                @Generated
                public Output build() {
                    return new Output(this.splits);
                }

                @Generated
                public String toString() {
                    return "ForEachItem.ForEachItemSplit.Output.OutputBuilder(splits=" + String.valueOf(this.splits) + ")";
                }
            }
        }
    }

    public static class Batch
    implements StorageSplitInterface {
        private Property<String> bytes;
        private Property<Integer> partitions;
        private Property<Integer> rows;
        private Property<String> separator;
        private Property<String> regexPattern;

        @Generated
        private static Property<Integer> $default$rows() {
            return Property.ofValue(1);
        }

        @Generated
        private static Property<String> $default$separator() {
            return Property.ofValue("\n");
        }

        @Generated
        protected Batch(BatchBuilder<?, ?> b) {
            this.bytes = b.bytes;
            this.partitions = b.partitions;
            this.rows = b.rows$set ? b.rows$value : Batch.$default$rows();
            this.separator = b.separator$set ? b.separator$value : Batch.$default$separator();
            this.regexPattern = b.regexPattern;
        }

        @Generated
        public static BatchBuilder<?, ?> builder() {
            return new BatchBuilderImpl();
        }

        @Generated
        public String toString() {
            return "ForEachItem.Batch(bytes=" + String.valueOf(this.getBytes()) + ", partitions=" + String.valueOf(this.getPartitions()) + ", rows=" + String.valueOf(this.getRows()) + ", separator=" + String.valueOf(this.getSeparator()) + ", regexPattern=" + String.valueOf(this.getRegexPattern()) + ")";
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Batch)) {
                return false;
            }
            Batch other = (Batch)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Property<String> this$bytes = this.getBytes();
            Property<String> other$bytes = other.getBytes();
            if (this$bytes == null ? other$bytes != null : !((Object)this$bytes).equals(other$bytes)) {
                return false;
            }
            Property<Integer> this$partitions = this.getPartitions();
            Property<Integer> other$partitions = other.getPartitions();
            if (this$partitions == null ? other$partitions != null : !((Object)this$partitions).equals(other$partitions)) {
                return false;
            }
            Property<Integer> this$rows = this.getRows();
            Property<Integer> other$rows = other.getRows();
            if (this$rows == null ? other$rows != null : !((Object)this$rows).equals(other$rows)) {
                return false;
            }
            Property<String> this$separator = this.getSeparator();
            Property<String> other$separator = other.getSeparator();
            if (this$separator == null ? other$separator != null : !((Object)this$separator).equals(other$separator)) {
                return false;
            }
            Property<String> this$regexPattern = this.getRegexPattern();
            Property<String> other$regexPattern = other.getRegexPattern();
            return !(this$regexPattern == null ? other$regexPattern != null : !((Object)this$regexPattern).equals(other$regexPattern));
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof Batch;
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Property<String> $bytes = this.getBytes();
            result = result * 59 + ($bytes == null ? 43 : ((Object)$bytes).hashCode());
            Property<Integer> $partitions = this.getPartitions();
            result = result * 59 + ($partitions == null ? 43 : ((Object)$partitions).hashCode());
            Property<Integer> $rows = this.getRows();
            result = result * 59 + ($rows == null ? 43 : ((Object)$rows).hashCode());
            Property<String> $separator = this.getSeparator();
            result = result * 59 + ($separator == null ? 43 : ((Object)$separator).hashCode());
            Property<String> $regexPattern = this.getRegexPattern();
            result = result * 59 + ($regexPattern == null ? 43 : ((Object)$regexPattern).hashCode());
            return result;
        }

        @Override
        @Generated
        public Property<String> getBytes() {
            return this.bytes;
        }

        @Override
        @Generated
        public Property<Integer> getPartitions() {
            return this.partitions;
        }

        @Override
        @Generated
        public Property<Integer> getRows() {
            return this.rows;
        }

        @Override
        @Generated
        public Property<String> getSeparator() {
            return this.separator;
        }

        @Override
        @Generated
        public Property<String> getRegexPattern() {
            return this.regexPattern;
        }

        @Generated
        public Batch() {
            this.rows = Batch.$default$rows();
            this.separator = Batch.$default$separator();
        }

        @Generated
        public static abstract class BatchBuilder<C extends Batch, B extends BatchBuilder<C, B>> {
            @Generated
            private Property<String> bytes;
            @Generated
            private Property<Integer> partitions;
            @Generated
            private boolean rows$set;
            @Generated
            private Property<Integer> rows$value;
            @Generated
            private boolean separator$set;
            @Generated
            private Property<String> separator$value;
            @Generated
            private Property<String> regexPattern;

            @Generated
            public B bytes(Property<String> bytes) {
                this.bytes = bytes;
                return this.self();
            }

            @Generated
            public B partitions(Property<Integer> partitions) {
                this.partitions = partitions;
                return this.self();
            }

            @Generated
            public B rows(Property<Integer> rows) {
                this.rows$value = rows;
                this.rows$set = true;
                return this.self();
            }

            @Generated
            public B separator(Property<String> separator) {
                this.separator$value = separator;
                this.separator$set = true;
                return this.self();
            }

            @Generated
            public B regexPattern(Property<String> regexPattern) {
                this.regexPattern = regexPattern;
                return this.self();
            }

            @Generated
            protected abstract B self();

            @Generated
            public abstract C build();

            @Generated
            public String toString() {
                return "ForEachItem.Batch.BatchBuilder(bytes=" + String.valueOf(this.bytes) + ", partitions=" + String.valueOf(this.partitions) + ", rows$value=" + String.valueOf(this.rows$value) + ", separator$value=" + String.valueOf(this.separator$value) + ", regexPattern=" + String.valueOf(this.regexPattern) + ")";
            }
        }

        @Generated
        private static final class BatchBuilderImpl
        extends BatchBuilder<Batch, BatchBuilderImpl> {
            @Generated
            private BatchBuilderImpl() {
            }

            @Override
            @Generated
            protected BatchBuilderImpl self() {
                return this;
            }

            @Override
            @Generated
            public Batch build() {
                return new Batch(this);
            }
        }
    }

    @Plugin(internal=true)
    public static class ForEachItemExecutable
    extends Task
    implements ExecutableTask<Output> {
        static final String SUFFIX = "_items";
        private Map<String, Object> inputs;
        private Boolean inheritLabels;
        private List<Label> labels;
        private Boolean wait;
        private Boolean transmitFailed;
        private Property<ZonedDateTime> scheduleOn;
        private ExecutableTask.SubflowId subflowId;
        private ExecutableTask.RestartBehavior restartBehavior;

        private ForEachItemExecutable(String parentId, Map<String, Object> inputs, Boolean inheritLabels, List<Label> labels, Boolean wait, Boolean transmitFailed, Property<ZonedDateTime> scheduleOn, ExecutableTask.SubflowId subflowId, ExecutableTask.RestartBehavior restartBehavior) {
            this.inputs = inputs;
            this.inheritLabels = inheritLabels;
            this.labels = labels;
            this.wait = wait;
            this.transmitFailed = transmitFailed;
            this.scheduleOn = scheduleOn;
            this.subflowId = subflowId;
            this.restartBehavior = restartBehavior;
            this.id = parentId + SUFFIX;
            this.type = ForEachItemExecutable.class.getName();
        }

        @Override
        public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
            List<SubflowExecution<?>> list;
            block8: {
                String taskId = this.id.substring(0, this.id.lastIndexOf(95)) + "_split";
                Map<String, Object> taskOutput = ForEachItem.extractOutput(runContext, taskId);
                URI splitsURI = URI.create((String)taskOutput.get("splits"));
                InputStream is = runContext.storage().getFile(splitsURI);
                try {
                    String fileContent = new String(is.readAllBytes());
                    List<URI> splits = fileContent.lines().map(line -> URI.create(line)).toList();
                    AtomicInteger currentIteration = new AtomicInteger(0);
                    list = splits.stream().map(Rethrow.throwFunction(split -> {
                        int iteration = currentIteration.getAndIncrement();
                        Map<String, Object> itemsVariable = Map.of("taskrun", Map.of("items", split, "iteration", iteration));
                        HashMap<String, Object> inputs = new HashMap<String, Object>();
                        if (this.inputs != null) {
                            inputs.putAll(runContext.render(this.inputs, itemsVariable));
                        }
                        Output outputs = Output.builder().numberOfBatches(splits.size()).uri(URI.create(runContext.getStorageOutputPrefix().toString() + "/" + iteration + "/outputs.ion")).build();
                        return ExecutableUtils.subflowExecution(runContext, flowExecutorInterface, currentExecution, currentFlow, this, currentTaskRun.withOutputs(Variables.inMemory(outputs.toMap())).withIteration(iteration), inputs, this.labels, this.inheritLabels, this.scheduleOn);
                    })).filter(Optional::isPresent).map(Optional::get).toList();
                    if (is == null) break block8;
                }
                catch (Throwable throwable) {
                    try {
                        if (is != null) {
                            try {
                                is.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    catch (IOException e) {
                        throw new InternalException(e);
                    }
                }
                is.close();
            }
            return list;
        }

        @Override
        public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, FlowInterface flow, Execution execution) {
            if (taskRun.getState().isTerminated() && flow.getOutputs() != null && this.waitForExecution()) {
                Output.OutputBuilder builder = Output.builder().iterations((Map)taskRun.getOutputs().get("iterations")).numberOfBatches((Integer)taskRun.getOutputs().get("numberOfBatches"));
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();){
                    FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
                    URI uri = runContext.storage().putFile((InputStream)new ByteArrayInputStream(bos.toByteArray()), URI.create((String)taskRun.getOutputs().get("uri")));
                    builder.uri(uri);
                }
                catch (Exception e) {
                    runContext.logger().warn("Failed to extract outputs with the error: '{}'", (Object)e.getLocalizedMessage(), (Object)e);
                    State.Type state = State.Type.fail(this);
                    taskRun = taskRun.withState(state).withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build())).withOutputs(Variables.inMemory(builder.build().toMap()));
                    return Optional.of(SubflowExecutionResult.builder().executionId(execution.getId()).state(State.Type.FAILED).parentTaskRun(taskRun).build());
                }
                taskRun = taskRun.withOutputs(Variables.inMemory(builder.build().toMap()));
            }
            return Optional.of(ExecutableUtils.subflowExecutionResult(taskRun, execution));
        }

        @Override
        public boolean waitForExecution() {
            return this.wait;
        }

        @Override
        public ExecutableTask.SubflowId subflowId() {
            return this.subflowId;
        }

        @Generated
        public Map<String, Object> getInputs() {
            return this.inputs;
        }

        @Generated
        public Boolean getInheritLabels() {
            return this.inheritLabels;
        }

        @Generated
        public List<Label> getLabels() {
            return this.labels;
        }

        @Generated
        public Boolean getWait() {
            return this.wait;
        }

        @Generated
        public Boolean getTransmitFailed() {
            return this.transmitFailed;
        }

        @Generated
        public Property<ZonedDateTime> getScheduleOn() {
            return this.scheduleOn;
        }

        @Generated
        public ExecutableTask.SubflowId getSubflowId() {
            return this.subflowId;
        }

        @Override
        @Generated
        public ExecutableTask.RestartBehavior getRestartBehavior() {
            return this.restartBehavior;
        }

        @Generated
        public ForEachItemExecutable() {
        }
    }

    @Plugin(internal=true)
    public static class ForEachItemMergeOutputs
    extends Task
    implements RunnableTask<Output> {
        static final String SUFFIX = "_merge";

        private ForEachItemMergeOutputs(String parentId) {
            this.id = parentId + SUFFIX;
            this.type = ForEachItemMergeOutputs.class.getName();
        }

        @Override
        public Output run(RunContext runContext) throws Exception {
            List<FileAttributes> list;
            String taskId = this.id.substring(0, this.id.lastIndexOf(95)) + "_items";
            Map<String, Object> taskOutput = ForEachItem.extractOutput(runContext, taskId);
            if (taskOutput == null) {
                return null;
            }
            String subflowOutputsBase = (String)taskOutput.get("subflowOutputsBaseUri");
            URI subflowOutputsBaseUri = URI.create("kestra://" + subflowOutputsBase + "/");
            StorageInterface storage = (StorageInterface)((DefaultRunContext)runContext).getApplicationContext().getBean(StorageInterface.class);
            if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri) && !(list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)).isEmpty()) {
                List<InputStream> streams = list.stream().map(Rethrow.throwFunction(attr -> {
                    URI file = subflowOutputsBaseUri.resolve(attr.getFileName() + "/outputs.ion");
                    return runContext.storage().getFile(file);
                })).toList();
                try (SequenceInputStream is = new SequenceInputStream(Collections.enumeration(streams));){
                    URI uri = runContext.storage().putFile((InputStream)is, "outputs.ion");
                    Output output = Output.builder().subflowOutputs(uri).build();
                    return output;
                }
            }
            return null;
        }

        @Generated
        public ForEachItemMergeOutputs() {
        }

        public static class Output
        implements io.kestra.core.models.tasks.Output {
            private URI subflowOutputs;

            @ConstructorProperties(value={"subflowOutputs"})
            @Generated
            Output(URI subflowOutputs) {
                this.subflowOutputs = subflowOutputs;
            }

            @Generated
            public static OutputBuilder builder() {
                return new OutputBuilder();
            }

            @Generated
            public URI getSubflowOutputs() {
                return this.subflowOutputs;
            }

            @Generated
            public static class OutputBuilder {
                @Generated
                private URI subflowOutputs;

                @Generated
                OutputBuilder() {
                }

                @Generated
                public OutputBuilder subflowOutputs(URI subflowOutputs) {
                    this.subflowOutputs = subflowOutputs;
                    return this;
                }

                @Generated
                public Output build() {
                    return new Output(this.subflowOutputs);
                }

                @Generated
                public String toString() {
                    return "ForEachItem.ForEachItemMergeOutputs.Output.OutputBuilder(subflowOutputs=" + String.valueOf(this.subflowOutputs) + ")";
                }
            }
        }
    }

    @Generated
    public static abstract class ForEachItemBuilder<C extends ForEachItem, B extends ForEachItemBuilder<C, B>>
    extends Task.TaskBuilder<C, B> {
        @Generated
        private String items;
        @Generated
        private boolean batch$set;
        @Generated
        private Batch batch$value;
        @Generated
        private String namespace;
        @Generated
        private String flowId;
        @Generated
        private Integer revision;
        @Generated
        private Map<String, Object> inputs;
        @Generated
        private List<@NoSystemLabelValidation Label> labels;
        @Generated
        private boolean wait$set;
        @Generated
        private Boolean wait$value;
        @Generated
        private boolean transmitFailed$set;
        @Generated
        private Boolean transmitFailed$value;
        @Generated
        private boolean inheritLabels$set;
        @Generated
        private Boolean inheritLabels$value;
        @Generated
        private Property<ZonedDateTime> scheduleDate;
        @Generated
        private List<Task> errors;
        @Generated
        private List<Task> _finally;
        @Generated
        private boolean restartBehavior$set;
        @Generated
        private ExecutableTask.RestartBehavior restartBehavior$value;

        @Override
        @Generated
        protected B $fillValuesFrom(C instance) {
            super.$fillValuesFrom(instance);
            ForEachItemBuilder.$fillValuesFromInstanceIntoBuilder(instance, this);
            return (B)this.self();
        }

        @Generated
        private static void $fillValuesFromInstanceIntoBuilder(ForEachItem instance, ForEachItemBuilder<?, ?> b) {
            b.items(instance.items);
            b.batch(instance.batch);
            b.namespace(instance.namespace);
            b.flowId(instance.flowId);
            b.revision(instance.revision);
            b.inputs(instance.inputs);
            b.labels(instance.labels);
            b.wait(instance.wait);
            b.transmitFailed(instance.transmitFailed);
            b.inheritLabels(instance.inheritLabels);
            b.scheduleDate(instance.scheduleDate);
            b.errors(instance.errors);
            b._finally(instance._finally);
            b.restartBehavior(instance.restartBehavior);
        }

        @Generated
        public B items(String items) {
            this.items = items;
            return (B)this.self();
        }

        @Generated
        public B batch(Batch batch) {
            this.batch$value = batch;
            this.batch$set = true;
            return (B)this.self();
        }

        @Generated
        public B namespace(String namespace) {
            this.namespace = namespace;
            return (B)this.self();
        }

        @Generated
        public B flowId(String flowId) {
            this.flowId = flowId;
            return (B)this.self();
        }

        @Generated
        public B revision(Integer revision) {
            this.revision = revision;
            return (B)this.self();
        }

        @Generated
        public B inputs(Map<String, Object> inputs) {
            this.inputs = inputs;
            return (B)this.self();
        }

        @JsonDeserialize(using=ListOrMapOfLabelDeserializer.class)
        @Generated
        public B labels(List<@NoSystemLabelValidation Label> labels) {
            this.labels = labels;
            return (B)this.self();
        }

        @Generated
        public B wait(Boolean wait) {
            this.wait$value = wait;
            this.wait$set = true;
            return (B)this.self();
        }

        @Generated
        public B transmitFailed(Boolean transmitFailed) {
            this.transmitFailed$value = transmitFailed;
            this.transmitFailed$set = true;
            return (B)this.self();
        }

        @Generated
        public B inheritLabels(Boolean inheritLabels) {
            this.inheritLabels$value = inheritLabels;
            this.inheritLabels$set = true;
            return (B)this.self();
        }

        @Generated
        public B scheduleDate(Property<ZonedDateTime> scheduleDate) {
            this.scheduleDate = scheduleDate;
            return (B)this.self();
        }

        @Generated
        public B errors(List<Task> errors) {
            this.errors = errors;
            return (B)this.self();
        }

        @JsonProperty(value="finally")
        @Generated
        public B _finally(List<Task> _finally) {
            this._finally = _finally;
            return (B)this.self();
        }

        @Generated
        public B restartBehavior(ExecutableTask.RestartBehavior restartBehavior) {
            this.restartBehavior$value = restartBehavior;
            this.restartBehavior$set = true;
            return (B)this.self();
        }

        @Override
        @Generated
        protected abstract B self();

        @Override
        @Generated
        public abstract C build();

        @Override
        @Generated
        public String toString() {
            return "ForEachItem.ForEachItemBuilder(super=" + super.toString() + ", items=" + this.items + ", batch$value=" + String.valueOf(this.batch$value) + ", namespace=" + this.namespace + ", flowId=" + this.flowId + ", revision=" + this.revision + ", inputs=" + String.valueOf(this.inputs) + ", labels=" + String.valueOf(this.labels) + ", wait$value=" + this.wait$value + ", transmitFailed$value=" + this.transmitFailed$value + ", inheritLabels$value=" + this.inheritLabels$value + ", scheduleDate=" + String.valueOf(this.scheduleDate) + ", errors=" + String.valueOf(this.errors) + ", _finally=" + String.valueOf(this._finally) + ", restartBehavior$value=" + String.valueOf((Object)this.restartBehavior$value) + ")";
        }
    }

    @Generated
    private static final class ForEachItemBuilderImpl
    extends ForEachItemBuilder<ForEachItem, ForEachItemBuilderImpl> {
        @Generated
        private ForEachItemBuilderImpl() {
        }

        @Override
        @Generated
        protected ForEachItemBuilderImpl self() {
            return this;
        }

        @Override
        @Generated
        public ForEachItem build() {
            return new ForEachItem(this);
        }
    }

    public static class Output
    implements io.kestra.core.models.tasks.Output {
        @Schema(title="The counter of iterations for each subflow execution state", description="This output will be updated in real-time based on the state of subflow executions.\n It will contain one counter by subflow execution state.")
        private final Map<State.Type, Integer> iterations;
        @Schema(title="The number of batches")
        private final Integer numberOfBatches;
        @Schema(title="The URI of the file gathering outputs from each subflow execution")
        private final URI uri;

        @ConstructorProperties(value={"iterations", "numberOfBatches", "uri"})
        @Generated
        Output(Map<State.Type, Integer> iterations, Integer numberOfBatches, URI uri) {
            this.iterations = iterations;
            this.numberOfBatches = numberOfBatches;
            this.uri = uri;
        }

        @Generated
        public static OutputBuilder builder() {
            return new OutputBuilder();
        }

        @Generated
        public Map<State.Type, Integer> getIterations() {
            return this.iterations;
        }

        @Generated
        public Integer getNumberOfBatches() {
            return this.numberOfBatches;
        }

        @Generated
        public URI getUri() {
            return this.uri;
        }

        @Generated
        public static class OutputBuilder {
            @Generated
            private Map<State.Type, Integer> iterations;
            @Generated
            private Integer numberOfBatches;
            @Generated
            private URI uri;

            @Generated
            OutputBuilder() {
            }

            @Generated
            public OutputBuilder iterations(Map<State.Type, Integer> iterations) {
                this.iterations = iterations;
                return this;
            }

            @Generated
            public OutputBuilder numberOfBatches(Integer numberOfBatches) {
                this.numberOfBatches = numberOfBatches;
                return this;
            }

            @Generated
            public OutputBuilder uri(URI uri) {
                this.uri = uri;
                return this;
            }

            @Generated
            public Output build() {
                return new Output(this.iterations, this.numberOfBatches, this.uri);
            }

            @Generated
            public String toString() {
                return "ForEachItem.Output.OutputBuilder(iterations=" + String.valueOf(this.iterations) + ", numberOfBatches=" + this.numberOfBatches + ", uri=" + String.valueOf(this.uri) + ")";
            }
        }
    }
}

