/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.server;

import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.common.net.tcp.request.RRServer;
import edu.iu.dsc.tws.common.zk.WorkerWithState;
import edu.iu.dsc.tws.master.server.WorkerMonitor;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WorkerHandler
implements MessageHandler {
    private static final Logger LOG = Logger.getLogger(WorkerHandler.class.getName());
    private WorkerMonitor workerMonitor;
    private RRServer rrServer;
    private boolean zkUsed;
    private boolean allConnected = false;
    private List<RequestID> waitList;

    public WorkerHandler(WorkerMonitor workerMonitor, RRServer rrServer, boolean zkUsed) {
        this.workerMonitor = workerMonitor;
        this.rrServer = rrServer;
        this.zkUsed = zkUsed;
        this.waitList = new LinkedList<RequestID>();
    }

    public boolean isAllConnected() {
        return this.allConnected;
    }

    public void onMessage(RequestID id, int workerId, Message message) {
        if (message instanceof JobMasterAPI.RegisterWorker) {
            JobMasterAPI.RegisterWorker rwMessage = (JobMasterAPI.RegisterWorker)message;
            this.registerWorkerMessageReceived(id, rwMessage);
        } else if (message instanceof JobMasterAPI.WorkerStateChange) {
            JobMasterAPI.WorkerStateChange scMessage = (JobMasterAPI.WorkerStateChange)message;
            this.stateChangeMessageReceived(id, scMessage);
        } else if (message instanceof JobMasterAPI.ListWorkersRequest) {
            LOG.log(Level.FINE, "ListWorkersRequest received: " + message.toString());
            JobMasterAPI.ListWorkersRequest listMessage = (JobMasterAPI.ListWorkersRequest)message;
            this.listWorkersMessageReceived(id, listMessage);
        } else {
            LOG.log(Level.SEVERE, "Un-known message type received: " + message);
        }
    }

    private void registerWorkerMessageReceived(RequestID id, JobMasterAPI.RegisterWorker message) {
        this.handleAllConnected();
        if (this.zkUsed) {
            int wID = message.getWorkerInfo().getWorkerID();
            LOG.fine("Since ZooKeeper is used, ignoring RegisterWorker message for worker: " + wID);
            this.sendRegisterWorkerResponse(id, wID, true, null);
            this.workerMonitor.informDriverForAllJoined();
            return;
        }
        LOG.fine("RegisterWorker message received: \n" + message);
        JobMasterAPI.WorkerInfo workerInfo = message.getWorkerInfo();
        boolean initialAllJoined = this.workerMonitor.isAllJoined();
        WorkerWithState workerWithState = new WorkerWithState(workerInfo, message.getInitialState());
        if (message.getInitialState() == JobMasterAPI.WorkerState.RESTARTED) {
            String failMessage = this.workerMonitor.restarted(workerWithState);
            if (failMessage != null) {
                this.sendRegisterWorkerResponse(id, workerInfo.getWorkerID(), false, failMessage);
                return;
            }
        } else {
            String failMessage = this.workerMonitor.started(workerWithState);
            if (failMessage != null) {
                this.sendRegisterWorkerResponse(id, workerInfo.getWorkerID(), false, failMessage);
                return;
            }
        }
        this.sendRegisterWorkerResponse(id, workerInfo.getWorkerID(), true, null);
        if (!initialAllJoined && this.workerMonitor.isAllJoined()) {
            LOG.info("All workers joined the job. Worker IDs: " + this.workerMonitor.getWorkerIDs());
            this.sendListWorkersResponseToWaitList();
            this.sendWorkersJoinedMessage();
        }
    }

    private void stateChangeMessageReceived(RequestID id, JobMasterAPI.WorkerStateChange message) {
        if (!this.workerMonitor.existWorker(message.getWorkerID())) {
            LOG.warning("WorkerStateChange message received from a worker that has not joined the job yet.\nNot processing the message, just sending a response" + message);
            this.sendWorkerStateChangeResponse(id, message.getWorkerID(), message.getState());
            return;
        }
        if (message.getState() == JobMasterAPI.WorkerState.COMPLETED) {
            this.sendWorkerStateChangeResponse(id, message.getWorkerID(), message.getState());
            this.workerMonitor.completed(message.getWorkerID());
            return;
        }
        if (message.getState() == JobMasterAPI.WorkerState.FAILED) {
            LOG.warning("Worker [" + message.getWorkerID() + "] Failed. ");
            this.sendWorkerStateChangeResponse(id, message.getWorkerID(), message.getState());
            this.workerMonitor.failed(message.getWorkerID());
            return;
        }
        LOG.warning("Unrecognized WorkerStateChange message received. Ignoring and sending reply: \n" + message);
        this.sendWorkerStateChangeResponse(id, message.getWorkerID(), message.getState());
    }

    private void listWorkersMessageReceived(RequestID id, JobMasterAPI.ListWorkersRequest listMessage) {
        if (listMessage.getRequestType() == JobMasterAPI.ListWorkersRequest.RequestType.IMMEDIATE_RESPONSE) {
            this.sendListWorkersResponse(id);
            LOG.fine(String.format("Expecting %d workers, %d joined", this.workerMonitor.getNumberOfWorkers(), this.workerMonitor.getWorkersListSize()));
        } else if (listMessage.getRequestType() == JobMasterAPI.ListWorkersRequest.RequestType.RESPONSE_AFTER_ALL_JOINED) {
            if (this.workerMonitor.getWorkersListSize() == this.workerMonitor.getNumberOfWorkers()) {
                this.sendListWorkersResponse(id);
            } else {
                this.waitList.add(id);
            }
            LOG.log(Level.FINE, String.format("Expecting %d workers, %d joined", this.workerMonitor.getNumberOfWorkers(), this.workerMonitor.getWorkersListSize()));
        }
    }

    public void workersScaledDown(int instancesRemoved) {
        int change = 0 - instancesRemoved;
        JobMasterAPI.WorkersScaled scaledMessage = JobMasterAPI.WorkersScaled.newBuilder().setChange(change).setNumberOfWorkers(this.workerMonitor.getNumberOfWorkers()).build();
        for (int workerID : this.workerMonitor.getWorkerIDs()) {
            this.rrServer.sendMessage((Message)scaledMessage, workerID);
        }
    }

    public void workersScaledUp(int instancesAdded) {
        this.unsetAllConnected();
        if (this.zkUsed) {
            return;
        }
        JobMasterAPI.WorkersScaled scaledMessage = JobMasterAPI.WorkersScaled.newBuilder().setChange(instancesAdded).setNumberOfWorkers(this.workerMonitor.getNumberOfWorkers()).build();
        int numberOfWorkersBeforeScaling = this.workerMonitor.getNumberOfWorkers() - instancesAdded;
        for (int wID = 0; wID < numberOfWorkersBeforeScaling; ++wID) {
            this.rrServer.sendMessage((Message)scaledMessage, wID);
        }
    }

    private void sendListWorkersResponse(RequestID requestID) {
        List<JobMasterAPI.WorkerInfo> workerList = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.ListWorkersResponse response = JobMasterAPI.ListWorkersResponse.newBuilder().setNumberOfWorkers(workerList.size()).addAllWorker(workerList).build();
        this.rrServer.sendResponse(requestID, (Message)response);
        LOG.fine("ListWorkersResponse sent:\n" + response);
    }

    private void sendListWorkersResponseToWaitList() {
        List<JobMasterAPI.WorkerInfo> workerList = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.ListWorkersResponse response = JobMasterAPI.ListWorkersResponse.newBuilder().setNumberOfWorkers(workerList.size()).addAllWorker(workerList).build();
        for (RequestID requestID : this.waitList) {
            this.rrServer.sendResponse(requestID, (Message)response);
        }
        this.waitList.clear();
    }

    private void sendRegisterWorkerResponse(RequestID id, int workerID, boolean result, String reason) {
        JobMasterAPI.RegisterWorkerResponse response = JobMasterAPI.RegisterWorkerResponse.newBuilder().setWorkerID(workerID).setResult(result).setReason(reason == null ? "" : reason).build();
        this.rrServer.sendResponse(id, (Message)response);
        LOG.fine("RegisterWorkerResponse sent:\n" + response);
    }

    private void sendWorkerStateChangeResponse(RequestID id, int workerID, JobMasterAPI.WorkerState sentState) {
        JobMasterAPI.WorkerStateChangeResponse response = JobMasterAPI.WorkerStateChangeResponse.newBuilder().setWorkerID(workerID).setState(sentState).build();
        this.rrServer.sendResponse(id, (Message)response);
        LOG.fine("WorkerStateChangeResponse sent:\n" + response);
    }

    public void sendWorkersJoinedMessage() {
        LOG.info("Sending WorkersJoined messages ...");
        List<JobMasterAPI.WorkerInfo> workerInfoList = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.WorkersJoined joinedMessage = JobMasterAPI.WorkersJoined.newBuilder().setNumberOfWorkers(workerInfoList.size()).addAllWorker(workerInfoList).build();
        for (JobMasterAPI.WorkerInfo workerInfo : workerInfoList) {
            this.rrServer.sendMessage((Message)joinedMessage, workerInfo.getWorkerID());
        }
    }

    private boolean allWorkersConnected() {
        int numberOfWorkers = this.workerMonitor.getNumberOfWorkers();
        Set connectedWorkers = this.rrServer.getConnectedWorkers();
        return connectedWorkers.size() == numberOfWorkers && (Integer)Collections.max(connectedWorkers) == numberOfWorkers - 1;
    }

    private void handleAllConnected() {
        if (!this.allConnected && this.allWorkersConnected()) {
            this.allConnected = true;
        }
    }

    public void unsetAllConnected() {
        if (this.allWorkersConnected()) {
            return;
        }
        this.allConnected = false;
    }
}

