/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.server;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.server.AbstractServiceLivenessTask;
import io.kestra.core.server.LocalServiceState;
import io.kestra.core.server.LocalServiceStateFactory;
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.ServerInstance;
import io.kestra.core.server.ServerInstanceFactory;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceLivenessUpdater;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceStateTransition;
import io.kestra.core.server.ServiceType;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.runtime.event.annotation.EventListener;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Context
@Requires(beans={ServiceLivenessUpdater.class})
public class ServiceLivenessManager
extends AbstractServiceLivenessTask {
    private static final Logger log = LoggerFactory.getLogger(ServiceLivenessManager.class);
    private static final String TASK_NAME = "service-liveness-manager-task";
    private final LocalServiceStateFactory localServiceStateFactory;
    private final ServiceLivenessUpdater serviceLivenessUpdater;
    private final ReentrantLock stateLock = new ReentrantLock();
    protected final OnStateTransitionFailureCallback onStateTransitionFailureCallback;
    private final ServerInstanceFactory serverInstanceFactory;
    private final ServiceRegistry serviceRegistry;
    private Instant lastSucceedStateUpdated;

    @Inject
    public ServiceLivenessManager(ServerConfig configuration, ServiceRegistry serviceRegistry, LocalServiceStateFactory localServiceStateFactory, ServerInstanceFactory serverInstanceFactory, ServiceLivenessUpdater serviceLivenessUpdater) {
        this(configuration, serviceRegistry, localServiceStateFactory, serverInstanceFactory, serviceLivenessUpdater, new DefaultStateTransitionFailureCallback());
    }

    @VisibleForTesting
    public ServiceLivenessManager(ServerConfig configuration, ServiceRegistry serviceRegistry, LocalServiceStateFactory localServiceStateFactory, ServerInstanceFactory serverInstanceFactory, ServiceLivenessUpdater serviceLivenessUpdater, OnStateTransitionFailureCallback onStateTransitionFailureCallback) {
        super(TASK_NAME, configuration);
        this.serviceRegistry = serviceRegistry;
        this.localServiceStateFactory = localServiceStateFactory;
        this.serverInstanceFactory = serverInstanceFactory;
        this.serviceLivenessUpdater = serviceLivenessUpdater;
        this.onStateTransitionFailureCallback = onStateTransitionFailureCallback;
    }

    @EventListener
    public void onServiceStateChangeEvent(ServiceStateChangeEvent event) {
        Service.ServiceState newState = event.getService().getState();
        if (newState == null) {
            return;
        }
        LocalServiceState holder = this.serviceRegistry.get(event.getService().getType());
        if (holder != null && !holder.isStateUpdatable().get()) {
            ServiceInstance instance = holder.instance();
            log.debug("[Service id={}, type={}, hostname={}] Service state is not updatable. StateChangeEvent[{}] skipped.", new Object[]{instance.uid(), instance.type(), instance.server().hostname(), instance.state()});
            return;
        }
        switch (newState) {
            case CREATED: {
                this.onCreateState(event);
                break;
            }
            case RUNNING: 
            case TERMINATING: 
            case TERMINATED_GRACEFULLY: 
            case TERMINATED_FORCED: 
            case MAINTENANCE: {
                this.updateServiceInstanceState(Instant.now(), event.getService(), newState, OnStateTransitionFailureCallback.NOOP);
                break;
            }
            default: {
                log.warn("Unsupported service state: {}. Ignored.", (Object)newState);
            }
        }
    }

    private void onCreateState(ServiceStateChangeEvent event) {
        Service service = event.getService();
        LocalServiceState localServiceState = this.localServiceStateFactory.newLocalServiceState(service, event.properties());
        ServiceInstance instance = localServiceState.instance();
        this.serviceLivenessUpdater.update(instance);
        this.serviceRegistry.register(localServiceState.with(instance));
        if (log.isDebugEnabled()) {
            log.debug("[Service id={}, type='{}', hostname='{}'] Connected.", new Object[]{instance.uid(), instance.type(), instance.server().hostname()});
        }
    }

    @Override
    protected Duration getScheduleInterval() {
        return this.serverConfig.liveness().heartbeatInterval();
    }

    @Override
    protected void onSchedule(Instant now) {
        if (this.serviceRegistry.isEmpty()) {
            log.trace("No service registered yet. Skip service state update.");
            return;
        }
        this.serviceRegistry.all().stream().filter(localServiceState -> localServiceState.isStateUpdatable().get()).forEach(localServiceState -> {
            long start = System.currentTimeMillis();
            Service service = localServiceState.service();
            if (!this.beforeScheduledStateUpdate(now, service, localServiceState.instance())) {
                return;
            }
            ServiceInstance instance = this.updateServiceInstanceState(now, service, null, this.onStateTransitionFailureCallback);
            if (log.isTraceEnabled() && instance != null) {
                log.trace("[Service id={}, type={}, hostname='{}'] Completed scheduled state update: '{}' ({}ms).", new Object[]{instance.uid(), instance.type(), instance.server().hostname(), instance.state(), System.currentTimeMillis() - start});
            }
        });
    }

    private boolean beforeScheduledStateUpdate(Instant now, Service service, ServiceInstance instance) {
        if (this.isLivenessEnabled().booleanValue() && this.isWorkerServer() && this.isServerDisconnected(now)) {
            log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state before reaching timeout ({}ms). Disconnecting.", new Object[]{instance.uid(), instance.type(), instance.server().hostname(), this.getElapsedMilliSinceLastStateUpdate(now)});
            ServiceInstance updated = this.updateServiceInstanceState(now, service, Service.ServiceState.DISCONNECTED, OnStateTransitionFailureCallback.NOOP);
            if (updated != null) {
                this.onStateTransitionFailureCallback.execute(now, service, updated, true);
            }
            return false;
        }
        return true;
    }

    protected Instant lastSucceedStateUpdated() {
        return this.lastSucceedStateUpdated;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected ServiceInstance updateServiceInstanceState(Instant now, Service service, @Nullable Service.ServiceState newState, OnStateTransitionFailureCallback onStateChangeError) {
        ServiceInstance localInstance;
        LocalServiceState localServiceState = this.localServiceState(service);
        if (localServiceState == null) {
            return null;
        }
        if (newState != null && !(localInstance = localServiceState.instance()).state().isValidTransition(newState)) {
            log.warn("Failed to transition service [id={}, type={}, hostname={}] from {} to {}. Cause: {}.", new Object[]{localInstance.uid(), localInstance.type(), localInstance.server().hostname(), localInstance.state(), newState, "Invalid transition"});
            this.mayDisableStateUpdate(service, localInstance);
            return localInstance;
        }
        this.stateLock.lock();
        Runnable returnCallback = null;
        localServiceState = this.localServiceState(service);
        try {
            if (localServiceState == null) {
                ServiceInstance serviceInstance = null;
                return serviceInstance;
            }
            if (newState == null) {
                newState = localServiceState.instance().state();
            }
            ServiceInstance localInstance2 = localServiceState.instance().metrics(localServiceState.service().getMetrics()).server(this.serverInstanceFactory.newServerInstance());
            ServiceStateTransition.Response response = this.serviceLivenessUpdater.update(localInstance2, newState);
            ServiceInstance remoteInstance = response.instance();
            boolean isStateTransitionSucceed = response.is(ServiceStateTransition.Result.SUCCEEDED);
            if (response.is(ServiceStateTransition.Result.ABORTED)) {
                remoteInstance = localInstance2.state(newState, now);
                this.serviceLivenessUpdater.update(remoteInstance);
                isStateTransitionSucceed = true;
            }
            if (response.is(ServiceStateTransition.Result.FAILED)) {
                this.mayDisableStateUpdate(service, remoteInstance);
                ServiceInstance instance = remoteInstance;
                returnCallback = () -> {
                    Optional<ServiceInstance> result = onStateChangeError.execute(now, service, instance, this.isLivenessEnabled());
                    if (result.isPresent()) {
                        ServiceInstance recovered = result.get();
                        this.serviceLivenessUpdater.update(recovered);
                        this.serviceRegistry.register(this.localServiceState(service).with(recovered));
                        this.lastSucceedStateUpdated = now;
                    }
                };
            }
            if (isStateTransitionSucceed) {
                this.lastSucceedStateUpdated = now;
            }
            this.serviceRegistry.register(localServiceState.with(remoteInstance));
        }
        catch (Exception e) {
            ServiceInstance localInstance3 = localServiceState.instance();
            log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}", new Object[]{localInstance3.uid(), localInstance3.type(), localInstance3.server().hostname(), newState.name(), e.getMessage()});
        }
        finally {
            this.stateLock.unlock();
            if (returnCallback != null) {
                returnCallback.run();
            }
        }
        return Optional.ofNullable(this.localServiceState(service)).map(LocalServiceState::instance).orElse(null);
    }

    private void mayDisableStateUpdate(Service service, ServiceInstance instance) {
        Service.ServiceState actualState = instance.state();
        if (actualState.hasCompletedTermination()) {
            log.error("[Service id={}, type={}, hostname={}] Termination already completed ({}). This error may occur if the service has already been evicted by Kestra due to a prior error.", new Object[]{instance.uid(), instance.type(), instance.server().hostname(), actualState});
            this.localServiceState(service).isStateUpdatable().set(false);
        }
    }

    private LocalServiceState localServiceState(Service service) {
        return this.serviceRegistry.get(service.getType());
    }

    @VisibleForTesting
    public List<ServiceInstance> allServiceInstances() {
        return this.serviceRegistry.all().stream().map(LocalServiceState::instance).toList();
    }

    @VisibleForTesting
    public void updateServiceInstance(Service service, ServiceInstance instance) {
        this.serviceRegistry.register(new LocalServiceState(service, instance));
    }

    private boolean isWorkerServer() {
        return KestraContext.getContext().getServerType().equals((Object)ServerType.WORKER);
    }

    private boolean isServerDisconnected(Instant now) {
        long timeoutMilli = this.serverConfig.liveness().timeout().toMillis();
        return this.getElapsedMilliSinceLastSchedule(now) < timeoutMilli && this.getElapsedMilliSinceLastStateUpdate(now) > timeoutMilli;
    }

    private long getElapsedMilliSinceLastStateUpdate(Instant now) {
        return now.toEpochMilli() - (this.lastSucceedStateUpdated() != null ? this.lastSucceedStateUpdated().toEpochMilli() : now.toEpochMilli());
    }

    @Override
    @PreDestroy
    public void close() {
        List<LocalServiceState> states = this.serviceRegistry.all();
        for (LocalServiceState state : states) {
            Service service = state.service();
            try {
                Object unwrapped = service.unwrap();
                unwrapped.close();
                this.serviceRegistry.unregister(state);
            }
            catch (Exception e) {
                log.error("[Service id={}, type={}] Unexpected error on close", new Object[]{service.getId(), service.getType(), e});
            }
        }
        super.close();
    }

    public static final class DefaultStateTransitionFailureCallback
    implements OnStateTransitionFailureCallback {
        @Override
        public Optional<ServiceInstance> execute(Instant now, Service service, ServiceInstance instance, boolean isLivenessEnabled) {
            if (ServerInstance.Type.STANDALONE.equals((Object)instance.server().type()) || instance.is(ServiceType.INDEXER) || instance.is(ServiceType.WEBSERVER)) {
                return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
            }
            if (isLivenessEnabled || instance.is(Service.ServiceState.ERROR)) {
                log.error("[Service id={}, type={}, hostname='{}'] Terminating server.", new Object[]{instance.uid(), instance.type(), instance.server().hostname()});
                Service.ServiceState state = instance.state();
                if (state.equals((Object)Service.ServiceState.NOT_RUNNING) || state.equals((Object)Service.ServiceState.INACTIVE)) {
                    service.skipGracefulTermination(true);
                }
                KestraContext.getContext().shutdown();
                return Optional.empty();
            }
            log.warn("[Service id={}, type={}, hostname='{}'] Received unexpected state [{}] transition error [bug].", new Object[]{instance.uid(), instance.type(), instance.server().hostname(), instance.state()});
            return Optional.empty();
        }
    }

    @FunctionalInterface
    public static interface OnStateTransitionFailureCallback {
        public static final OnStateTransitionFailureCallback NOOP = (now, service, instance, isLivenessEnabled) -> Optional.empty();

        public Optional<ServiceInstance> execute(Instant var1, Service var2, ServiceInstance var3, boolean var4);
    }
}

