/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.aurora;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.Context;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.common.config.ConfigLoader;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.NodeInfoUtils;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.bootstrap.ZKWorkerController;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Paths;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class AuroraWorkerStarter {
    public static final Logger LOG = Logger.getLogger(AuroraWorkerStarter.class.getName());
    private InetAddress workerAddress;
    private int workerPort;
    private String mesosTaskID;
    private Config config;
    private JobAPI.Job job;
    private ZKWorkerController zkWorkerController;

    private AuroraWorkerStarter() {
    }

    public static void main(String[] args) {
        IWorker worker;
        AuroraWorkerStarter workerStarter = AuroraWorkerStarter.createAuroraWorker();
        workerStarter.waitAndGetAllWorkers();
        String workerClass = SchedulerContext.workerClass((Config)workerStarter.config);
        try {
            Object object = ReflectionUtils.newInstance((String)workerClass);
            worker = (IWorker)object;
            LOG.info("loaded worker class: " + workerClass);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.log(Level.SEVERE, String.format("failed to load the worker class %s", workerClass), e);
            throw new RuntimeException(e);
        }
        worker.execute(workerStarter.config, workerStarter.zkWorkerController.getWorkerInfo().getWorkerID(), null, null, null);
        workerStarter.close();
    }

    public static AuroraWorkerStarter createAuroraWorker() {
        AuroraWorkerStarter workerStarter = new AuroraWorkerStarter();
        String hostname = System.getProperty("hostname");
        String portStr = System.getProperty("tcpPort");
        workerStarter.mesosTaskID = System.getProperty("taskID");
        try {
            workerStarter.workerAddress = InetAddress.getByName(hostname);
            workerStarter.workerPort = Integer.parseInt(portStr);
            LOG.log(Level.INFO, "worker IP: " + hostname + " workerPort: " + portStr);
            LOG.log(Level.INFO, "worker mesosTaskID: " + workerStarter.mesosTaskID);
        }
        catch (UnknownHostException e) {
            LOG.log(Level.SEVERE, "worker ip address is not valid: " + hostname, e);
            throw new RuntimeException(e);
        }
        workerStarter.readJobDescFile();
        AuroraWorkerStarter.logJobInfo(workerStarter.job);
        workerStarter.loadConfig();
        LOG.fine("Config from files: \n" + workerStarter.config.toString());
        workerStarter.overrideConfigsFromJob();
        workerStarter.initializeWithZooKeeper();
        return workerStarter;
    }

    private void readJobDescFile() {
        String jobDescFile = System.getProperty("job_desc_file");
        jobDescFile = "twister2-job/" + jobDescFile;
        this.job = JobUtils.readJobFile(null, jobDescFile);
        LOG.log(Level.INFO, "Job description file is read: " + jobDescFile);
    }

    public void loadConfig() {
        String twister2Home = Paths.get("", new String[0]).toAbsolutePath().toString();
        String clusterType = System.getProperty("cluster_type");
        String configDir = twister2Home + "/" + "twister2-job" + "/" + clusterType;
        LOG.log(Level.INFO, String.format("Loading configuration with twister2_home: %s and configuration: %s", twister2Home, configDir));
        Config conf = ConfigLoader.loadConfig((String)twister2Home, (String)"twister2-job", (String)clusterType);
        this.config = Config.newBuilder().putAll(conf).put(Context.TWISTER2_HOME.getKey(), (Object)twister2Home).put(Context.TWISTER2_CONF.getKey(), (Object)configDir).put("twister2.cluster.type", (Object)clusterType).build();
        LOG.log(Level.INFO, "Config files are read from directory: " + configDir);
    }

    public void overrideConfigsFromJob() {
        Config.Builder builder = Config.newBuilder().putAll(this.config);
        JobAPI.Config conf = this.job.getConfig();
        LOG.log(Level.INFO, "Number of configs to override from job conf: " + conf.getKvsCount());
        for (JobAPI.Config.KeyValue kv : conf.getKvsList()) {
            builder.put(kv.getKey(), (Object)kv.getValue());
            LOG.log(Level.INFO, "Overriden conf key-value pair: " + kv.getKey() + ": " + kv.getValue());
        }
        this.config = builder.build();
    }

    public void initializeWithZooKeeper() {
        long startTime = System.currentTimeMillis();
        String workerHostPort = this.workerAddress.getHostAddress() + ":" + this.workerPort;
        int numberOfWorkers = this.job.getNumberOfWorkers();
        JobMasterAPI.NodeInfo nodeInfo = NodeInfoUtils.createNodeInfo(null, null, null);
        this.zkWorkerController = new ZKWorkerController(this.config, this.job.getJobName(), workerHostPort, numberOfWorkers, nodeInfo, null);
        this.zkWorkerController.initialize();
        long duration = System.currentTimeMillis() - startTime;
        LOG.info("Initialization for the worker: " + this.zkWorkerController.getWorkerInfo() + " took: " + duration + "ms");
    }

    public void waitAndGetAllWorkers() {
        int numberOfWorkers = this.job.getNumberOfWorkers();
        LOG.info("Waiting for " + numberOfWorkers + " workers to join .........");
        long startTime = System.currentTimeMillis();
        List<JobMasterAPI.WorkerInfo> workerList = null;
        try {
            workerList = this.zkWorkerController.getAllWorkers();
        }
        catch (TimeoutException timeoutException) {
            LOG.log(Level.SEVERE, timeoutException.getMessage(), timeoutException);
            return;
        }
        long duration = System.currentTimeMillis() - startTime;
        if (workerList == null) {
            LOG.log(Level.SEVERE, "Could not get full worker list. timeout limit has been reached !!!!");
        } else {
            LOG.log(Level.INFO, "Waited " + duration + " ms for all workers to join.");
            LOG.info("list of all joined workers in the job: " + WorkerInfoUtils.workerListAsString(workerList));
        }
    }

    public void close() {
        this.zkWorkerController.close();
    }

    public static void logJobInfo(JobAPI.Job job) {
        StringBuffer sb = new StringBuffer("Job Details:");
        sb.append("\nJob name: " + job.getJobName());
        sb.append("\nJob file: " + job.getJobFormat().getJobFile());
        sb.append("\nnumber of workers: " + job.getNumberOfWorkers());
        sb.append("\nCPUs: " + job.getComputeResource(0).getCpu());
        sb.append("\nRAM: " + job.getComputeResource(0).getRamMegaBytes());
        sb.append("\nDisk: " + job.getComputeResource(0).getDiskGigaBytes());
        JobAPI.Config conf = job.getConfig();
        sb.append("\nnumber of key-values in job conf: " + conf.getKvsCount());
        for (JobAPI.Config.KeyValue kv : conf.getKvsList()) {
            sb.append("\n" + kv.getKey() + ": " + kv.getValue());
        }
        LOG.info(sb.toString());
    }
}

