/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.services;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.core.utils.Rethrow;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import reactor.core.publisher.Flux;

/**
 * Integration tests for {@link ConcurrencyLimitService}, run against a started Kestra runner.
 *
 * <p>Every test loads or executes {@code flows/valids/flow-concurrency-queue.yml} under its own
 * tenant id, so the concurrency limits created by one test are not visible to the others.
 */
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ConcurrencyLimitServiceTest {
    private static final String TESTS_FLOW_NS = "io.kestra.tests";
    private static final String TENANT_ID = "main";
    private static final String CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT = "concurrency_limit_service_test_unqueue_execution_tenant";

    @Inject
    private RunnerUtils runnerUtils;

    @Inject
    @Named("executionQueue")
    private QueueInterface<Execution> executionQueue;

    @Inject
    private FlowRepositoryInterface flowRepositoryInterface;

    @Inject
    private ConcurrencyLimitService concurrencyLimitService;

    /**
     * Resets the running counter of every concurrency limit of the {@code main} tenant to zero.
     *
     * <p>NOTE(review): the tests in this class all use dedicated tenant ids rather than
     * {@code main}, so this only cleans up limits that belong to the default tenant — confirm
     * that this is the intended scope.
     */
    @AfterEach
    void tearDown() {
        this.concurrencyLimitService.find(TENANT_ID)
            .forEach(limit -> this.concurrencyLimitService.update(limit.withRunning(0)));
    }

    /**
     * Starts one execution until it is running and a second one until it is queued, then unqueues
     * the second one to {@code RUNNING}, re-emits it, and waits for both executions to terminate.
     *
     * @throws QueueException if emitting to or unqueuing from the execution queue fails
     * @throws TimeoutException if an execution does not reach the awaited state in time
     * @throws InterruptedException if the thread is interrupted while waiting for termination
     */
    @Test
    @LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT)
    void unqueueExecution() throws QueueException, TimeoutException, InterruptedException {
        Execution first = this.runnerUtils.runOneUntilRunning(CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT, TESTS_FLOW_NS, "flow-concurrency-queue");
        Execution result = this.runUntilQueued(CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT, TESTS_FLOW_NS, "flow-concurrency-queue");
        Assertions.assertThat(result.getState().isQueued()).isTrue();

        // One count per tracked execution: the latch opens once both have reported a terminated state.
        CountDownLatch terminated = new CountDownLatch(2);
        Flux<Execution> receive = TestsUtils.receive(this.executionQueue, either -> {
            Execution received = either.getLeft();
            boolean isTerminated = received.getState().isTerminated();
            if (received.getId().equals(first.getId()) && isTerminated) {
                terminated.countDown();
            }
            if (received.getId().equals(result.getId()) && isTerminated) {
                terminated.countDown();
            }
        });

        try {
            Execution unqueued = this.concurrencyLimitService.unqueue(result, State.Type.RUNNING);
            Assertions.assertThat(unqueued.getState().isRunning()).isTrue();

            // The unqueued execution has to be pushed back on the queue for the runner to pick it up.
            this.executionQueue.emit(unqueued);
            Assertions.assertThat(terminated.await(10L, TimeUnit.SECONDS)).isTrue();
        } finally {
            // Always complete the subscription, even when an assertion above fails, so that the
            // listener does not outlive this test (the test instance is shared: PER_CLASS).
            // Presumably blockLast() is what stops the underlying queue consumer — confirm
            // against TestsUtils.receive.
            receive.blockLast();
        }
    }

    /**
     * Checks that a concurrency limit can be looked up by tenant, namespace and flow id after the
     * flow has been executed, and that it carries the same identifiers as the execution.
     *
     * @param execution the execution injected by {@link ExecuteFlow}
     */
    @Test
    @ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_find_by_id_tenant")
    void findById(Execution execution) {
        Optional<ConcurrencyLimit> limit = this.concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());

        Assertions.assertThat(limit).isNotEmpty();
        ConcurrencyLimit found = limit.get();
        Assertions.assertThat(found.getTenantId()).isEqualTo(execution.getTenantId());
        Assertions.assertThat(found.getNamespace()).isEqualTo(execution.getNamespace());
        Assertions.assertThat(found.getFlowId()).isEqualTo(execution.getFlowId());
    }

    /**
     * Checks that an updated running counter is returned by a subsequent lookup.
     *
     * @param execution the execution injected by {@link ExecuteFlow}
     */
    @Test
    @ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_update_tenant")
    void update(Execution execution) {
        Optional<ConcurrencyLimit> limit = this.concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
        Assertions.assertThat(limit).isNotEmpty();

        ConcurrencyLimit updated = limit.get().withRunning(99);
        this.concurrencyLimitService.update(updated);

        // Read the limit back through the service to verify the stored value, not the local copy.
        limit = this.concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
        Assertions.assertThat(limit).isNotEmpty();
        Assertions.assertThat(limit.get().getRunning()).isEqualTo(99);
    }

    /**
     * Checks that listing the limits of the execution's tenant returns a non-empty list whose
     * first entry carries the same identifiers as the execution.
     *
     * @param execution the execution injected by {@link ExecuteFlow}
     */
    @Test
    @ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_list_tenant")
    void list(Execution execution) {
        List<ConcurrencyLimit> limits = this.concurrencyLimitService.find(execution.getTenantId());

        Assertions.assertThat(limits).isNotEmpty();
        ConcurrencyLimit head = limits.getFirst();
        Assertions.assertThat(head.getTenantId()).isEqualTo(execution.getTenantId());
        Assertions.assertThat(head.getNamespace()).isEqualTo(execution.getNamespace());
        Assertions.assertThat(head.getFlowId()).isEqualTo(execution.getFlowId());
    }

    /**
     * Same as {@link #runUntilQueued(String, String, String)} using the {@code main} tenant.
     *
     * <p>NOTE(review): not called by any test currently visible in this class.
     */
    private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
        return this.runUntilQueued(TENANT_ID, namespace, flowId);
    }

    /**
     * Creates and emits a new execution of the given flow and waits until it is {@code QUEUED}.
     */
    private Execution runUntilQueued(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
        return this.runUntilState(tenantId, namespace, flowId, State.Type.QUEUED);
    }

    /**
     * Creates a new execution of the given flow, emits it on the execution queue, and waits for an
     * execution with the same id whose current state is {@code state}.
     *
     * @param tenantId the tenant owning the flow
     * @param namespace the flow namespace
     * @param flowId the flow id
     * @param state the state to wait for
     * @return the execution as observed once it reached {@code state}
     * @throws TimeoutException presumably when the state is not reached within the one-second
     *     duration passed to {@code awaitExecution} — confirm against {@link RunnerUtils}
     * @throws QueueException if the execution cannot be emitted
     */
    private Execution runUntilState(String tenantId, String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
        Execution execution = this.createExecution(tenantId, namespace, flowId);
        return this.runnerUtils.awaitExecution(
            it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
            Rethrow.throwRunnable(() -> this.executionQueue.emit(execution)),
            Duration.ofSeconds(1L)
        );
    }

    /**
     * Builds a new, not yet emitted execution for the flow identified by the given coordinates.
     *
     * @throws java.util.NoSuchElementException if the flow is not present in the repository
     */
    private Execution createExecution(String tenantId, String namespace, String flowId) {
        Flow flow = (Flow) this.flowRepositoryInterface.findById(tenantId, namespace, flowId).orElseThrow();
        // The explicit FlowInterface cast keeps overload resolution unambiguous with the null argument.
        return Execution.newExecution((FlowInterface) flow, null);
    }
}

