/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.rpc.grpc.services.impl.v4;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.genie.common.internal.dto.v4.files.JobFileState;
import com.netflix.genie.proto.BeginAcknowledgement;
import com.netflix.genie.proto.BeginSync;
import com.netflix.genie.proto.DataUpload;
import com.netflix.genie.proto.DeleteFile;
import com.netflix.genie.proto.JobDirectoryState;
import com.netflix.genie.proto.JobFileSyncServiceGrpc;
import com.netflix.genie.proto.ResetSync;
import com.netflix.genie.proto.SyncAcknowledgement;
import com.netflix.genie.proto.SyncComplete;
import com.netflix.genie.proto.SyncRequest;
import com.netflix.genie.proto.SyncRequestResult;
import com.netflix.genie.proto.SyncResponse;
import com.netflix.genie.web.properties.JobFileSyncRpcProperties;
import com.netflix.genie.web.services.JobFileService;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import net.devh.springboot.autoconfigure.grpc.server.GrpcService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.TaskScheduler;

@ConditionalOnProperty(value={"genie.grpc.server.enabled"}, havingValue="true")
@GrpcService(value=JobFileSyncServiceGrpc.class)
public class GRpcJobFileSyncServiceImpl
extends JobFileSyncServiceGrpc.JobFileSyncServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(GRpcJobFileSyncServiceImpl.class);
    private final JobFileSyncRpcProperties jobFileSyncRpcProperties;
    private final JobFileService jobFileService;
    private final ScheduledFuture<?> ackFuture;
    private final ConcurrentMap<String, JobFileSyncObserver> jobSyncRequestObservers = Maps.newConcurrentMap();

    public GRpcJobFileSyncServiceImpl(JobFileSyncRpcProperties jobFileSyncProperties, JobFileService jobFileService, TaskScheduler taskScheduler) {
        this.jobFileSyncRpcProperties = jobFileSyncProperties;
        this.jobFileService = jobFileService;
        this.ackFuture = taskScheduler.scheduleWithFixedDelay(this::executeObserverAcknowledgements, this.jobFileSyncRpcProperties.getAckIntervalMilliseconds());
    }

    public StreamObserver<SyncRequest> sync(StreamObserver<SyncResponse> responseObserver) {
        return new JobFileSyncObserverImpl(this.jobFileSyncRpcProperties, responseObserver, this.jobFileService, this::addJobFileSyncObserver, this::removeJobFileSyncObserver);
    }

    @PreDestroy
    public void cleanup() {
        if (this.ackFuture != null && !this.ackFuture.isDone()) {
            log.debug("Attempting to cancel the job file sync acknowledgement thread");
            if (!this.ackFuture.cancel(false)) {
                log.error("Unable to cancel the job file sync acknowledgement thread");
            } else {
                log.debug("Cancelled the job file sync acknowledgement thread");
            }
        }
    }

    public void onAgentDetached(String jobId) {
        JobFileSyncObserver observer = (JobFileSyncObserver)this.jobSyncRequestObservers.get(jobId);
        if (observer != null) {
            observer.cleanup();
        }
    }

    private void addJobFileSyncObserver(JobFileSyncObserver observer) {
        this.jobSyncRequestObservers.put(observer.getJobId().orElseThrow(() -> new IllegalArgumentException("Job id not yet set in observer")), observer);
    }

    private void removeJobFileSyncObserver(JobFileSyncObserver observer) {
        observer.getJobId().ifPresent(jobId -> {
            if (this.jobSyncRequestObservers.remove(jobId, observer)) {
                log.debug("Successfully removed observer with id {} for job {}", (Object)observer.getId(), jobId);
            } else {
                log.debug("Failed to remove observer with id {} for job {}", (Object)observer.getId(), jobId);
            }
        });
    }

    private void executeObserverAcknowledgements() {
        log.debug("Invoking job file sync request observers send acknowledgement methods");
        this.jobSyncRequestObservers.values().parallelStream().forEach(JobFileSyncObserver::sendSyncAckMessageIfNecessary);
    }

    private static class JobFileSyncObserverImpl
    implements StreamObserver<SyncRequest>,
    JobFileSyncObserver {
        private static final Logger log = LoggerFactory.getLogger(JobFileSyncObserverImpl.class);
        private final String id = UUID.randomUUID().toString();
        private final Object messagesLock = new Object();
        private final List<SyncRequestResult> requestResults = Lists.newArrayList();
        private final StreamObserver<SyncResponse> responseObserver;
        private final JobFileService jobFileService;
        private final Consumer<JobFileSyncObserver> jobIdPopulatedCallback;
        private final Consumer<JobFileSyncObserver> completionCallback;
        private final AtomicBoolean cleanedUp = new AtomicBoolean(false);
        private final int maxSyncMessages;
        private boolean waitingForBeginMessage = true;
        private boolean sentResetMessage;
        private String jobId;

        private JobFileSyncObserverImpl(JobFileSyncRpcProperties jobFileSyncRpcProperties, StreamObserver<SyncResponse> responseObserver, JobFileService jobFileService, Consumer<JobFileSyncObserver> jobIdPopulatedCallback, Consumer<JobFileSyncObserver> completionCallback) {
            this.responseObserver = responseObserver;
            this.jobFileService = jobFileService;
            this.jobIdPopulatedCallback = jobIdPopulatedCallback;
            this.completionCallback = completionCallback;
            this.maxSyncMessages = jobFileSyncRpcProperties.getMaxSyncMessages();
        }

        public void onNext(SyncRequest request) {
            try {
                if (request.hasBeginSync()) {
                    this.handleBeginSync(request.getBeginSync());
                } else if (request.hasDataUpload()) {
                    this.handleDataUpload(request.getDataUpload());
                } else if (request.hasDeleteFile()) {
                    this.handleDeleteFile(request.getDeleteFile());
                } else if (request.hasSyncComplete()) {
                    this.handleSyncComplete(request.getSyncComplete());
                } else {
                    log.error("Received unknown message type {}", (Object)request);
                }
            }
            catch (IOException e) {
                log.error("Error for upload request {}", (Object)request, (Object)e);
            }
        }

        public void onError(Throwable t) {
            this.cleanup();
        }

        public void onCompleted() {
            this.cleanup();
        }

        @Override
        public Optional<String> getJobId() {
            return Optional.ofNullable(this.jobId);
        }

        @Override
        public void cleanup() {
            if (!this.cleanedUp.getAndSet(true)) {
                log.debug("Cleaning up");
                this.completionCallback.accept(this);
                log.debug("Cleaned up");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void sendSyncAckMessageIfNecessary() {
            Object object = this.messagesLock;
            synchronized (object) {
                if (!this.requestResults.isEmpty()) {
                    log.debug("Sending sync acknowledgment for messages {}", this.requestResults);
                    this.responseObserver.onNext((Object)SyncResponse.newBuilder().setSyncAck(SyncAcknowledgement.newBuilder().addAllResults(this.requestResults).build()).build());
                    this.requestResults.clear();
                }
            }
        }

        private void handleBeginSync(BeginSync beginSync) throws IOException {
            if (this.waitingForBeginMessage) {
                this.jobId = beginSync.getJobId();
                if (StringUtils.isBlank((CharSequence)this.jobId)) {
                    throw new IllegalArgumentException("No job id provided to sync service. Unable to continue");
                }
                log.debug("Beginning to sync job files for job {}", (Object)this.jobId);
                this.jobIdPopulatedCallback.accept(this);
                boolean includeMd5 = false;
                Set<JobFileState> jobFileStates = this.jobFileService.getJobDirectoryFileState(this.jobId, false);
                this.waitingForBeginMessage = false;
                this.responseObserver.onNext((Object)SyncResponse.newBuilder().setBeginAck(BeginAcknowledgement.newBuilder().setServerDirectoryState(JobDirectoryState.newBuilder().setIncludesMd5(false).addAllFiles((Iterable)jobFileStates.stream().map(file -> com.netflix.genie.proto.JobFileState.newBuilder().setPath(file.getPath()).setSize(file.getSize()).build()).collect(Collectors.toSet())).build()).build()).build());
            } else {
                log.warn("Received a {} message after one had already been received", (Object)BeginSync.class.getCanonicalName());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleDataUpload(DataUpload dataUpload) {
            String messageId = dataUpload.getId();
            log.debug("Received data upload message with id {} for job {}", (Object)messageId, (Object)this.jobId);
            if (this.waitingForBeginMessage) {
                log.debug("Haven't received a {} message. Ignoring data upload message {}.", (Object)BeginSync.class.getCanonicalName(), (Object)messageId);
                this.sendResetMessageIfNecessary();
            } else {
                try {
                    this.jobFileService.updateFile(this.jobId, dataUpload.getPath(), dataUpload.getStartByte(), dataUpload.getData().toByteArray());
                    Object object = this.messagesLock;
                    synchronized (object) {
                        this.requestResults.add(this.createRequestResult(messageId, true));
                    }
                }
                catch (Exception e) {
                    log.error("Unable to save data for job {} from message {} due to {}", new Object[]{this.jobId, messageId, e.getMessage(), e});
                    Object object = this.messagesLock;
                    synchronized (object) {
                        this.requestResults.add(this.createRequestResult(messageId, false));
                    }
                }
                this.checkIfShouldSendAck();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleDeleteFile(DeleteFile deleteFile) {
            String messageId = deleteFile.getId();
            log.debug("Received file delete message {} in job {}", (Object)messageId, (Object)this.jobId);
            if (this.waitingForBeginMessage) {
                log.debug("Haven't received a {} message. Ignoring file delete message {}.", (Object)BeginSync.class.getCanonicalName(), (Object)messageId);
                this.sendResetMessageIfNecessary();
            } else {
                try {
                    this.jobFileService.deleteJobFile(this.jobId, deleteFile.getPath());
                    Object object = this.messagesLock;
                    synchronized (object) {
                        this.requestResults.add(this.createRequestResult(messageId, true));
                    }
                }
                catch (Exception e) {
                    log.error("Deleting {} for job {} failed due to {}", new Object[]{deleteFile.getPath(), this.jobId, e.getMessage(), e});
                    Object object = this.messagesLock;
                    synchronized (object) {
                        this.requestResults.add(this.createRequestResult(messageId, false));
                    }
                }
            }
            this.checkIfShouldSendAck();
        }

        private void handleSyncComplete(SyncComplete syncComplete) throws IOException {
            if (this.waitingForBeginMessage) {
                log.debug("Haven't received a {} message. Ignoring file sync complete message {}.", (Object)BeginSync.class.getCanonicalName(), (Object)syncComplete);
                this.sendResetMessageIfNecessary();
            } else {
                log.debug("Job file synchronization from agent for job {} is complete.", (Object)this.jobId);
                this.sendSyncAckMessageIfNecessary();
                this.cleanup();
                JobDirectoryState agentJobDirectoryState = syncComplete.getFinalAgentDirectoryState();
                boolean includeMd5 = agentJobDirectoryState.getIncludesMd5();
                Set agentJobFileStates = agentJobDirectoryState.getFilesList().stream().map(jobFileState -> new JobFileState(jobFileState.getPath(), jobFileState.getSize(), includeMd5 ? jobFileState.getMd5() : null)).collect(Collectors.toSet());
                Set<JobFileState> serverJobFileStates = this.jobFileService.getJobDirectoryFileState(this.jobId, includeMd5);
                if (!agentJobFileStates.equals(serverJobFileStates)) {
                    log.warn("After the agent finished syncing job files for job {} the state of the files on the server {} is different than the supplied state of the files on the agent {}", new Object[]{this.jobId, serverJobFileStates, agentJobFileStates});
                }
            }
        }

        private void sendResetMessageIfNecessary() {
            if (!this.sentResetMessage) {
                log.debug("Sending job file sync reset message to agent");
                this.responseObserver.onNext((Object)SyncResponse.newBuilder().setReset(ResetSync.newBuilder().build()).build());
                this.sentResetMessage = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void checkIfShouldSendAck() {
            boolean sendAck = false;
            Object object = this.messagesLock;
            synchronized (object) {
                if (this.requestResults.size() == this.maxSyncMessages) {
                    sendAck = true;
                }
            }
            if (sendAck) {
                this.sendSyncAckMessageIfNecessary();
            }
        }

        private SyncRequestResult createRequestResult(String messageId, boolean successful) {
            return SyncRequestResult.newBuilder().setId(messageId).setSuccessful(successful).build();
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof JobFileSyncObserverImpl)) {
                return false;
            }
            JobFileSyncObserverImpl other = (JobFileSyncObserverImpl)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$id = this.id;
            String other$id = other.id;
            return !(this$id == null ? other$id != null : !this$id.equals(other$id));
        }

        protected boolean canEqual(Object other) {
            return other instanceof JobFileSyncObserverImpl;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $id = this.id;
            result = result * 59 + ($id == null ? 43 : $id.hashCode());
            return result;
        }

        @Override
        public String getId() {
            return this.id;
        }
    }

    static interface JobFileSyncObserver {
        public String getId();

        public Optional<String> getJobId();

        public void cleanup();

        public void sendSyncAckMessageIfNecessary();
    }
}

