/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.server.AbstractServiceLivenessCoordinator;
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceLivenessStore;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.server.ServiceType;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.kestra.jdbc.runner.JdbcExecutor;
import io.kestra.jdbc.runner.JdbcRunnerEnabled;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@JdbcRunnerEnabled
@Requires(property="kestra.server-type", pattern="(EXECUTOR|STANDALONE)")
public final class JdbcServiceLivenessCoordinator
extends AbstractServiceLivenessCoordinator {
    private static final Logger log = LoggerFactory.getLogger(JdbcServiceLivenessCoordinator.class);
    private final AtomicReference<JdbcExecutor> executor = new AtomicReference();
    private final AbstractJdbcServiceInstanceRepository serviceInstanceRepository;
    private final Duration purgeRetention;

    @Inject
    public JdbcServiceLivenessCoordinator(AbstractJdbcServiceInstanceRepository serviceInstanceRepository, ServiceRegistry serviceRegistry, ServerConfig serverConfig, @Value(value="${kestra.server.service.purge.retention}") Duration purgeRetention) {
        super((ServiceLivenessStore)serviceInstanceRepository, serviceRegistry, serverConfig);
        this.serviceInstanceRepository = serviceInstanceRepository;
        this.purgeRetention = purgeRetention;
    }

    protected void onSchedule(Instant now) throws Exception {
        if (this.executor.get() == null) {
            return;
        }
        super.onSchedule(now);
    }

    protected void handleAllWorkersForUncleanShutdown(Instant now) {
        this.serviceInstanceRepository.transaction(configuration -> {
            List<String> ids;
            List<ServiceInstance> nonRunningWorkers = this.serviceInstanceRepository.findAllNonRunningInstances(configuration, true).stream().filter(instance -> instance.is(ServiceType.WORKER)).toList();
            List uncleanShutdownWorkers = this.filterAllUncleanShutdownServices(nonRunningWorkers, now);
            if (!uncleanShutdownWorkers.isEmpty() && !(ids = uncleanShutdownWorkers.stream().filter(instance -> instance.config().workerTaskRestartStrategy().isRestartable()).map(ServiceInstance::uid).toList()).isEmpty()) {
                log.info("Trigger task restart for non-responding workers after termination grace period: {}.", ids);
                this.executor.get().reEmitWorkerJobsForWorkers(configuration, ids);
            }
            Stream<ServiceInstance> cleanShutdownWorkers = nonRunningWorkers.stream().filter(nonRunning -> nonRunning.is(Service.ServiceState.TERMINATED_GRACEFULLY));
            Stream.concat(cleanShutdownWorkers, uncleanShutdownWorkers.stream()).forEach(instance -> this.serviceInstanceRepository.mayTransitServiceTo(configuration, (ServiceInstance)instance, Service.ServiceState.NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING));
        });
    }

    protected void update(ServiceInstance instance, Service.ServiceState state, String reason) {
        this.serviceInstanceRepository.update(instance, state, reason);
    }

    protected void handleAllNonRespondingServices(Instant now) {
        this.serviceInstanceRepository.transaction(configuration -> {
            List<ServiceInstance> allRunningInstances = this.serviceInstanceRepository.findAllInstancesInStates(configuration, Service.ServiceState.allRunningStates(), true);
            List nonRespondingServices = this.filterAllNonRespondingServices(allRunningInstances, now);
            nonRespondingServices.forEach(instance -> this.serviceInstanceRepository.mayTransitServiceTo(configuration, (ServiceInstance)instance, Service.ServiceState.DISCONNECTED, DEFAULT_REASON_FOR_DISCONNECTED));
            List<String> workerIdsHavingTasksToRestart = nonRespondingServices.stream().filter(instance -> instance.is(ServiceType.WORKER)).filter(instance -> instance.config().workerTaskRestartStrategy().equals((Object)WorkerTaskRestartStrategy.IMMEDIATELY)).map(ServiceInstance::uid).toList();
            if (!workerIdsHavingTasksToRestart.isEmpty()) {
                log.info("Trigger task restart for non-responding workers after timeout: {}.", workerIdsHavingTasksToRestart);
                this.executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart);
            }
        });
    }

    @Scheduled(initialDelay="${kestra.server.service.purge.initial-delay}", fixedDelay="${kestra.server.service.purge.fixed-delay}")
    public void purgeEmptyInstances() {
        int purged = this.serviceInstanceRepository.purgeEmptyInstances(Instant.now().minus(this.purgeRetention));
        log.info("Purged {} service instances", (Object)purged);
    }

    synchronized void setExecutor(JdbcExecutor executor) {
        this.executor.set(executor);
    }

    @VisibleForTesting
    void setServerInstance(String serverId) {
        this.serverId = serverId;
    }
}

