/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.schedulers;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduler that keeps persisted trigger state in step with the executions those triggers started.
 *
 * <p>On {@link #run()} it subscribes to the {@code executionQueue} and {@code triggerQueue} beans:
 * triggers that carry an execution id are remembered in {@link #watchingTrigger}, and every
 * execution that has a trigger is matched against that map so the trigger state can be either
 * reset (execution deleted or terminated) or refreshed (execution still in progress).
 */
@Singleton
public class DefaultScheduler extends AbstractScheduler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultScheduler.class);

    /**
     * Triggers currently tied to an execution, keyed by execution id. Written by the trigger
     * queue callback and read/removed by the execution queue callback.
     */
    private final Map<String, Trigger> watchingTrigger = new ConcurrentHashMap<>();
    private final ConditionService conditionService;
    private final FlowRepositoryInterface flowRepository;

    /**
     * Builds the scheduler and resolves its collaborators from the application context.
     *
     * @param applicationContext context used to look up {@link ConditionService},
     *                           {@link FlowRepositoryInterface} and, later, the queues
     * @param flowListeners      forwarded as-is to the {@link AbstractScheduler} constructor
     * @param triggerState       store used to persist trigger state changes
     */
    @Inject
    public DefaultScheduler(ApplicationContext applicationContext, FlowListenersInterface flowListeners, SchedulerTriggerStateInterface triggerState) {
        super(applicationContext, flowListeners);
        // triggerState and isReady are not declared in this class; they are inherited members.
        this.triggerState = triggerState;
        this.isReady = true;
        this.conditionService = applicationContext.getBean(ConditionService.class);
        this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
    }

    /**
     * Registers the execution and trigger queue consumers, then delegates to
     * {@link AbstractScheduler#run()}.
     *
     * <p>Messages that failed to deserialize are logged at error level and skipped.
     */
    @Override
    public void run() {
        // getBean(Class, Qualifier) can only hand back the raw QueueInterface; the element type
        // is implied by the named bean being requested, hence the narrowly scoped suppressions.
        @SuppressWarnings("unchecked")
        QueueInterface<Execution> executionQueue = this.applicationContext.getBean(QueueInterface.class, Qualifiers.byName("executionQueue"));
        @SuppressWarnings("unchecked")
        QueueInterface<Trigger> triggerQueue = this.applicationContext.getBean(QueueInterface.class, Qualifiers.byName("triggerQueue"));

        executionQueue.receive(either -> {
            if (either.isRight()) {
                log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
                return;
            }
            this.onExecution(either.getLeft());
        });

        triggerQueue.receive(either -> {
            if (either.isRight()) {
                log.error("Unable to deserialize a trigger: {}", either.getRight().getMessage());
                return;
            }
            this.onTrigger(either.getLeft());
        });

        super.run();
    }

    /**
     * Updates the stored trigger state for an execution received from the execution queue.
     * Executions without a trigger are ignored.
     */
    private void onExecution(Execution execution) {
        if (execution.getTrigger() == null) {
            return;
        }

        // The matching trigger is published on a separate queue, so it may not be in the map yet.
        // NOTE(review): presumably Await.until re-evaluates the supplier, pausing for the given
        // duration between attempts, until it yields a non-null value - confirm against Await,
        // including what happens if the trigger never shows up.
        Trigger trigger = Await.until(() -> this.watchingTrigger.get(execution.getId()), Duration.ofSeconds(5L));

        // NOTE(review): flow is null when the lookup finds nothing and is passed on as such;
        // confirm ConditionService.isTerminatedWithListeners accepts a null flow.
        Flow flow = this.flowRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()).orElse(null);

        if (execution.isDeleted() || this.conditionService.isTerminatedWithListeners(flow, execution)) {
            // The execution is over: detach it from the trigger and stop watching it.
            this.triggerState.save(trigger.resetExecution());
            this.watchingTrigger.remove(execution.getId());
        } else {
            // Still in progress: persist a trigger rebuilt from the execution, keeping the watched date.
            this.triggerState.save(Trigger.of(execution, trigger.getDate()));
        }
    }

    /**
     * Remembers a trigger received from the trigger queue when it is linked to an execution.
     * Null triggers and triggers without an execution id are ignored.
     */
    private void onTrigger(Trigger trigger) {
        if (trigger != null && trigger.getExecutionId() != null) {
            this.watchingTrigger.put(trigger.getExecutionId(), trigger);
        }
    }
}

