/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.logger;

import com.google.gson.reflect.TypeToken;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.logger.WorkerLogger;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.FileUtils;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Call;

/**
 * Thread that watches the pods of one job and starts a {@link WorkerLogger} for every
 * container of every pod that reaches the "Running" phase.
 *
 * <p>The log files are written under {@code ~/.twister2/<jobId>}. The watch runs until
 * {@link #stopLogger()} is called, which happens automatically once
 * {@code numberOfWorkers + 1} loggers have reported completion through
 * {@link #workerLoggerCompleted(String)} (the extra one is presumably the job master
 * logger started for the {@code -jm-0} pod).
 *
 * <p>Threading: the watch loop runs on this thread, while
 * {@link #workerLoggerCompleted(String)} and {@link #stopLogger()} may be invoked from
 * other threads. Shared flags are therefore volatile and the list of loggers is only
 * touched while holding its own monitor.
 */
public class JobLogger extends Thread {
    private static final Logger LOG = Logger.getLogger(JobLogger.class.getName());

    /** Pod phase that triggers the creation of loggers. */
    private static final String RUNNING_PHASE = "Running";

    /** Pod name suffix that marks the job master pod. */
    private static final String JOB_MASTER_POD_SUFFIX = "-jm-0";

    private final JobAPI.Job job;
    private final String namespace;
    private String logsDir;
    private CoreV1Api v1Api;

    // Written by the watch thread, closed and cleared by whichever thread stops the logger.
    private volatile Watch<V1Pod> watcher;

    // Set by stopLogger() on any thread, polled by the watch loop.
    private volatile boolean stopLogger = false;

    // Guarded by synchronized (loggers).
    private final List<WorkerLogger> loggers;
    private final Set<String> completedLoggers;

    // Raised by the watch thread when a larger worker ID shows up; read in workerLoggerCompleted().
    private volatile int numberOfWorkers;

    /**
     * @param namespace Kubernetes namespace in which the pods of the job are listed
     * @param job job whose pods are watched; its worker count is the initial completion target
     */
    public JobLogger(String namespace, JobAPI.Job job) {
        this.namespace = namespace;
        this.job = job;
        this.numberOfWorkers = job.getNumberOfWorkers();
        this.loggers = new LinkedList<WorkerLogger>();
        this.completedLoggers = new ConcurrentSkipListSet<String>();
    }

    /**
     * Creates the API client and the job log directory (if missing), then blocks in the
     * pod watch loop until the logger is stopped or the watch ends.
     */
    @Override
    public void run() {
        this.v1Api = KubernetesController.createCoreV1Api();
        this.logsDir = System.getProperty("user.home") + "/.twister2/" + this.job.getJobId();
        if (!FileUtils.isDirectoryExists(this.logsDir)) {
            FileUtils.createDirectory(this.logsDir);
        }
        LOG.info("Job logs directory: " + this.logsDir);
        this.watchPodsToRunningStartLoggers();
    }

    /**
     * Records that the logger with the given ID has finished. When the number of completed
     * loggers equals {@code numberOfWorkers + 1}, the whole job logger is stopped.
     *
     * @param loggerID ID of the logger that completed
     */
    public synchronized void workerLoggerCompleted(String loggerID) {
        this.completedLoggers.add(loggerID);
        if (this.completedLoggers.size() == this.numberOfWorkers + 1) {
            this.stopLogger();
            LOG.info("All workers completed. Job has finished.");
        }
    }

    /**
     * Opens a watch on the pods carrying the job label and starts loggers for each pod
     * event that reports a running, non-terminating pod of this job.
     *
     * @throws RuntimeException if the watch cannot be created, or if iterating the watch
     *     fails while the logger has not been stopped
     */
    private void watchPodsToRunningStartLoggers() {
        String jobPodsLabel = KubernetesUtils.createJobPodsLabelWithKey(this.job.getJobId());
        Integer timeoutSeconds = Integer.MAX_VALUE;

        Watch<V1Pod> podWatch;
        try {
            podWatch = Watch.createWatch(
                (ApiClient)KubernetesController.getApiClient(),
                (Call)this.v1Api.listNamespacedPodCall(this.namespace, null, null, null, null, jobPodsLabel, null, null, timeoutSeconds, Boolean.TRUE, null),
                (Type)new TypeToken<Watch.Response<V1Pod>>(){}.getType());
        }
        catch (ApiException e) {
            String logMessage = "Exception when watching the pods to get the IPs: \nexCode: " + e.getCode() + "\nresponseBody: " + e.getResponseBody();
            LOG.log(Level.SEVERE, logMessage, e);
            throw new RuntimeException(e);
        }
        this.watcher = podWatch;

        try {
            // stopLogger() may have run before the watch existed; do not block on it then.
            if (this.stopLogger) {
                return;
            }
            // Iterate the local reference: the field can be cleared by another thread.
            for (Watch.Response<V1Pod> item : podWatch) {
                if (this.stopLogger) {
                    break;
                }
                V1Pod pod = item.object;
                if (this.isRunningJobPod(pod)) {
                    this.startLoggersForPod(pod);
                }
            }
        }
        catch (RuntimeException e) {
            // Closing the watch from stopLogger() makes the blocked iteration fail; that is expected.
            if (this.stopLogger) {
                LOG.fine("JobLogger is stopped.");
                return;
            }
            throw e;
        }
        finally {
            // Idempotent; also releases the watch when the exception is rethrown.
            this.closeWatcher();
        }
    }

    /**
     * Tells whether a watch event object is a pod of this job that is in the running phase
     * and is not being deleted. Objects with missing metadata, name or status are rejected
     * instead of causing a NullPointerException.
     */
    private boolean isRunningJobPod(V1Pod pod) {
        if (pod == null || pod.getMetadata() == null || pod.getStatus() == null) {
            return false;
        }
        String podName = pod.getMetadata().getName();
        return podName != null
            && podName.startsWith(this.job.getJobId())
            && RUNNING_PHASE.equals(pod.getStatus().getPhase())
            && pod.getMetadata().getDeletionTimestamp() == null;
    }

    /**
     * Starts one logger for the first container of the job master pod, or one logger per
     * container for any other pod. Grows {@code numberOfWorkers} when a worker ID at or
     * beyond the current count is seen.
     */
    private void startLoggersForPod(V1Pod pod) {
        if (pod.getSpec() == null) {
            return;
        }
        List<V1Container> containers = pod.getSpec().getContainers();
        if (containers == null || containers.isEmpty()) {
            return;
        }
        String podName = pod.getMetadata().getName();
        String podIP = pod.getStatus().getPodIP();

        if (podName.endsWith(JOB_MASTER_POD_SUFFIX)) {
            String contName = containers.get(0).getName();
            String id = "job-master-ip" + podIP;
            this.startWorkerLogger(new WorkerLogger(this.namespace, podName, contName, id, this.logsDir, this.v1Api, this));
            return;
        }

        for (V1Container container : containers) {
            int wID = K8sWorkerUtils.calculateWorkerID(this.job, podName, container.getName());
            if (wID >= this.numberOfWorkers) {
                this.numberOfWorkers = wID + 1;
            }
            String id = "worker" + wID + "-ip" + podIP;
            this.startWorkerLogger(new WorkerLogger(this.namespace, podName, container.getName(), id, this.logsDir, this.v1Api, this));
        }
    }

    /**
     * Starts the given logger unless an equal logger is still alive. Equal loggers that are
     * no longer alive are dropped from the list so that they cannot hide a live one on a
     * later start event.
     */
    private void startWorkerLogger(WorkerLogger workerLogger) {
        synchronized (this.loggers) {
            for (WorkerLogger existing : this.loggers) {
                if (workerLogger.equals(existing) && existing.isAlive()) {
                    LOG.info("Ignoring " + workerLogger.getID() + " start event for logging, since a logger for that worker is already running.");
                    return;
                }
            }
            // Every equal entry left at this point is dead; replace it with the new logger.
            this.loggers.removeIf(existing -> workerLogger.equals(existing));

            // Clear a previous completion BEFORE starting, so that a logger finishing right
            // away cannot have its fresh completion record erased.
            this.completedLoggers.removeIf(loggerID -> loggerID.equals(workerLogger.getID()));
            workerLogger.start();
            this.loggers.add(workerLogger);
        }
    }

    /**
     * Closes the pod watch if one is open and clears the reference. Synchronized so that
     * concurrent callers cannot race between the null check and the close.
     */
    private synchronized void closeWatcher() {
        Watch<V1Pod> openWatch = this.watcher;
        if (openWatch == null) {
            return;
        }
        this.watcher = null;
        try {
            openWatch.close();
        }
        catch (IOException e) {
            LOG.log(Level.SEVERE, "Exception closing watcher.", e);
        }
    }

    /**
     * Stops the watch loop and asks every logger that is still alive to stop logging.
     * May be called from any thread.
     */
    public void stopLogger() {
        this.stopLogger = true;
        this.closeWatcher();

        // Work on a snapshot so the watch thread can keep mutating the list safely.
        List<WorkerLogger> snapshot;
        synchronized (this.loggers) {
            snapshot = new LinkedList<WorkerLogger>(this.loggers);
        }
        for (WorkerLogger logger : snapshot) {
            if (logger.isAlive()) {
                logger.stopLogging();
            }
        }
    }
}

