/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared scenario for {@link FlowListenersInterface} implementations.
 *
 * <p>Concrete subclasses provide the listener service under test and call
 * {@link #suite(FlowListenersInterface)}. The scenario creates, updates, deletes and re-creates
 * flows through the injected {@link FlowRepositoryInterface} and checks, after each step, how many
 * flows the listener service exposes for a tenant generated for this run.
 */
@KestraTest
public abstract class FlowListenersTest {
    private static final Logger LOG = LoggerFactory.getLogger(FlowListenersTest.class);

    /** Delay between two evaluations of an awaited condition. */
    private static final Duration POLL_INTERVAL = Duration.ofMillis(10L);

    /** Upper bound passed to {@link Await} for every awaited condition. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(5L);

    /** Class name of the listener implementation that gets an extra "still empty" check. */
    private static final String KAFKA_FLOW_LISTENERS = "io.kestra.ee.runner.kafka.KafkaFlowListeners";

    @Inject
    protected FlowRepositoryInterface flowRepository;

    /**
     * Builds a revision-1 flow in the {@code io.kestra.unittest} namespace holding a single
     * {@link Return} task, with its source populated.
     *
     * @param tenantId tenant the flow belongs to
     * @param flowId   id of the flow
     * @param taskId   id of the single {@link Return} task
     * @return the flow, rebuilt with {@code source} set to {@code sourceOrGenerateIfNull()}
     */
    protected static FlowWithSource create(String tenantId, String flowId, String taskId) {
        FlowWithSource flow = FlowWithSource.builder()
            .id(flowId)
            .namespace("io.kestra.unittest")
            .tenantId(tenantId)
            .revision(1)
            .tasks(Collections.singletonList(
                Return.builder()
                    .id(taskId)
                    .type(Return.class.getName())
                    .format(Property.ofValue("test"))
                    .build()
            ))
            .build();

        // The builder leaves the source unset, so fill it in before handing the flow out.
        return flow.toBuilder().source(flow.sourceOrGenerateIfNull()).build();
    }

    /**
     * Runs the create / update / delete / re-create scenario against the given listener service.
     *
     * @param flowListenersService the listener implementation under test
     * @throws TimeoutException declared by the awaits; presumably raised when an expected flow
     *                          count is not observed within the timeout (confirm against
     *                          {@link Await})
     */
    public void suite(FlowListenersInterface flowListenersService) throws TimeoutException {
        String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
        flowListenersService.run();

        // Refreshed on every listener callback with the number of flows visible for our tenant.
        AtomicInteger count = new AtomicInteger();
        flowListenersService.listen(flows -> count.set(this.getFlowsForTenant(flowListenersService, tenant).size()));

        // Initial state: nothing exists yet for the freshly generated tenant.
        LOG.info("-----------> wait for zero");
        this.awaitFlowCount(flowListenersService, tenant, count, 0);

        // The Kafka implementation gets a second check that the tenant is still empty.
        if (KAFKA_FLOW_LISTENERS.equals(flowListenersService.getClass().getName())) {
            LOG.info("-----------> wait for zero kafka");
            this.awaitFlowCount(flowListenersService, tenant, count, 0);
        }

        // Create a first flow.
        LOG.info("-----------> create first flow");
        FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
        FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
        this.flowRepository.create(GenericFlow.of(first));
        this.awaitFlowCount(flowListenersService, tenant, count, 1);

        // Update it: same flow id, so the number of flows must stay at one.
        // NOTE(review): count is already 1 at this point, so this wait returns on its first
        // evaluation and does not by itself prove that the update reached the listener.
        first = this.flowRepository.update(GenericFlow.of(firstUpdated), first);
        this.awaitFlowCount(flowListenersService, tenant, count, 1);

        // Create a second, distinct flow.
        FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
        this.flowRepository.create(GenericFlow.of(second));
        this.awaitFlowCount(flowListenersService, tenant, count, 2);

        // Delete the first flow; the value returned by the repository is not needed here.
        this.flowRepository.delete(first);
        this.awaitFlowCount(flowListenersService, tenant, count, 1);

        // Creating the deleted flow again must make it visible again.
        this.flowRepository.create(GenericFlow.of(first));
        this.awaitFlowCount(flowListenersService, tenant, count, 2);
    }

    /**
     * Returns the flows currently exposed by the listener service for one tenant.
     *
     * @param flowListenersService the listener service to query
     * @param tenantId             tenant to keep; must not be {@code null}
     * @return the flows whose tenant id equals {@code tenantId}
     */
    public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId) {
        return flowListenersService.flows().stream().filter(f -> tenantId.equals(f.getTenantId())).toList();
    }

    /**
     * Waits for the listener-maintained counter to reach {@code expected}, then asserts that the
     * listener service exposes exactly that many flows for the tenant.
     */
    private void awaitFlowCount(FlowListenersInterface flowListenersService, String tenantId, AtomicInteger count, int expected) throws TimeoutException {
        Await.until(() -> count.get() == expected, POLL_INTERVAL, AWAIT_TIMEOUT);
        // Asserting on the list rather than its size makes a failure print the offending flows.
        Assertions.assertThat(this.getFlowsForTenant(flowListenersService, tenantId)).hasSize(expected);
    }
}

