/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import reactor.core.publisher.Flux;

@Singleton
public class DeserializationIssuesCaseTest {
    private static final String INVALID_WORKER_TASK_KEY = "5PGRX6ve2cztrRSIbfGphO";
    private static final String INVALID_WORKER_TASK_VALUE = "{\n  \"task\": {\n    \"id\": \"invalid\",\n    \"type\": \"io.kestra.notfound.Invalid\"\n  },\n  \"type\": \"task\",\n  \"taskRun\": {\n    \"id\": \"5PGRX6ve2cztrRSIbfGphO\",\n    \"state\": {\n      \"current\": \"CREATED\",\n      \"duration\": 0.058459656,\n      \"histories\": [\n        {\n          \"date\": \"2023-11-28T10:16:22.324536603Z\",\n          \"state\": \"CREATED\"\n        }\n      ],\n      \"startDate\": \"2023-11-28T10:16:22.324536603Z\"\n    },\n    \"flowId\": \"hello-world\",\n    \"taskId\": \"hello\",\n    \"namespace\": \"company.team\",\n    \"executionId\": \"7IBX10Tg3ZzZuNUnLhoXcT\"\n  },\n  \"runContext\": {\n    \"variables\": {\n      \"envs\": {\n        \"plugins_path\": \"/home/loic/dev/kestra-plugins\"\n      },\n      \"flow\": {\n        \"id\": \"hello-world\",\n        \"revision\": 1,\n        \"namespace\": \"company.team\"\n      },\n      \"task\": {\n        \"id\": \"hello\",\n        \"type\": \"io.kestra.plugin.core.log.Log\"\n      },\n      \"taskrun\": {\n        \"id\": \"5PGRX6ve2cztrRSIbfGphO\",\n        \"startDate\": \"2023-11-28T10:16:22.324536603Z\",\n        \"attemptsCount\": 0\n      },\n      \"execution\": {\n        \"id\": \"7IBX10Tg3ZzZuNUnLhoXcT\",\n        \"startDate\": \"2023-11-28T10:16:21.648Z\",\n        \"originalId\": \"7IBX10Tg3ZzZuNUnLhoXcT\"\n      }\n    },\n    \"storageOutputPrefix\": \"///company/team/hello-world/executions/7IBX10Tg3ZzZuNUnLhoXcT/tasks/hello/5PGRX6ve2cztrRSIbfGphO\"\n  }\n}";
    private static final String INVALID_WORKER_TRIGGER_KEY = "dev_http-trigger_http";
    private static final String INVALID_WORKER_TRIGGER_VALUE = "{\n  \"type\": \"trigger\",\n  \"trigger\": {\n    \"id\": \"invalid\",\n    \"type\": \"io.kestra.notfound.Invalid\"\n  },\n  \"triggerContext\": {\n    \"date\": \"2023-11-24T15:48:57.632881597Z\",\n    \"flowId\": \"http-trigger\",\n    \"namespace\": \"dev\",\n    \"triggerId\": \"http\"\n  },\n  \"conditionContext\": {\n    \"flow\": {\n      \"id\": \"http-trigger\",\n      \"tasks\": [\n        {\n          \"id\": \"hello\",\n          \"type\": \"io.kestra.plugin.core.log.Log\",\n          \"message\": \"Kestra team wishes you a great day! \ud83d\udc4b\"\n        }\n      ],\n      \"deleted\": false,\n      \"disabled\": false,\n      \"revision\": 3,\n      \"triggers\": [\n        {\n          \"id\": \"invalid\",\n          \"type\": \"io.kestra.notfound.Invalid\"\n        }\n      ],\n      \"namespace\": \"dev\"\n    },\n    \"runContext\": {\n      \"variables\": {\n        \"envs\": {\n          \"plugins_path\": \"/home/loic/dev/kestra-plugins\"\n        },\n        \"flow\": {\n          \"id\": \"http-trigger\",\n          \"revision\": 3,\n          \"namespace\": \"dev\"\n        },\n        \"trigger\": {\n          \"id\": \"invalid\",\n          \"type\": \"io.kestra.notfound.Invalid\"\n        }\n      }\n    }\n  }\n}\n";
    private static final String INVALID_FLOW_KEY = "company.team_hello-world_2";
    private static final String INVALID_FLOW_VALUE = "{\n  \"id\": \"hello-world\",\n  \"tasks\": [\n    {\n      \"id\": \"invalid\",\n      \"type\": \"io.kestra.notfound.Invalid\"\n    }\n  ],\n  \"deleted\": false,\n  \"disabled\": false,\n  \"revision\": 2,\n  \"namespace\": \"company.team\"\n}\n";
    public static final String INVALID_SUBFLOW_EXECUTION_KEY = "1XKpihp8y2m3KEHR0hVEKN";
    public static final String INVALID_SUBFLOW_EXECUTION_VALUE = "{\n  \"execution\": {\n    \"id\": \"1XKpihp8y2m3KEHR0hVEKN\",\n    \"state\": {\n      \"current\": \"CREATED\",\n      \"duration\": 0.000201173,\n      \"histories\": [\n        {\n          \"date\": \"2024-01-10T13:48:32.752Z\",\n          \"state\": \"CREATED\"\n        }\n      ],\n      \"startDate\": \"2024-01-10T13:48:32.752Z\"\n    },\n    \"flowId\": \"hello-world\",\n    \"deleted\": false,\n    \"trigger\": {\n      \"id\": \"subflow\",\n      \"type\": \"io.kestra.notfound.Invalid\",\n      \"variables\": {\n        \"flowId\": \"subflox\",\n        \"namespace\": \"company.team\",\n        \"executionId\": \"4NzSyOQBYj1CxVg3bTghbZ\",\n        \"flowRevision\": 1\n      }\n    },\n    \"namespace\": \"company.team\",\n    \"originalId\": \"1XKpihp8y2m3KEHR0hVEKN\",\n    \"flowRevision\": 2\n  },\n  \"parentTask\": {\n    \"id\": \"subflow\",\n    \"type\": \"io.kestra.notfound.Invalid\"\n  },\n  \"parentTaskRun\": {\n    \"id\": \"6Gc6Dkk7medsWtg1WJfZpN\",\n    \"state\": {\n      \"current\": \"RUNNING\",\n      \"duration\": 0.039446974,\n      \"histories\": [\n        {\n          \"date\": \"2024-01-10T13:48:32.713Z\",\n          \"state\": \"CREATED\"\n        },\n        {\n          \"date\": \"2024-01-10T13:48:32.752Z\",\n          \"state\": \"RUNNING\"\n        }\n      ],\n      \"startDate\": \"2024-01-10T13:48:32.713Z\"\n    },\n    \"flowId\": \"subflox\",\n    \"taskId\": \"subflow\",\n    \"namespace\": \"company.team\",\n    \"executionId\": \"4NzSyOQBYj1CxVg3bTghbZ\"\n  }\n}\n";
    @Inject
    @Named(value="workerTaskResultQueue")
    protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    @Inject
    @Named(value="workerTriggerResultQueue")
    protected QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
    @Inject
    private FlowListenersInterface flowListeners;

    /**
     * Publishes a worker-job message whose task type cannot be resolved
     * ({@code io.kestra.notfound.Invalid} in {@link #INVALID_WORKER_TASK_VALUE}) and verifies that a
     * {@link WorkerTaskResult} is nevertheless emitted on {@code workerTaskResultQueue} with a terminated,
     * {@code FAILED} task run whose history starts with {@code CREATED} and has exactly two entries.
     *
     * @param sendToQueue callback that pushes the raw {@link QueueMessage} onto the queue under test
     * @throws TimeoutException declared by this method; presumably raised by {@code Await.until} when no
     *                          terminated result shows up within the one-minute limit passed to it
     * @throws QueueException   declared for queue-level failures
     */
    public void workerTaskDeserializationIssue(Consumer<QueueMessage> sendToQueue) throws TimeoutException, QueueException {
        // Holds the most recent result seen by the queue consumer.
        AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>();
        Flux<?> receive = TestsUtils.receive(this.workerTaskResultQueue, either -> {
            if (either != null) {
                workerTaskResult.set(either.getLeft());
            }
        });

        sendToQueue.accept(new QueueMessage(WorkerJob.class, INVALID_WORKER_TASK_KEY, INVALID_WORKER_TASK_VALUE));

        Await.until(
            () -> workerTaskResult.get() != null && workerTaskResult.get().getTaskRun().getState().isTerminated(),
            Duration.ofMillis(100L),
            Duration.ofMinutes(1L)
        );
        // Drain the receiving flux before asserting, exactly as the original sequence did.
        receive.blockLast();

        WorkerTaskResult result = workerTaskResult.get();
        State state = result.getTaskRun().getState();
        Assertions.assertThat(state.getHistories()).hasSize(2);
        Assertions.assertThat(state.getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
        Assertions.assertThat(state.getCurrent()).isEqualTo(State.Type.FAILED);
    }

    /**
     * Publishes a worker-job message whose trigger type cannot be resolved
     * ({@code io.kestra.notfound.Invalid} in {@link #INVALID_WORKER_TRIGGER_VALUE}) and waits until a
     * {@link WorkerTriggerResult} is received on {@code workerTriggerResultQueue}.
     * No assertion is made on the content of the result; receiving one is the success criterion.
     *
     * @param sendToQueue callback that pushes the raw {@link QueueMessage} onto the queue under test
     * @throws TimeoutException declared by this method; presumably raised by {@code Await.until} when no
     *                          result shows up within the one-minute limit passed to it
     * @throws QueueException   declared for queue-level failures
     */
    public void workerTriggerDeserializationIssue(Consumer<QueueMessage> sendToQueue) throws TimeoutException, QueueException {
        // Holds the most recent result seen by the queue consumer.
        AtomicReference<WorkerTriggerResult> workerTriggerResult = new AtomicReference<>();
        Flux<?> receive = TestsUtils.receive(this.workerTriggerResultQueue, either -> {
            if (either != null) {
                workerTriggerResult.set(either.getLeft());
            }
        });

        sendToQueue.accept(new QueueMessage(WorkerJob.class, INVALID_WORKER_TRIGGER_KEY, INVALID_WORKER_TRIGGER_VALUE));

        Await.until(() -> workerTriggerResult.get() != null, Duration.ofMillis(100L), Duration.ofMinutes(1L));
        // Drain the receiving flux once a result has been observed.
        receive.blockLast();
    }

    /**
     * Publishes a flow message containing a task of unknown type
     * ({@code io.kestra.notfound.Invalid} in {@link #INVALID_FLOW_VALUE}) and waits until the flow
     * listeners report a flow whose uid equals {@link #INVALID_FLOW_KEY}.
     *
     * @param sendToQueue callback that pushes the raw {@link QueueMessage} onto the queue under test
     * @throws Exception if waiting for the flow fails (for instance, presumably, on timeout)
     */
    public void flowDeserializationIssue(Consumer<QueueMessage> sendToQueue) throws Exception {
        // Latest snapshot of flows pushed by the listener.
        // NOTE(review): assumes the listener's element type implements FlowInterface — confirm.
        AtomicReference<List<? extends FlowInterface>> flows = new AtomicReference<>();
        this.flowListeners.listen(flows::set);

        sendToQueue.accept(new QueueMessage(FlowInterface.class, INVALID_FLOW_KEY, INVALID_FLOW_VALUE));

        Await.until(
            // Constant-first equals keeps the predicate safe should a flow report a null uid.
            () -> flows.get() != null && flows.get().stream().anyMatch(newFlow -> INVALID_FLOW_KEY.equals(newFlow.uid())),
            Duration.ofMillis(100L),
            Duration.ofMinutes(1L)
        );
    }

    /**
     * Raw queue payload handed to the {@code sendToQueue} callbacks of this class.
     *
     * @param type  class passed alongside the payload (e.g. {@code WorkerJob.class})
     * @param key   message key
     * @param value message body as a string (the callers in this class pass JSON documents)
     */
    public record QueueMessage(Class<?> type, String key, String value) { }
}

