/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.server;

import edu.iu.dsc.tws.api.checkpointing.StateStore;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.driver.DefaultDriver;
import edu.iu.dsc.tws.api.driver.IDriver;
import edu.iu.dsc.tws.api.driver.IDriverMessenger;
import edu.iu.dsc.tws.api.driver.IScaler;
import edu.iu.dsc.tws.api.driver.IScalerPerCluster;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.faulttolerance.JobFaultListener;
import edu.iu.dsc.tws.api.net.StatusCode;
import edu.iu.dsc.tws.api.net.request.ConnectHandler;
import edu.iu.dsc.tws.checkpointing.master.CheckpointManager;
import edu.iu.dsc.tws.checkpointing.util.CheckpointUtils;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingContext;
import edu.iu.dsc.tws.common.net.tcp.Progress;
import edu.iu.dsc.tws.common.net.tcp.request.RRServer;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.common.zk.JobZNodeManager;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKPersStateManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.IJobTerminator;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.barrier.BarrierMonitor;
import edu.iu.dsc.tws.master.barrier.JMBarrierHandler;
import edu.iu.dsc.tws.master.barrier.ZKBarrierHandler;
import edu.iu.dsc.tws.master.dashclient.DashboardClient;
import edu.iu.dsc.tws.master.driver.DriverMessenger;
import edu.iu.dsc.tws.master.driver.Scaler;
import edu.iu.dsc.tws.master.server.JMWorkerHandler;
import edu.iu.dsc.tws.master.server.JobFailureWatcher;
import edu.iu.dsc.tws.master.server.WorkerMonitor;
import edu.iu.dsc.tws.master.server.ZKJobUpdater;
import edu.iu.dsc.tws.master.server.ZKMasterController;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;

public class JobMaster {
    private static final Logger LOG = Logger.getLogger(JobMaster.class.getName());
    public static final int JOB_MASTER_ID = -10;
    private static Progress looper;
    private Config config;
    private String jmAddress;
    private int masterPort;
    private JobAPI.Job job;
    private RRServer rrServer;
    private WorkerMonitor workerMonitor;
    private boolean jobEnded = false;
    private boolean jobMasterFailed = false;
    private JobAPI.JobState finalState;
    private final IJobTerminator jobTerminator;
    private JobMasterAPI.NodeInfo nodeInfo;
    private final IScalerPerCluster clusterScaler;
    private IDriver driver;
    private String dashboardHost;
    private DashboardClient dashClient;
    private JMWorkerHandler workerHandler;
    private BarrierMonitor barrierMonitor;
    private ZKJobUpdater zkJobUpdater;
    private ZKMasterController zkMasterController;
    private ZKBarrierHandler zkBarrierHandler;
    private boolean clearK8sResourcesWhenKilled;
    private CheckpointManager checkpointManager;
    private JobMasterAPI.JobMasterState initialState;
    private static final int MAX_BACK_LOG = 2048;

    public JobMaster(Config config, String jmAddress, int port, IJobTerminator jobTerminator, JobAPI.Job job, JobMasterAPI.NodeInfo nodeInfo, IScalerPerCluster clusterScaler, JobMasterAPI.JobMasterState initialState) {
        this.config = config;
        this.jmAddress = jmAddress;
        this.jobTerminator = Objects.requireNonNull(jobTerminator, "IJobTerminator can not be null. You may use NullTerminator.");
        this.job = job;
        this.nodeInfo = nodeInfo;
        this.masterPort = port;
        this.clusterScaler = Objects.requireNonNull(clusterScaler, "IScalerPerCluster can not be null. You may use NullScaler.");
        this.initialState = initialState;
        this.zkJobUpdater = new ZKJobUpdater(config, job.getJobId());
        this.dashboardHost = JobMasterContext.dashboardHost(config);
        if (this.dashboardHost == null) {
            LOG.warning("Dashboard host address is null. Not connecting to Dashboard");
            this.dashClient = null;
        } else {
            this.dashClient = new DashboardClient(this.dashboardHost, job.getJobId(), JobMasterContext.jmToDashboardConnections(config));
        }
    }

    public JobMaster(Config config, String jmAddress, IJobTerminator jobTerminator, JobAPI.Job job, JobMasterAPI.NodeInfo nodeInfo, IScalerPerCluster clusterScaler, JobMasterAPI.JobMasterState initialState) {
        this(config, jmAddress, JobMasterContext.jobMasterPort(config), jobTerminator, job, nodeInfo, clusterScaler, initialState);
    }

    private void init() throws Twister2Exception {
        boolean registered;
        looper = new Progress();
        if (this.dashClient != null && !(registered = this.dashClient.registerJob(this.job, this.nodeInfo))) {
            LOG.warning("Not using Dashboard since it can not register with it.");
            this.dashClient = null;
        }
        ServerConnectHandler connectHandler = new ServerConnectHandler();
        int backLog = Math.min(this.job.getNumberOfWorkers() / 2, 2048);
        this.rrServer = new RRServer(this.config, this.jmAddress, this.masterPort, looper, -10, (ConnectHandler)connectHandler, backLog);
        this.initDriver();
        JobFailureWatcher jobFailureWatcher = new JobFailureWatcher();
        this.workerMonitor = new WorkerMonitor(this, this.rrServer, this.dashClient, this.zkJobUpdater, this.job, this.driver, jobFailureWatcher);
        this.workerHandler = new JMWorkerHandler(this.workerMonitor, this.rrServer, ZKContext.isZooKeeperServerUsed((Config)this.config));
        if (!ZKContext.isZooKeeperServerUsed((Config)this.config)) {
            this.workerMonitor.setWorkerEventSender(this.workerHandler);
        }
        this.barrierMonitor = new BarrierMonitor(this.workerMonitor, jobFailureWatcher);
        if (ZKContext.isZooKeeperServerUsed((Config)this.config)) {
            this.zkBarrierHandler = new ZKBarrierHandler(this.barrierMonitor, this.config, this.job.getJobId(), this.job.getNumberOfWorkers());
            this.barrierMonitor.setBarrierResponder(this.zkBarrierHandler);
            this.zkBarrierHandler.initialize(this.initialState);
        } else {
            JMBarrierHandler jmBarrierHandler = new JMBarrierHandler(this.rrServer, this.barrierMonitor);
            this.barrierMonitor.setBarrierResponder(jmBarrierHandler);
        }
        jobFailureWatcher.addJobFaultListener(this.barrierMonitor);
        this.initZKMasterController(this.workerMonitor);
        if (CheckpointingContext.isCheckpointingEnabled((Config)this.config)) {
            StateStore stateStore = CheckpointUtils.getStateStore((Config)this.config);
            stateStore.init(this.config, new String[]{"checkpoint-manager"});
            this.checkpointManager = new CheckpointManager(this.rrServer, stateStore, this.job.getJobId());
            jobFailureWatcher.addJobFaultListener((JobFaultListener)this.checkpointManager);
            LOG.info("Checkpoint manager initialized");
            this.checkpointManager.init();
        }
        this.rrServer.start();
        looper.loop();
    }

    public Thread startJobMasterThreaded() throws Twister2Exception {
        this.init();
        this.startDriverThread();
        Thread jmThread = new Thread(){

            @Override
            public void run() {
                JobMaster.this.startLooping();
            }
        };
        jmThread.setName("JobMaster");
        jmThread.setDaemon(true);
        jmThread.start();
        return jmThread;
    }

    public void startJobMasterBlocking() throws Twister2Exception {
        this.init();
        this.startDriverThread();
        this.startLooping();
    }

    private void startLooping() {
        LOG.info("JobMaster [" + this.jmAddress + "] started and waiting worker messages on port: " + this.masterPort);
        while (!this.jobEnded && !this.jobMasterFailed) {
            looper.loopBlocking(300L);
            this.barrierMonitor.checkBarrierFailure();
        }
        if (this.jobMasterFailed) {
            return;
        }
        this.close();
    }

    private void close() {
        this.rrServer.stopGraceFully(2000L);
        if (ZKContext.isZooKeeperServerUsed((Config)this.config)) {
            if (this.jobEnded) {
                JobZNodeManager.createJobEndTimeZNode((CuratorFramework)ZKUtils.getClient(), (String)ZKContext.rootNode((Config)this.config), (String)this.job.getJobId());
                ZKPersStateManager.updateJobMasterStatus((CuratorFramework)ZKUtils.getClient(), (String)ZKContext.rootNode((Config)this.config), (String)this.job.getJobId(), (String)this.jmAddress, (JobMasterAPI.JobMasterState)JobMasterAPI.JobMasterState.JM_COMPLETED);
            } else if (this.jobMasterFailed) {
                ZKPersStateManager.updateJobMasterStatus((CuratorFramework)ZKUtils.getClient(), (String)ZKContext.rootNode((Config)this.config), (String)this.job.getJobId(), (String)this.jmAddress, (JobMasterAPI.JobMasterState)JobMasterAPI.JobMasterState.JM_FAILED);
            }
            this.zkMasterController.close();
            this.zkBarrierHandler.close();
            ZKUtils.closeClient();
        }
        if (this.jobEnded) {
            this.jobTerminator.terminateJob(this.job.getJobId(), this.finalState);
        }
        if (this.dashClient != null) {
            this.dashClient.close();
        }
    }

    private void initDriver() {
        if (JobMasterContext.jobMasterRunsInClient(this.config)) {
            this.driver = new DefaultDriver();
        }
        if (this.job.getDriverClassName().isEmpty()) {
            return;
        }
        String driverClass = this.job.getDriverClassName();
        try {
            Object object = ReflectionUtils.newInstance((String)driverClass);
            this.driver = (IDriver)object;
            LOG.info("loaded driver class: " + driverClass);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.severe(String.format("failed to load the driver class %s", driverClass));
            throw new RuntimeException(e);
        }
    }

    public Thread startDriverThread() {
        if (this.driver == null) {
            return null;
        }
        Thread driverThread = new Thread(){

            @Override
            public void run() {
                Scaler scaler = new Scaler(JobMaster.this.clusterScaler, JobMaster.this.workerMonitor, JobMaster.this.zkJobUpdater);
                DriverMessenger driverMessenger = new DriverMessenger(JobMaster.this.workerMonitor);
                JobMaster.this.driver.execute(JobMaster.this.config, (IScaler)scaler, (IDriverMessenger)driverMessenger);
            }
        };
        driverThread.setName("driver");
        driverThread.start();
        if (this.workerMonitor.isAllJoined()) {
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                LOG.warning("Thread sleep interrupted.");
            }
            new Thread("Twister2-AllJoinedSupplierToDriver"){

                @Override
                public void run() {
                    JobMaster.this.workerMonitor.informDriverForAllJoined();
                }
            }.start();
        }
        return driverThread;
    }

    private void initZKMasterController(WorkerMonitor wMonitor) throws Twister2Exception {
        if (ZKContext.isZooKeeperServerUsed((Config)this.config)) {
            this.zkMasterController = new ZKMasterController(this.config, this.job.getJobId(), this.job.getNumberOfWorkers(), this.jmAddress, wMonitor);
            this.workerMonitor.setWorkerEventSender(this.zkMasterController);
            this.zkMasterController.initialize(this.initialState);
        }
    }

    public ZKMasterController getZkMasterController() {
        return this.zkMasterController;
    }

    public JMWorkerHandler getWorkerHandler() {
        return this.workerHandler;
    }

    public void jmFailed() {
        this.jobMasterFailed = true;
        this.close();
    }

    public void endJob(JobAPI.JobState finalState1) {
        this.jobEnded = true;
        this.finalState = finalState1;
        looper.wakeup();
    }

    public void addShutdownHook(boolean clearK8sJobResourcesOnKill) {
        this.clearK8sResourcesWhenKilled = clearK8sJobResourcesOnKill;
        Thread hookThread = new Thread(){

            @Override
            public void run() {
                if (JobMaster.this.jobEnded || JobMaster.this.jobMasterFailed) {
                    return;
                }
                JobMaster.this.finalState = JobAPI.JobState.KILLED;
                if (ZKContext.isZooKeeperServerUsed((Config)JobMaster.this.config)) {
                    JobMaster.this.zkJobUpdater.updateState(JobMaster.this.finalState);
                    JobZNodeManager.createJobEndTimeZNode((CuratorFramework)ZKUtils.getClient(), (String)ZKContext.rootNode((Config)JobMaster.this.config), (String)JobMaster.this.job.getJobId());
                    ZKPersStateManager.updateJobMasterStatus((CuratorFramework)ZKUtils.getClient(), (String)ZKContext.rootNode((Config)JobMaster.this.config), (String)JobMaster.this.job.getJobId(), (String)JobMaster.this.jmAddress, (JobMasterAPI.JobMasterState)JobMasterAPI.JobMasterState.JM_KILLED);
                    JobMaster.this.zkMasterController.close();
                    JobMaster.this.zkBarrierHandler.close();
                    ZKUtils.closeClient();
                }
                if (JobMaster.this.dashClient != null) {
                    JobMaster.this.dashClient.jobStateChange(JobMaster.this.finalState);
                }
                JobMaster.this.jobEnded = true;
                looper.wakeup();
                if (JobMaster.this.clearK8sResourcesWhenKilled) {
                    JobMaster.this.jobTerminator.terminateJob(JobMaster.this.job.getJobId(), JobMaster.this.finalState);
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(hookThread);
    }

    public IDriver getDriver() {
        return this.driver;
    }

    public class ServerConnectHandler
    implements ConnectHandler {
        public void onError(SocketChannel channel, StatusCode status) {
            LOG.warning("Error on channel: " + channel.socket().getRemoteSocketAddress());
        }

        public void onConnect(SocketChannel channel) {
            LOG.fine("Client connected from: " + channel.socket().getRemoteSocketAddress());
        }

        public void onClose(SocketChannel channel) {
            LOG.fine("Client closed: " + channel.socket().getRemoteSocketAddress());
        }
    }
}

