/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.server;

import io.kestra.core.server.AbstractServiceLivenessTask;
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.ServerInstance;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceLivenessStore;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.server.ServiceType;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for the liveness task scheduled under the name
 * {@code "service-liveness-coordinator-task"}.
 *
 * <p>On every scheduled run (see {@link #onSchedule(Instant)}) it inspects the service
 * instances known to the {@link ServiceLivenessStore} and moves the ones that are past
 * their timeouts to the next state. Concrete subclasses supply the actual detection
 * of non-responding services, the handling of workers, and the state update itself.
 */
@Introspected
public abstract class AbstractServiceLivenessCoordinator
extends AbstractServiceLivenessTask {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractServiceLivenessCoordinator.class);

    /** Exclusive upper bound, in milliseconds, of the random jitter added to the schedule interval. */
    private static final int DEFAULT_SCHEDULE_JITTER_MAX_MS = 500;

    /** Reason text for a transition to DISCONNECTED; not referenced in this class (presumably used by subclasses). */
    protected static String DEFAULT_REASON_FOR_DISCONNECTED = "The service was detected as non-responsive after the session timeout. Service transitioned to the 'DISCONNECTED' state.";

    /** Reason text recorded when an instance is moved to NOT_RUNNING by this class. */
    protected static String DEFAULT_REASON_FOR_NOT_RUNNING = "The service was detected as non-responsive or terminated after termination grace period. Service transitioned to the 'NOT_RUNNING' state.";

    private static final String TASK_NAME = "service-liveness-coordinator-task";

    protected final ServiceLivenessStore store;
    protected final ServiceRegistry serviceRegistry;

    /** Identifier of the local server; instances hosted by it are never reported as non-responding. */
    protected String serverId = ServerInstance.INSTANCE_ID;

    /**
     * Creates the coordinator task.
     *
     * @param store           store queried for service instances by state.
     * @param serviceRegistry registry used to look up the local EXECUTOR service.
     * @param serverConfig    server configuration, forwarded to the parent task.
     */
    @Inject
    public AbstractServiceLivenessCoordinator(ServiceLivenessStore store, ServiceRegistry serviceRegistry, ServerConfig serverConfig) {
        super(TASK_NAME, serverConfig);
        this.serviceRegistry = serviceRegistry;
        this.store = store;
    }

    /**
     * Runs one coordination pass.
     *
     * <p>The pass is skipped entirely unless the registry holds an EXECUTOR service whose
     * instance is in the RUNNING state. Otherwise the handlers are invoked in a fixed order.
     *
     * @param now the instant used as the reference time for all timeout checks.
     * @throws Exception if one of the handlers fails.
     */
    @Override
    protected void onSchedule(Instant now) throws Exception {
        boolean executorRunning = Optional.ofNullable(this.serviceRegistry.get(ServiceType.EXECUTOR))
            .filter(service -> service.instance().is(Service.ServiceState.RUNNING))
            .isPresent();
        if (!executorRunning) {
            log.debug("The liveness coordinator task was temporarily disabled. Executor is not yet in the RUNNING state.");
            return;
        }
        // Order matters: each step may change states that a later step queries.
        this.handleAllNonRespondingServices(now);
        this.handleAllWorkersForUncleanShutdown(now);
        this.handleAllServicesForTerminatedStates(now);
        this.handleAllServiceInNotRunningState();
        this.maybeDetectAndLogNewConnectedServices();
    }

    /**
     * Handles all services considered non-responding at the given instant.
     *
     * @param var1 the reference instant of the current run.
     */
    protected abstract void handleAllNonRespondingServices(Instant var1);

    /**
     * Handles all worker services that did not shut down cleanly.
     *
     * @param var1 the reference instant of the current run.
     */
    protected abstract void handleAllWorkersForUncleanShutdown(Instant var1);

    /**
     * Transitions the given instance to a new state.
     *
     * @param var1 the service instance to update.
     * @param var2 the target state.
     * @param var3 the reason for the transition; may be {@code null}.
     */
    protected abstract void update(ServiceInstance var1, Service.ServiceState var2, String var3);

    /**
     * Returns the configured liveness interval plus a random jitter.
     *
     * <p>The jitter is uniformly drawn from {@code [0, DEFAULT_SCHEDULE_JITTER_MAX_MS)} milliseconds.
     *
     * @return the delay to use for the next schedule.
     */
    @Override
    protected Duration getScheduleInterval() {
        // ThreadLocalRandom avoids allocating and seeding a new generator on every call.
        int jitterMs = ThreadLocalRandom.current().nextInt(DEFAULT_SCHEDULE_JITTER_MAX_MS);
        return this.serverConfig.liveness().interval().plus(Duration.ofMillis(jitterMs));
    }

    /**
     * Selects the instances whose termination grace period has elapsed and that are either
     * disconnected/terminating or in the TERMINATED_FORCED state.
     *
     * <p>Disconnected/terminating instances come first in the result (a warning is logged for
     * each of them), followed by the TERMINATED_FORCED ones; each group keeps the input order.
     *
     * @param instances the candidate instances.
     * @param now       the reference instant for the grace-period check.
     * @return a new mutable list of the selected instances.
     */
    protected List<ServiceInstance> filterAllUncleanShutdownServices(List<ServiceInstance> instances, Instant now) {
        List<ServiceInstance> uncleanShutdownServices = new ArrayList<>();
        // First pass: still disconnected or terminating after the grace period.
        for (ServiceInstance instance : instances) {
            if (instance.state().isDisconnectedOrTerminating() && instance.isTerminationGracePeriodElapsed(now)) {
                maybeLogNonRespondingAfterTerminationGracePeriod(instance, now);
                uncleanShutdownServices.add(instance);
            }
        }
        // Second pass: forcibly terminated, once the grace period has elapsed.
        for (ServiceInstance instance : instances) {
            if (instance.is(Service.ServiceState.TERMINATED_FORCED) && instance.isTerminationGracePeriodElapsed(now)) {
                uncleanShutdownServices.add(instance);
            }
        }
        return uncleanShutdownServices;
    }

    /**
     * Selects the instances considered non-responding at the given instant and logs a warning
     * for each of them.
     *
     * @param instances the candidate instances.
     * @param now       the reference instant for the timeout checks.
     * @return an unmodifiable list of the non-responding instances, in input order.
     */
    protected List<ServiceInstance> filterAllNonRespondingServices(List<ServiceInstance> instances, Instant now) {
        List<ServiceInstance> nonResponding = instances.stream()
            .filter(instance -> this.isNonResponding(instance, now))
            .toList();
        for (ServiceInstance instance : nonResponding) {
            log.warn(
                "Detected non-responding service [id={}, type={}, hostname={}] after timeout ({}ms).",
                instance.uid(),
                instance.type(),
                instance.server().hostname(),
                now.toEpochMilli() - instance.updatedAt().toEpochMilli()
            );
        }
        return nonResponding;
    }

    /**
     * Tells whether an instance must be treated as non-responding: it has a configuration with
     * liveness enabled, its session timeout has elapsed, it is not hosted by the local server,
     * and it was created before {@code now} minus the configured initial delay.
     */
    private boolean isNonResponding(ServiceInstance instance, Instant now) {
        return Objects.nonNull(instance.config())
            && instance.config().liveness().enabled()
            && instance.isSessionTimeoutElapsed(now)
            && !instance.server().id().equals(this.serverId)
            && instance.createdAt().isBefore(now.minus(instance.config().liveness().initialDelay()));
    }

    /**
     * Moves every instance currently in the NOT_RUNNING state to INACTIVE, without a reason.
     */
    protected void handleAllServiceInNotRunningState() {
        for (ServiceInstance instance : this.store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))) {
            this.safelyUpdate(instance, Service.ServiceState.INACTIVE, null);
        }
    }

    /**
     * Moves to NOT_RUNNING every non-worker instance that is in the DISCONNECTED, TERMINATING,
     * TERMINATED_GRACEFULLY or TERMINATED_FORCED state and whose termination grace period has
     * elapsed.
     *
     * @param now the reference instant for the grace-period check.
     */
    protected void handleAllServicesForTerminatedStates(Instant now) {
        Set<Service.ServiceState> states = Set.of(
            Service.ServiceState.DISCONNECTED,
            Service.ServiceState.TERMINATING,
            Service.ServiceState.TERMINATED_GRACEFULLY,
            Service.ServiceState.TERMINATED_FORCED
        );
        for (ServiceInstance instance : this.store.findAllInstancesInStates(states)) {
            // Workers are excluded here; they have their own abstract handler.
            if (instance.is(ServiceType.WORKER) || !instance.isTerminationGracePeriodElapsed(now)) {
                continue;
            }
            maybeLogNonRespondingAfterTerminationGracePeriod(instance, now);
            this.safelyUpdate(instance, Service.ServiceState.NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING);
        }
    }

    /**
     * When debug logging is enabled, logs every CREATED or RUNNING instance that was created
     * after the last scheduled execution of this task. Does nothing otherwise.
     */
    protected void maybeDetectAndLogNewConnectedServices() {
        if (!log.isDebugEnabled()) {
            // Skip the store query entirely when the result would not be logged.
            return;
        }
        Set<Service.ServiceState> states = Set.of(Service.ServiceState.CREATED, Service.ServiceState.RUNNING);
        for (ServiceInstance instance : this.store.findAllInstancesInStates(states)) {
            if (instance.createdAt().isAfter(this.lastScheduledExecution())) {
                log.debug(
                    "Detected new service [id={}, type={}, hostname={}] (started at: {}).",
                    instance.uid(),
                    instance.type(),
                    instance.server().hostname(),
                    instance.createdAt()
                );
            }
        }
    }

    /**
     * Calls {@link #update(ServiceInstance, Service.ServiceState, String)} and logs, instead of
     * propagating, any exception it throws.
     *
     * @param instance the service instance to update.
     * @param state    the target state.
     * @param reason   the reason for the transition; may be {@code null}.
     */
    protected void safelyUpdate(ServiceInstance instance, Service.ServiceState state, String reason) {
        try {
            this.update(instance, state, reason);
        }
        catch (Exception e) {
            // The trailing exception argument is logged with its stack trace by SLF4J.
            log.error(
                "Unexpected error while service [id={}, type={}, hostname={}] transition from {} to {}.",
                instance.uid(),
                instance.type(),
                instance.server().hostname(),
                instance.state(),
                state,
                e
            );
        }
    }

    /**
     * Logs a warning when the given instance is still disconnected or terminating, reporting
     * the time elapsed since its last update.
     *
     * @param instance the service instance to check.
     * @param now      the reference instant used to compute the elapsed time.
     */
    protected static void maybeLogNonRespondingAfterTerminationGracePeriod(ServiceInstance instance, Instant now) {
        if (!instance.state().isDisconnectedOrTerminating()) {
            return;
        }
        log.warn(
            "Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).",
            instance.uid(),
            instance.type(),
            instance.server().hostname(),
            now.toEpochMilli() - instance.updatedAt().toEpochMilli()
        );
    }
}

