/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.tasks.flows;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.services.StorageService;
import io.kestra.core.storages.StorageSplitInterface;
import io.kestra.core.utils.Rethrow;
import io.swagger.v3.oas.annotations.media.Schema;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import lombok.Generated;

@Schema(title="Execute a subflow for each batch of items", description="Execute a subflow for each batch of items. The `items` value must be internal storage URI e.g. an output file from a previous task, or a file from inputs of FILE type.")
@Plugin(examples={@Example(title="Execute a subflow for each batch of items. The subflow `orders` is called from the parent flow `orders_parallel` using the `ForEachItem` task in order to start one subflow execution for each batch of items.\n```yaml\nid: orders\nnamespace: prod\n\ninputs:\n  - name: order\n    type: STRING\n\ntasks:\n  - id: read_file\n    type: io.kestra.plugin.scripts.shell.Commands\n    runner: PROCESS\n    commands:\n      - cat \"{{ inputs.order }}\"\n\n  - id: read_file_content\n    type: io.kestra.core.tasks.log.Log\n    message: \"{{ read(inputs.order) }}\"\n```\n", full=true, code={"id: orders_parallel\nnamespace: prod\n\ntasks:\n  - id: extract\n    type: io.kestra.plugin.jdbc.duckdb.Query\n    sql: |\n      INSTALL httpfs;\n      LOAD httpfs;\n      SELECT *\n      FROM read_csv_auto('https://raw.githubusercontent.com/kestra-io/datasets/main/csv/orders.csv', header=True);\n    store: true\n\n  - id: each\n    type: io.kestra.core.tasks.flows.ForEachItem\n    items: \"{{ outputs.extract.uri }}\"\n    batch:\n      rows: 1\n    namespace: prod\n    flowId: orders\n    wait: true # wait for the subflow execution\n    transmitFailed: true # fail the task run if the subflow execution fails\n    inputs:\n      order: \"{{ taskrun.items }}\" # special variable that contains the items of the batch"})})
public class ForEachItem
extends Task
implements ExecutableTask<Output> {
    @NotEmpty
    @PluginProperty(dynamic=true)
    @Schema(title="The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI. This can be either the output from a previous task, formatted as `{{ outputs.task_id.uri }}`, or a FILE type input parameter, like `{{ inputs.myfile }}`. This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION-type files (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For files in other formats such as Excel, CSV, Avro, Parquet, XML, or JSON, it's recommended to first convert them to the ION format. This can be done using the conversion tasks available in the `io.kestra.plugin.serdes` module, which will transform files from their original format to ION.")
    private String items;
    @NotNull
    @PluginProperty
    @Schema(title="How to split the items into batches.")
    private Batch batch;
    @NotEmpty
    @Schema(title="The namespace of the subflow to be executed")
    @PluginProperty(dynamic=true)
    private String namespace;
    @NotEmpty
    @Schema(title="The identifier of the subflow to be executed")
    @PluginProperty(dynamic=true)
    private String flowId;
    @Schema(title="The revision of the subflow to be executed", description="By default, the last, i.e. the most recent, revision of the subflow is executed.")
    @PluginProperty
    private Integer revision;
    @Schema(title="The inputs to pass to the subflow to be executed")
    @PluginProperty(dynamic=true)
    private Map<String, Object> inputs;
    @Schema(title="The labels to pass to the subflow to be executed")
    @PluginProperty(dynamic=true)
    private Map<String, String> labels;
    @Schema(title="Whether to wait for the subflows execution to finish before continuing the current execution.")
    @PluginProperty
    private final Boolean wait;
    @Schema(title="Whether to fail the current execution if the subflow execution fails or is killed.", description="Note that this option works only if `wait` is set to `true`.")
    @PluginProperty
    private final Boolean transmitFailed;
    @Schema(title="Whether the subflow should inherit labels from this execution that triggered it.", description="By default, labels are not passed to the subflow execution. If you set this option to `true`, the child flow execution will inherit all labels from the parent execution.")
    @PluginProperty
    private final Boolean inheritLabels;

    @Override
    public List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext, FlowExecutorInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
        String renderedUri = runContext.render(this.items);
        if (!renderedUri.startsWith("kestra://")) {
            String errorMessage = "Unable to split the items from " + renderedUri + ", this is not an internal storage URI!";
            runContext.logger().error(errorMessage);
            throw new IllegalArgumentException(errorMessage);
        }
        try {
            List<URI> splits = StorageService.split(runContext, this.batch, URI.create(renderedUri));
            AtomicInteger currentIteration = new AtomicInteger(1);
            return splits.stream().map(Rethrow.throwFunction(split -> {
                Map<String, Object> itemsVariable = Map.of("taskrun", Map.of("items", split.toString()));
                HashMap<String, Object> inputs = new HashMap<String, Object>();
                if (this.inputs != null) {
                    inputs.putAll(runContext.render(this.inputs, itemsVariable));
                }
                ArrayList<Label> labels = new ArrayList<Label>();
                if (this.inheritLabels.booleanValue()) {
                    labels.addAll(currentExecution.getLabels());
                }
                if (this.labels != null) {
                    for (Map.Entry<String, String> entry : this.labels.entrySet()) {
                        labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
                    }
                }
                int iteration = currentIteration.getAndIncrement();
                Output outputs = Output.builder().iterations(Map.of("max", splits.size())).build();
                return ExecutableUtils.workerTaskExecution(runContext, flowExecutorInterface, currentExecution, currentFlow, this, currentTaskRun.withOutputs(outputs.toMap()).withItems(split.toString()), inputs, labels, iteration);
            })).toList();
        }
        catch (IOException e) {
            runContext.logger().error(e.getMessage(), (Throwable)e);
            throw new InternalException(e);
        }
    }

    @Override
    public Optional<WorkerTaskResult> createWorkerTaskResult(RunContext runContext, TaskRun taskRun, Flow flow, Execution execution) {
        return Optional.of(ExecutableUtils.workerTaskResult(taskRun));
    }

    @Override
    public boolean waitForExecution() {
        return this.wait;
    }

    @Override
    public ExecutableTask.SubflowId subflowId() {
        return new ExecutableTask.SubflowId(this.namespace, this.flowId, Optional.ofNullable(this.revision));
    }

    @Generated
    private static Batch $default$batch() {
        return Batch.builder().build();
    }

    @Generated
    private static Boolean $default$wait() {
        return true;
    }

    @Generated
    private static Boolean $default$transmitFailed() {
        return true;
    }

    @Generated
    private static Boolean $default$inheritLabels() {
        return false;
    }

    @Generated
    protected ForEachItem(ForEachItemBuilder<?, ?> b) {
        super((Task.TaskBuilder)b);
        this.items = b.items;
        this.batch = b.batch$set ? b.batch$value : ForEachItem.$default$batch();
        this.namespace = b.namespace;
        this.flowId = b.flowId;
        this.revision = b.revision;
        this.inputs = b.inputs;
        this.labels = b.labels;
        this.wait = b.wait$set ? b.wait$value : ForEachItem.$default$wait();
        this.transmitFailed = b.transmitFailed$set ? b.transmitFailed$value : ForEachItem.$default$transmitFailed();
        this.inheritLabels = b.inheritLabels$set ? b.inheritLabels$value : ForEachItem.$default$inheritLabels();
    }

    @Generated
    public static ForEachItemBuilder<?, ?> builder() {
        return new ForEachItemBuilderImpl();
    }

    @Generated
    public ForEachItemBuilder<?, ?> toBuilder() {
        return new ForEachItemBuilderImpl().$fillValuesFrom(this);
    }

    @Generated
    public String toString() {
        return "ForEachItem(super=" + super.toString() + ", items=" + this.getItems() + ", batch=" + this.getBatch() + ", namespace=" + this.getNamespace() + ", flowId=" + this.getFlowId() + ", revision=" + this.getRevision() + ", inputs=" + this.getInputs() + ", labels=" + this.getLabels() + ", wait=" + this.getWait() + ", transmitFailed=" + this.getTransmitFailed() + ", inheritLabels=" + this.getInheritLabels() + ")";
    }

    @Generated
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof ForEachItem)) {
            return false;
        }
        ForEachItem other = (ForEachItem)o;
        if (!other.canEqual(this)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        Integer this$revision = this.getRevision();
        Integer other$revision = other.getRevision();
        if (this$revision == null ? other$revision != null : !((Object)this$revision).equals(other$revision)) {
            return false;
        }
        Boolean this$wait = this.getWait();
        Boolean other$wait = other.getWait();
        if (this$wait == null ? other$wait != null : !((Object)this$wait).equals(other$wait)) {
            return false;
        }
        Boolean this$transmitFailed = this.getTransmitFailed();
        Boolean other$transmitFailed = other.getTransmitFailed();
        if (this$transmitFailed == null ? other$transmitFailed != null : !((Object)this$transmitFailed).equals(other$transmitFailed)) {
            return false;
        }
        Boolean this$inheritLabels = this.getInheritLabels();
        Boolean other$inheritLabels = other.getInheritLabels();
        if (this$inheritLabels == null ? other$inheritLabels != null : !((Object)this$inheritLabels).equals(other$inheritLabels)) {
            return false;
        }
        String this$items = this.getItems();
        String other$items = other.getItems();
        if (this$items == null ? other$items != null : !this$items.equals(other$items)) {
            return false;
        }
        Batch this$batch = this.getBatch();
        Batch other$batch = other.getBatch();
        if (this$batch == null ? other$batch != null : !((Object)this$batch).equals(other$batch)) {
            return false;
        }
        String this$namespace = this.getNamespace();
        String other$namespace = other.getNamespace();
        if (this$namespace == null ? other$namespace != null : !this$namespace.equals(other$namespace)) {
            return false;
        }
        String this$flowId = this.getFlowId();
        String other$flowId = other.getFlowId();
        if (this$flowId == null ? other$flowId != null : !this$flowId.equals(other$flowId)) {
            return false;
        }
        Map<String, Object> this$inputs = this.getInputs();
        Map<String, Object> other$inputs = other.getInputs();
        if (this$inputs == null ? other$inputs != null : !((Object)this$inputs).equals(other$inputs)) {
            return false;
        }
        Map<String, String> this$labels = this.getLabels();
        Map<String, String> other$labels = other.getLabels();
        return !(this$labels == null ? other$labels != null : !((Object)this$labels).equals(other$labels));
    }

    @Generated
    protected boolean canEqual(Object other) {
        return other instanceof ForEachItem;
    }

    @Generated
    public int hashCode() {
        int PRIME = 59;
        int result = super.hashCode();
        Integer $revision = this.getRevision();
        result = result * 59 + ($revision == null ? 43 : ((Object)$revision).hashCode());
        Boolean $wait = this.getWait();
        result = result * 59 + ($wait == null ? 43 : ((Object)$wait).hashCode());
        Boolean $transmitFailed = this.getTransmitFailed();
        result = result * 59 + ($transmitFailed == null ? 43 : ((Object)$transmitFailed).hashCode());
        Boolean $inheritLabels = this.getInheritLabels();
        result = result * 59 + ($inheritLabels == null ? 43 : ((Object)$inheritLabels).hashCode());
        String $items = this.getItems();
        result = result * 59 + ($items == null ? 43 : $items.hashCode());
        Batch $batch = this.getBatch();
        result = result * 59 + ($batch == null ? 43 : ((Object)$batch).hashCode());
        String $namespace = this.getNamespace();
        result = result * 59 + ($namespace == null ? 43 : $namespace.hashCode());
        String $flowId = this.getFlowId();
        result = result * 59 + ($flowId == null ? 43 : $flowId.hashCode());
        Map<String, Object> $inputs = this.getInputs();
        result = result * 59 + ($inputs == null ? 43 : ((Object)$inputs).hashCode());
        Map<String, String> $labels = this.getLabels();
        result = result * 59 + ($labels == null ? 43 : ((Object)$labels).hashCode());
        return result;
    }

    @Generated
    public String getItems() {
        return this.items;
    }

    @Generated
    public Batch getBatch() {
        return this.batch;
    }

    @Generated
    public String getNamespace() {
        return this.namespace;
    }

    @Generated
    public String getFlowId() {
        return this.flowId;
    }

    @Generated
    public Integer getRevision() {
        return this.revision;
    }

    @Generated
    public Map<String, Object> getInputs() {
        return this.inputs;
    }

    @Generated
    public Map<String, String> getLabels() {
        return this.labels;
    }

    @Generated
    public Boolean getWait() {
        return this.wait;
    }

    @Generated
    public Boolean getTransmitFailed() {
        return this.transmitFailed;
    }

    @Generated
    public Boolean getInheritLabels() {
        return this.inheritLabels;
    }

    @Generated
    public ForEachItem() {
        this.batch = ForEachItem.$default$batch();
        this.wait = ForEachItem.$default$wait();
        this.transmitFailed = ForEachItem.$default$transmitFailed();
        this.inheritLabels = ForEachItem.$default$inheritLabels();
    }

    public static class Batch
    implements StorageSplitInterface {
        private String bytes;
        private Integer partitions;
        private Integer rows;
        private String separator;

        @Generated
        private static Integer $default$rows() {
            return 1;
        }

        @Generated
        private static String $default$separator() {
            return "\n";
        }

        @Generated
        protected Batch(BatchBuilder<?, ?> b) {
            this.bytes = b.bytes;
            this.partitions = b.partitions;
            this.rows = b.rows$set ? b.rows$value : Batch.$default$rows();
            this.separator = b.separator$set ? b.separator$value : Batch.$default$separator();
        }

        @Generated
        public static BatchBuilder<?, ?> builder() {
            return new BatchBuilderImpl();
        }

        @Generated
        public String toString() {
            return "ForEachItem.Batch(bytes=" + this.getBytes() + ", partitions=" + this.getPartitions() + ", rows=" + this.getRows() + ", separator=" + this.getSeparator() + ")";
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Batch)) {
                return false;
            }
            Batch other = (Batch)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Integer this$partitions = this.getPartitions();
            Integer other$partitions = other.getPartitions();
            if (this$partitions == null ? other$partitions != null : !((Object)this$partitions).equals(other$partitions)) {
                return false;
            }
            Integer this$rows = this.getRows();
            Integer other$rows = other.getRows();
            if (this$rows == null ? other$rows != null : !((Object)this$rows).equals(other$rows)) {
                return false;
            }
            String this$bytes = this.getBytes();
            String other$bytes = other.getBytes();
            if (this$bytes == null ? other$bytes != null : !this$bytes.equals(other$bytes)) {
                return false;
            }
            String this$separator = this.getSeparator();
            String other$separator = other.getSeparator();
            return !(this$separator == null ? other$separator != null : !this$separator.equals(other$separator));
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof Batch;
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Integer $partitions = this.getPartitions();
            result = result * 59 + ($partitions == null ? 43 : ((Object)$partitions).hashCode());
            Integer $rows = this.getRows();
            result = result * 59 + ($rows == null ? 43 : ((Object)$rows).hashCode());
            String $bytes = this.getBytes();
            result = result * 59 + ($bytes == null ? 43 : $bytes.hashCode());
            String $separator = this.getSeparator();
            result = result * 59 + ($separator == null ? 43 : $separator.hashCode());
            return result;
        }

        @Override
        @Generated
        public String getBytes() {
            return this.bytes;
        }

        @Override
        @Generated
        public Integer getPartitions() {
            return this.partitions;
        }

        @Override
        @Generated
        public Integer getRows() {
            return this.rows;
        }

        @Override
        @Generated
        public String getSeparator() {
            return this.separator;
        }

        @Generated
        public Batch() {
            this.rows = Batch.$default$rows();
            this.separator = Batch.$default$separator();
        }

        @Generated
        public static abstract class BatchBuilder<C extends Batch, B extends BatchBuilder<C, B>> {
            @Generated
            private String bytes;
            @Generated
            private Integer partitions;
            @Generated
            private boolean rows$set;
            @Generated
            private Integer rows$value;
            @Generated
            private boolean separator$set;
            @Generated
            private String separator$value;

            @Generated
            public B bytes(String bytes) {
                this.bytes = bytes;
                return this.self();
            }

            @Generated
            public B partitions(Integer partitions) {
                this.partitions = partitions;
                return this.self();
            }

            @Generated
            public B rows(Integer rows) {
                this.rows$value = rows;
                this.rows$set = true;
                return this.self();
            }

            @Generated
            public B separator(String separator) {
                this.separator$value = separator;
                this.separator$set = true;
                return this.self();
            }

            @Generated
            protected abstract B self();

            @Generated
            public abstract C build();

            @Generated
            public String toString() {
                return "ForEachItem.Batch.BatchBuilder(bytes=" + this.bytes + ", partitions=" + this.partitions + ", rows$value=" + this.rows$value + ", separator$value=" + this.separator$value + ")";
            }
        }

        @Generated
        private static final class BatchBuilderImpl
        extends BatchBuilder<Batch, BatchBuilderImpl> {
            @Generated
            private BatchBuilderImpl() {
            }

            @Override
            @Generated
            protected BatchBuilderImpl self() {
                return this;
            }

            @Override
            @Generated
            public Batch build() {
                return new Batch(this);
            }
        }
    }

    @Generated
    public static abstract class ForEachItemBuilder<C extends ForEachItem, B extends ForEachItemBuilder<C, B>>
    extends Task.TaskBuilder<C, B> {
        @Generated
        private String items;
        @Generated
        private boolean batch$set;
        @Generated
        private Batch batch$value;
        @Generated
        private String namespace;
        @Generated
        private String flowId;
        @Generated
        private Integer revision;
        @Generated
        private Map<String, Object> inputs;
        @Generated
        private Map<String, String> labels;
        @Generated
        private boolean wait$set;
        @Generated
        private Boolean wait$value;
        @Generated
        private boolean transmitFailed$set;
        @Generated
        private Boolean transmitFailed$value;
        @Generated
        private boolean inheritLabels$set;
        @Generated
        private Boolean inheritLabels$value;

        @Override
        @Generated
        protected B $fillValuesFrom(C instance) {
            super.$fillValuesFrom(instance);
            ForEachItemBuilder.$fillValuesFromInstanceIntoBuilder(instance, this);
            return (B)this.self();
        }

        @Generated
        private static void $fillValuesFromInstanceIntoBuilder(ForEachItem instance, ForEachItemBuilder<?, ?> b) {
            b.items(instance.items);
            b.batch(instance.batch);
            b.namespace(instance.namespace);
            b.flowId(instance.flowId);
            b.revision(instance.revision);
            b.inputs(instance.inputs);
            b.labels(instance.labels);
            b.wait(instance.wait);
            b.transmitFailed(instance.transmitFailed);
            b.inheritLabels(instance.inheritLabels);
        }

        @Generated
        public B items(String items) {
            this.items = items;
            return (B)this.self();
        }

        @Generated
        public B batch(Batch batch) {
            this.batch$value = batch;
            this.batch$set = true;
            return (B)this.self();
        }

        @Generated
        public B namespace(String namespace) {
            this.namespace = namespace;
            return (B)this.self();
        }

        @Generated
        public B flowId(String flowId) {
            this.flowId = flowId;
            return (B)this.self();
        }

        @Generated
        public B revision(Integer revision) {
            this.revision = revision;
            return (B)this.self();
        }

        @Generated
        public B inputs(Map<String, Object> inputs) {
            this.inputs = inputs;
            return (B)this.self();
        }

        @Generated
        public B labels(Map<String, String> labels) {
            this.labels = labels;
            return (B)this.self();
        }

        @Generated
        public B wait(Boolean wait) {
            this.wait$value = wait;
            this.wait$set = true;
            return (B)this.self();
        }

        @Generated
        public B transmitFailed(Boolean transmitFailed) {
            this.transmitFailed$value = transmitFailed;
            this.transmitFailed$set = true;
            return (B)this.self();
        }

        @Generated
        public B inheritLabels(Boolean inheritLabels) {
            this.inheritLabels$value = inheritLabels;
            this.inheritLabels$set = true;
            return (B)this.self();
        }

        @Override
        @Generated
        protected abstract B self();

        @Override
        @Generated
        public abstract C build();

        @Override
        @Generated
        public String toString() {
            return "ForEachItem.ForEachItemBuilder(super=" + super.toString() + ", items=" + this.items + ", batch$value=" + this.batch$value + ", namespace=" + this.namespace + ", flowId=" + this.flowId + ", revision=" + this.revision + ", inputs=" + this.inputs + ", labels=" + this.labels + ", wait$value=" + this.wait$value + ", transmitFailed$value=" + this.transmitFailed$value + ", inheritLabels$value=" + this.inheritLabels$value + ")";
        }
    }

    @Generated
    private static final class ForEachItemBuilderImpl
    extends ForEachItemBuilder<ForEachItem, ForEachItemBuilderImpl> {
        @Generated
        private ForEachItemBuilderImpl() {
        }

        @Override
        @Generated
        protected ForEachItemBuilderImpl self() {
            return this;
        }

        @Override
        @Generated
        public ForEachItem build() {
            return new ForEachItem(this);
        }
    }

    public static class Output
    implements io.kestra.core.models.tasks.Output {
        @Schema(title="The counter of iterations for each subflow execution state.", description="This output will be updated in real-time based on the state of subflow executions.\n It will contain one counter per subflow execution state, as well as a `max` counter that represents the maximum number of iterations (i.e. the total number of batches).")
        private final Map<String, Integer> iterations;

        @ConstructorProperties(value={"iterations"})
        @Generated
        Output(Map<String, Integer> iterations) {
            this.iterations = iterations;
        }

        @Generated
        public static OutputBuilder builder() {
            return new OutputBuilder();
        }

        @Generated
        public Map<String, Integer> getIterations() {
            return this.iterations;
        }

        @Generated
        public static class OutputBuilder {
            @Generated
            private Map<String, Integer> iterations;

            @Generated
            OutputBuilder() {
            }

            @Generated
            public OutputBuilder iterations(Map<String, Integer> iterations) {
                this.iterations = iterations;
                return this;
            }

            @Generated
            public Output build() {
                return new Output(this.iterations);
            }

            @Generated
            public String toString() {
                return "ForEachItem.Output.OutputBuilder(iterations=" + this.iterations + ")";
            }
        }
    }
}

