/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.genie.common.exceptions.GenieServerException;
import com.netflix.genie.common.internal.dtos.JobStatus;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieInvalidStatusException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobNotFoundException;
import com.netflix.genie.proto.JobKillRegistrationRequest;
import com.netflix.genie.proto.JobKillRegistrationResponse;
import com.netflix.genie.proto.JobKillServiceGrpc;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import com.netflix.genie.web.data.services.DataServices;
import com.netflix.genie.web.data.services.PersistenceService;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.services.JobKillService;
import com.netflix.genie.web.services.RequestForwardingService;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * gRPC endpoint on which agents register to be notified of a job kill, combined with the
 * {@link JobKillService} implementation that delivers (or forwards) the kill.
 *
 * <p>Each registration parks the call's {@link StreamObserver}, keyed by job id, in an in-memory
 * map. A kill is signalled to the agent by emitting a single {@link JobKillRegistrationResponse}
 * on the parked observer and completing the stream.
 */
public class GRpcJobKillServiceImpl
extends JobKillServiceGrpc.JobKillServiceImplBase
implements JobKillService {
    private static final Logger log = LoggerFactory.getLogger(GRpcJobKillServiceImpl.class);

    /** Response observers parked per job id, waiting for a kill request to arrive. */
    private final Map<String, StreamObserver<JobKillRegistrationResponse>> parkedJobKillResponseObservers;
    private final PersistenceService persistenceService;
    private final AgentRoutingService agentRoutingService;
    private final RequestForwardingService requestForwardingService;

    /**
     * Constructor.
     *
     * @param dataServices             source of the {@link PersistenceService} used to read and update job status
     * @param agentRoutingService      used to determine where the agent for a job is connected
     * @param requestForwardingService used to forward a kill request to another host
     */
    public GRpcJobKillServiceImpl(DataServices dataServices, AgentRoutingService agentRoutingService, RequestForwardingService requestForwardingService) {
        this.persistenceService = dataServices.getPersistenceService();
        this.parkedJobKillResponseObservers = new ConcurrentHashMap<>();
        this.agentRoutingService = agentRoutingService;
        this.requestForwardingService = requestForwardingService;
    }

    /**
     * Parks the response observer of a kill-notification registration under the job id of the request.
     *
     * <p>If an observer was already parked for the same job id it is replaced, and the replaced
     * observer's stream is completed.
     *
     * @param request          registration request carrying the job id
     * @param responseObserver observer to park until a kill is requested for the job
     */
    public void registerForKillNotification(JobKillRegistrationRequest request, StreamObserver<JobKillRegistrationResponse> responseObserver) {
        final StreamObserver<JobKillRegistrationResponse> previousObserver = this.parkedJobKillResponseObservers.put(request.getJobId(), responseObserver);
        if (previousObserver != null) {
            // Only one parked observer per job: close the one that was just superseded.
            previousObserver.onCompleted();
        }
    }

    /**
     * Kills the given job, choosing the mechanism based on the job's current status.
     *
     * <ul>
     *   <li>finished: nothing to do;</li>
     *   <li>one of {@code JobStatus.getStatusesBeforeClaimed()}: the status is updated to
     *       {@link JobStatus#KILLED} directly;</li>
     *   <li>active with a local agent connection: the parked observer is notified, or, if none is
     *       parked, the status is force-updated to {@link JobStatus#KILLED};</li>
     *   <li>active with a non-local agent connection: the request is forwarded to the host the
     *       agent is connected to;</li>
     *   <li>anything else: rejected with a {@link GenieServerException}.</li>
     * </ul>
     *
     * <p>The method is annotated to be retried on {@link GenieInvalidStatusException} and
     * {@link GenieServerException} with a 1000 ms backoff delay.
     *
     * @param jobId   id of the job to kill
     * @param reason  reason recorded with the status update when the status is set directly
     * @param request originating HTTP request, passed along when the kill is forwarded; may be {@code null}
     * @throws GenieJobNotFoundException if the persistence service reports the job as not found
     * @throws GenieServerException      if the agent's host cannot be located or the status is unhandled
     */
    @Override
    @Retryable(value={GenieInvalidStatusException.class, GenieServerException.class}, backoff=@Backoff(delay=1000L))
    public void killJob(String jobId, String reason, @Nullable HttpServletRequest request) throws GenieJobNotFoundException, GenieServerException {
        final JobStatus currentJobStatus;
        try {
            currentJobStatus = this.persistenceService.getJobStatus(jobId);
        }
        catch (NotFoundException e) {
            throw new GenieJobNotFoundException(e);
        }

        if (currentJobStatus.isFinished()) {
            log.info("Job {} was already finished when the kill request arrived. Nothing to do.", jobId);
            return;
        }
        if (JobStatus.getStatusesBeforeClaimed().contains(currentJobStatus)) {
            this.killUnclaimedJob(jobId, currentJobStatus, reason);
            return;
        }
        if (!currentJobStatus.isActive()) {
            log.error("{} is an unhandled state for job {}", currentJobStatus, jobId);
            throw new GenieServerException("Job " + jobId + " is currently in " + currentJobStatus + " status, which isn't currently handled");
        }

        if (this.agentRoutingService.isAgentConnectionLocal(jobId)) {
            this.killLocallyConnectedJob(jobId, currentJobStatus, reason);
        } else {
            this.forwardKill(jobId, request);
        }
    }

    /**
     * Scheduled task (30 second fixed delay, 30 second initial delay) that drops parked observers
     * for jobs whose agent connection is not local to this instance, completing their streams
     * unless they are already cancelled.
     *
     * <p>A failure on one job id is logged and does not stop the sweep.
     */
    @Scheduled(fixedDelay=30000L, initialDelay=30000L)
    public void cleanupOrphanedObservers() {
        for (final String jobId : this.parkedJobKillResponseObservers.keySet()) {
            try {
                if (this.agentRoutingService.isAgentConnectionLocal(jobId)) {
                    continue;
                }
                final StreamObserver<JobKillRegistrationResponse> orphan = this.parkedJobKillResponseObservers.remove(jobId);
                this.cancelObserverIfNecessary(jobId, orphan);
            }
            catch (Exception unexpectedException) {
                log.error("Got unexpected exception while trying to cleanup jobID {}. Moving on. Exception: {}", jobId, unexpectedException);
            }
        }
    }

    /**
     * Reports whether the call behind the given observer has been cancelled.
     *
     * <p>The observer is cast to {@link ServerCallStreamObserver}; overridable so tests can
     * substitute the check.
     *
     * @param observer observer to inspect
     * @return {@code true} if the underlying call is cancelled
     */
    @VisibleForTesting
    protected boolean isStreamObserverCancelled(StreamObserver<JobKillRegistrationResponse> observer) {
        return ((ServerCallStreamObserver<JobKillRegistrationResponse>) observer).isCancelled();
    }

    /** Sets a not-yet-claimed job straight to KILLED; a missing job is rethrown as {@link GenieJobNotFoundException}. */
    private void killUnclaimedJob(String jobId, JobStatus currentJobStatus, String reason) {
        try {
            this.persistenceService.updateJobStatus(jobId, currentJobStatus, JobStatus.KILLED, reason);
        }
        catch (GenieInvalidStatusException e) {
            log.error("Unable to set job status for {} to {} due to current status not being expected {}", jobId, JobStatus.KILLED, currentJobStatus);
            throw e;
        }
        catch (NotFoundException e) {
            throw new GenieJobNotFoundException(e);
        }
    }

    /** Notifies the parked observer of a locally connected agent, or force-updates the status when none is parked. */
    private void killLocallyConnectedJob(String jobId, JobStatus currentJobStatus, String reason) {
        // remove() rather than get(): the observer is single-use once the kill has been signalled.
        final StreamObserver<JobKillRegistrationResponse> parkedObserver = this.parkedJobKillResponseObservers.remove(jobId);
        if (parkedObserver != null) {
            parkedObserver.onNext(JobKillRegistrationResponse.newBuilder().build());
            parkedObserver.onCompleted();
            log.info("Agent notified for killing job {}", jobId);
            return;
        }

        log.warn("Tried to kill Job {}, but expected local agent connection not found. Trying to force updating the job status to {}", jobId, JobStatus.KILLED);
        try {
            this.persistenceService.updateJobStatus(jobId, currentJobStatus, JobStatus.KILLED, reason);
            log.info("Succeeded to force updating the status of Job {} to {}", jobId, JobStatus.KILLED);
        }
        catch (GenieInvalidStatusException e) {
            log.error("Failed to force updating the status of Job {} to {} due to current status not being expected {}", jobId, JobStatus.KILLED, currentJobStatus);
            throw e;
        }
        catch (NotFoundException e) {
            log.error("Failed to force updating the status of Job {} to {} due to job not found", jobId, JobStatus.KILLED);
            throw new GenieJobNotFoundException(e);
        }
    }

    /** Forwards the kill request to the host the job's agent is connected to; fails if that host is unknown. */
    private void forwardKill(String jobId, @Nullable HttpServletRequest request) throws GenieServerException {
        final String hostname = this.agentRoutingService.getHostnameForAgentConnection(jobId).orElseThrow(() -> new GenieServerException("Unable to locate host where agent is connected for job " + jobId));
        log.info("Agent for job {} currently connected to {}. Attempting to forward kill request", jobId, hostname);
        this.requestForwardingService.kill(hostname, jobId, request);
    }

    /** Completes the observer's stream unless it is absent or already cancelled; completion failures are logged with the job id. */
    private void cancelObserverIfNecessary(String jobId, @Nullable StreamObserver<JobKillRegistrationResponse> observer) {
        if (observer == null || this.isStreamObserverCancelled(observer)) {
            return;
        }
        try {
            observer.onCompleted();
        }
        catch (Exception observerException) {
            log.error("Got exception while trying to complete streamObserver during cleanup for jobID {}. Exception: {}", jobId, observerException);
        }
    }

    /**
     * Exposes the live map of parked observers (not a copy).
     *
     * @return the parked observers keyed by job id
     */
    @VisibleForTesting
    Map<String, StreamObserver<JobKillRegistrationResponse>> getParkedJobKillResponseObservers() {
        return this.parkedJobKillResponseObservers;
    }
}

