/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.server;

import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.checkpointing.StateStore;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.driver.DefaultDriver;
import edu.iu.dsc.tws.api.driver.IDriver;
import edu.iu.dsc.tws.api.driver.IDriverMessenger;
import edu.iu.dsc.tws.api.driver.IScaler;
import edu.iu.dsc.tws.api.driver.IScalerPerCluster;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.api.net.StatusCode;
import edu.iu.dsc.tws.api.net.request.ConnectHandler;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.checkpointing.master.CheckpointManager;
import edu.iu.dsc.tws.checkpointing.util.CheckpointUtils;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingConfigurations;
import edu.iu.dsc.tws.common.net.tcp.Progress;
import edu.iu.dsc.tws.common.net.tcp.request.RRServer;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.IJobTerminator;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.dashclient.DashboardClient;
import edu.iu.dsc.tws.master.dashclient.models.JobState;
import edu.iu.dsc.tws.master.driver.DriverMessenger;
import edu.iu.dsc.tws.master.driver.Scaler;
import edu.iu.dsc.tws.master.driver.ZKJobUpdater;
import edu.iu.dsc.tws.master.server.BarrierHandler;
import edu.iu.dsc.tws.master.server.WorkerHandler;
import edu.iu.dsc.tws.master.server.WorkerMonitor;
import edu.iu.dsc.tws.master.server.ZKMasterController;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;

/**
 * Job master of a single job.
 *
 * <p>It starts an {@link RRServer} on the configured address/port, registers the
 * worker, barrier and driver message handlers on it, optionally starts a driver,
 * a ZooKeeper master controller, a checkpoint manager and a dashboard client, and
 * then keeps processing network events until {@link #completeJob(JobState)} is
 * called or the registered shutdown hook asks it to stop.
 */
public class JobMaster {
    private static final Logger LOG = Logger.getLogger(JobMaster.class.getName());

    /** Identifier this job master hands to its {@link RRServer}. */
    public static final int JOB_MASTER_ID = -10;

    /**
     * Event loop driving the server. It is static, so it is shared by (and
     * overwritten by the {@code init()} of) every JobMaster in the same JVM.
     */
    private static Progress looper;

    private final Config config;
    private final String jmAddress;
    private final int masterPort;
    private final JobAPI.Job job;
    private RRServer rrServer;
    private WorkerMonitor workerMonitor;

    /**
     * Set to true when the job is over. Written by whichever thread calls
     * {@link #completeJob(JobState)} and by the shutdown hook, read by the thread
     * running the event loop; hence volatile.
     */
    private volatile boolean jobCompleted = false;

    private final IJobTerminator jobTerminator;
    private final JobMasterAPI.NodeInfo nodeInfo;
    private final IScalerPerCluster clusterScaler;
    private IDriver driver;
    private final String dashboardHost;

    /** Null when no dashboard host is configured or job registration with it failed. */
    private DashboardClient dashClient;

    private WorkerHandler workerHandler;
    private BarrierHandler barrierHandler;

    /** Null unless ZooKeeper usage is enabled in the configuration. */
    private ZKMasterController zkMasterController;

    private boolean clearResourcesWhenKilled;

    /** Null unless checkpointing is enabled in the configuration. */
    private CheckpointManager checkpointManager;

    private final JobMasterAPI.JobMasterState initialState;

    /**
     * Creates a job master listening on an explicitly given port.
     *
     * @param config job configuration
     * @param jmAddress address the server is created with
     * @param port port the server is created with
     * @param jobTerminator invoked with the job id when the job ends; may be null
     * @param job the job description
     * @param nodeInfo node information sent to the dashboard on registration
     * @param clusterScaler cluster specific scaler handed to the driver's scaler
     * @param initialState state passed to the ZooKeeper master controller on initialization
     */
    public JobMaster(Config config, String jmAddress, int port, IJobTerminator jobTerminator, JobAPI.Job job,
                     JobMasterAPI.NodeInfo nodeInfo, IScalerPerCluster clusterScaler,
                     JobMasterAPI.JobMasterState initialState) {
        this.config = config;
        this.jmAddress = jmAddress;
        this.jobTerminator = jobTerminator;
        this.job = job;
        this.nodeInfo = nodeInfo;
        this.masterPort = port;
        this.clusterScaler = clusterScaler;
        this.initialState = initialState;
        this.dashboardHost = JobMasterContext.dashboardHost(config);
        if (this.dashboardHost == null) {
            LOG.warning("Dashboard host address is null. Not connecting to Dashboard");
            this.dashClient = null;
        } else {
            this.dashClient = new DashboardClient(
                this.dashboardHost, job.getJobId(), JobMasterContext.jmToDashboardConnections(config));
        }
    }

    /**
     * Creates a job master that takes its port from the configuration
     * ({@code JobMasterContext.jobMasterPort(config)}).
     */
    public JobMaster(Config config, String jmAddress, IJobTerminator jobTerminator, JobAPI.Job job,
                     JobMasterAPI.NodeInfo nodeInfo, IScalerPerCluster clusterScaler,
                     JobMasterAPI.JobMasterState initialState) {
        this(config, jmAddress, JobMasterContext.jobMasterPort(config), jobTerminator, job, nodeInfo,
            clusterScaler, initialState);
    }

    /**
     * Builds all server side components, registers the message handlers and starts the server.
     *
     * @throws Twister2Exception propagated from the ZooKeeper controller initialization
     */
    private void init() throws Twister2Exception {
        looper = new Progress();

        // Drop the dashboard client if the job can not be registered with the dashboard.
        if (this.dashClient != null && !this.dashClient.registerJob(this.job, this.nodeInfo)) {
            LOG.warning("Not using Dashboard since it can not register with it.");
            this.dashClient = null;
        }

        ServerConnectHandler connectHandler = new ServerConnectHandler();
        this.rrServer = new RRServer(
            this.config, this.jmAddress, this.masterPort, looper, JOB_MASTER_ID, connectHandler);

        // The driver has to exist before the worker monitor, which receives it in its constructor.
        this.initDriver();

        boolean faultTolerant = FaultToleranceContext.faultTolerant(this.config);
        this.workerMonitor = new WorkerMonitor(
            this, this.rrServer, this.dashClient, this.job, this.driver, faultTolerant);
        this.workerHandler = new WorkerHandler(
            this.workerMonitor, this.rrServer, ZKContext.isZooKeeperServerUsed(this.config));
        this.barrierHandler = new BarrierHandler(this.workerMonitor, this.rrServer);

        // Worker registration, state change and worker listing messages.
        this.rrServer.registerRequestHandler(JobMasterAPI.RegisterWorker.newBuilder(), this.workerHandler);
        this.rrServer.registerRequestHandler(JobMasterAPI.RegisterWorkerResponse.newBuilder(), this.workerHandler);
        this.rrServer.registerRequestHandler(JobMasterAPI.WorkerStateChange.newBuilder(), this.workerHandler);
        this.rrServer.registerRequestHandler(JobMasterAPI.WorkerStateChangeResponse.newBuilder(), this.workerHandler);
        this.rrServer.registerRequestHandler(JobMasterAPI.ListWorkersRequest.newBuilder(), this.workerHandler);
        this.rrServer.registerRequestHandler(JobMasterAPI.ListWorkersResponse.newBuilder(), this.workerHandler);

        // Barrier messages.
        this.rrServer.registerRequestHandler(JobMasterAPI.BarrierRequest.newBuilder(), this.barrierHandler);
        this.rrServer.registerRequestHandler(JobMasterAPI.BarrierResponse.newBuilder(), this.barrierHandler);

        // Scaling, driver and worker messages are handled by the worker monitor.
        this.rrServer.registerRequestHandler(JobMasterAPI.WorkersScaled.newBuilder(), this.workerMonitor);
        this.rrServer.registerRequestHandler(JobMasterAPI.DriverMessage.newBuilder(), this.workerMonitor);
        this.rrServer.registerRequestHandler(JobMasterAPI.WorkerMessage.newBuilder(), this.workerMonitor);
        this.rrServer.registerRequestHandler(JobMasterAPI.WorkerMessageResponse.newBuilder(), this.workerMonitor);
        this.rrServer.registerRequestHandler(JobMasterAPI.WorkersJoined.newBuilder(), this.workerMonitor);

        this.initZKMasterController(this.workerMonitor);

        if (CheckpointingConfigurations.isCheckpointingEnabled(this.config)) {
            StateStore stateStore = CheckpointUtils.getStateStore(this.config);
            stateStore.init(this.config, new String[]{this.job.getJobId()});
            this.checkpointManager = new CheckpointManager(this.rrServer, stateStore, this.job.getJobId());
            LOG.info("Checkpoint manager initialized");
            this.checkpointManager.init();
        }

        this.rrServer.start();
        looper.loop();
    }

    /**
     * Initializes the job master, starts the driver (if any) and runs the event
     * loop in a new daemon thread named "JM".
     *
     * @return the started job master thread
     * @throws Twister2Exception if initialization fails
     */
    public Thread startJobMasterThreaded() throws Twister2Exception {
        this.init();
        this.startDriverThread();
        Thread jmThread = new Thread() {
            @Override
            public void run() {
                JobMaster.this.startLooping();
            }
        };
        jmThread.setName("JM");
        jmThread.setDaemon(true);
        jmThread.start();
        return jmThread;
    }

    /**
     * Initializes the job master, starts the driver (if any) and runs the event
     * loop in the calling thread. Returns only after the job has completed and
     * the cleanup in the loop has run.
     *
     * @throws Twister2Exception if initialization fails
     */
    public void startJobMasterBlocking() throws Twister2Exception {
        this.init();
        this.startDriverThread();
        this.startLooping();
    }

    /**
     * Processes network events until the job is marked completed, then stops the
     * server, cleans up ZooKeeper, invokes the job terminator and closes the
     * dashboard client, in that order.
     */
    private void startLooping() {
        LOG.info("JobMaster [" + this.jmAddress + "] started and waiting worker messages on port: " + this.masterPort);
        while (!this.jobCompleted) {
            looper.loopBlocking();
        }

        this.rrServer.stopGraceFully(2000L);
        if (this.zkMasterController != null) {
            this.zkMasterController.close();
            this.deleteZKNodes();
        }
        if (this.jobTerminator != null) {
            this.jobTerminator.terminateJob(this.job.getJobId());
        }
        if (this.dashClient != null) {
            this.dashClient.close();
        }
    }

    /**
     * Selects the driver of the job. A {@link DefaultDriver} is used when the job
     * master runs in the client; a driver class named in the job, if any,
     * replaces it. The driver stays null when neither applies.
     *
     * @throws RuntimeException if the driver class named in the job can not be instantiated
     */
    private void initDriver() {
        if (JobMasterContext.jobMasterRunsInClient(this.config)) {
            this.driver = new DefaultDriver();
        }

        String driverClass = this.job.getDriverClassName();
        if (driverClass.isEmpty()) {
            return;
        }

        try {
            this.driver = (IDriver) ReflectionUtils.newInstance(driverClass);
            LOG.info("loaded driver class: " + driverClass);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.severe(String.format("failed to load the driver class %s", driverClass));
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the driver in its own thread named "driver". If all workers have
     * already joined by the time the thread is started, the worker monitor is
     * asked, after a short delay and on a separate thread, to inform the driver.
     *
     * @return the driver thread, or null when the job has no driver
     */
    public Thread startDriverThread() {
        if (this.driver == null) {
            return null;
        }

        Thread driverThread = new Thread() {
            @Override
            public void run() {
                ZKJobUpdater zkJobUpdater = new ZKJobUpdater(JobMaster.this.config);
                Scaler scaler = new Scaler(JobMaster.this.job, JobMaster.this.clusterScaler,
                    JobMaster.this.workerMonitor, zkJobUpdater);
                DriverMessenger driverMessenger = new DriverMessenger(JobMaster.this.workerMonitor);
                JobMaster.this.driver.execute(JobMaster.this.config, scaler, driverMessenger);
            }
        };
        driverThread.setName("driver");
        driverThread.start();

        if (this.workerMonitor.isAllJoined()) {
            try {
                // Brief pause after starting the driver thread before notifying the driver.
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt flag is deliberately not restored here; the caller
                // goes on to run the looper and the effect of a pending interrupt on it has not
                // been verified. The notification below is still sent.
                LOG.warning("Thread sleep interrupted.");
            }

            ExecutorService notifier = Executors.newSingleThreadExecutor();
            try {
                notifier.execute(new Runnable() {
                    @Override
                    public void run() {
                        JobMaster.this.workerMonitor.informDriverForAllJoined();
                    }
                });
            } finally {
                // Lets the submitted task finish, then releases the executor's worker thread.
                // Without this the non-daemon worker thread would linger after its only task.
                notifier.shutdown();
            }
        }
        return driverThread;
    }

    /**
     * Creates and initializes the ZooKeeper master controller when ZooKeeper
     * usage is enabled in the configuration; otherwise does nothing.
     *
     * @param wMonitor worker monitor handed to the controller
     * @throws Twister2Exception propagated from the controller initialization
     */
    private void initZKMasterController(WorkerMonitor wMonitor) throws Twister2Exception {
        if (ZKContext.isZooKeeperServerUsed(this.config)) {
            this.zkMasterController = new ZKMasterController(
                this.config, this.job.getJobId(), this.job.getNumberOfWorkers(), this.jmAddress, wMonitor);
            this.zkMasterController.initialize(this.initialState);
        }
    }

    /**
     * @return the ZooKeeper master controller, or null if ZooKeeper is not used
     *     or the job master has not been initialized yet
     */
    public ZKMasterController getZkMasterController() {
        return this.zkMasterController;
    }

    /**
     * @return the worker handler, or null if the job master has not been initialized yet
     */
    public WorkerHandler getWorkerHandler() {
        return this.workerHandler;
    }

    /**
     * Marks the job as completed and wakes up the event loop so that it can exit
     * and clean up. The final state is reported to the dashboard first, if a
     * dashboard client is in use.
     *
     * @param jobFinalState final state reported to the dashboard
     */
    public void completeJob(JobState jobFinalState) {
        if (this.dashClient != null) {
            this.dashClient.jobStateChange(jobFinalState);
        }
        this.jobCompleted = true;
        looper.wakeup();
    }

    /**
     * Registers a JVM shutdown hook that handles the job master being killed
     * before the job completed: it reports {@code KILLED} to the dashboard,
     * closes the ZooKeeper controller and deletes the job znodes, and, when
     * requested, stops the event loop and invokes the job terminator.
     *
     * @param clearJobResourcesOnKill whether the hook should also stop the loop and terminate the job
     */
    public void addShutdownHook(boolean clearJobResourcesOnKill) {
        this.clearResourcesWhenKilled = clearJobResourcesOnKill;
        Thread hookThread = new Thread() {
            @Override
            public void run() {
                // Nothing to do when the job already finished in the regular way.
                if (JobMaster.this.jobCompleted) {
                    return;
                }
                if (JobMaster.this.dashClient != null) {
                    JobMaster.this.dashClient.jobStateChange(JobState.KILLED);
                }
                if (JobMaster.this.zkMasterController != null) {
                    JobMaster.this.zkMasterController.close();
                    JobMaster.this.deleteZKNodes();
                }
                if (JobMaster.this.clearResourcesWhenKilled) {
                    // NOTE(review): once the loop thread observes jobCompleted it runs its own
                    // cleanup (ZooKeeper cleanup, terminateJob) as well, so these can run twice.
                    JobMaster.this.jobCompleted = true;
                    // The looper does not exist if the hook fires before the job master was started.
                    if (looper != null) {
                        looper.wakeup();
                    }
                    if (JobMaster.this.jobTerminator != null) {
                        JobMaster.this.jobTerminator.terminateJob(JobMaster.this.job.getJobId());
                    }
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(hookThread);
    }

    /**
     * Deletes the znodes of this job when ZooKeeper usage is enabled.
     *
     * @return the result of {@code ZKUtils.deleteJobZNodes}, or true when ZooKeeper is not used
     */
    private boolean deleteZKNodes() {
        if (!ZKContext.isZooKeeperServerUsed(this.config)) {
            return true;
        }
        CuratorFramework client = ZKUtils.connectToServer(ZKContext.serverAddresses(this.config));
        String rootPath = ZKContext.rootNode(this.config);
        boolean zkCleared = ZKUtils.deleteJobZNodes(client, rootPath, this.job.getJobId());
        ZKUtils.closeClient();
        return zkCleared;
    }

    /**
     * @return the driver of the job, or null if there is none or the job master
     *     has not been initialized yet
     */
    public IDriver getDriver() {
        return this.driver;
    }

    /**
     * Connection listener given to the {@link RRServer}. It only logs new
     * connections; errors and closed connections are ignored.
     */
    public class ServerConnectHandler
        implements ConnectHandler {
        public void onError(SocketChannel channel) {
            // intentionally empty
        }

        public void onConnect(SocketChannel channel, StatusCode status) {
            try {
                LOG.fine("Client connected from:" + channel.getRemoteAddress());
            } catch (IOException e) {
                LOG.log(Level.SEVERE, "Exception when getting RemoteAddress", e);
            }
        }

        public void onClose(SocketChannel channel) {
            // intentionally empty
        }
    }
}

