/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.yarn;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.internal.AbstractTwillController;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.yarn.ApplicationMasterLiveNodeDecoder;
import org.apache.twill.yarn.ResourceReportClient;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class YarnTwillController
extends AbstractTwillController
implements TwillController {
    private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
    private final String appName;
    private final Callable<ProcessController<YarnApplicationReport>> startUp;
    private final long startTimeout;
    private final TimeUnit startTimeoutUnit;
    private volatile ApplicationMasterLiveNodeData amLiveNodeData;
    private ProcessController<YarnApplicationReport> processController;
    private ApplicationAttemptId currentAttemptId;
    private Thread statusPollingThread;

    YarnTwillController(String appName, RunId runId, ZKClient zkClient, ApplicationMasterLiveNodeData amLiveNodeData, YarnAppClient yarnAppClient) {
        this(appName, runId, zkClient, amLiveNodeData.getKafkaZKConnect() != null, Collections.emptyList(), () -> yarnAppClient.createProcessController(ApplicationId.newInstance((long)amLiveNodeData.getAppIdClusterTime(), (int)amLiveNodeData.getAppId())), 60L, TimeUnit.SECONDS);
    }

    YarnTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled, Iterable<LogHandler> logHandlers, Callable<ProcessController<YarnApplicationReport>> startUp, long startTimeout, TimeUnit startTimeoutUnit) {
        super(appName, runId, zkClient, logCollectionEnabled, logHandlers);
        this.appName = appName;
        this.startUp = startUp;
        this.startTimeout = startTimeout;
        this.startTimeoutUnit = startTimeoutUnit;
    }

    ListenableFuture<Void> secureStoreUpdated() {
        return this.sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
    }

    @Nullable
    ApplicationMasterLiveNodeData getApplicationMasterLiveNodeData() {
        return this.amLiveNodeData;
    }

    protected void doStartUp() {
        super.doStartUp();
        try {
            this.processController = this.startUp.call();
            YarnApplicationReport report = (YarnApplicationReport)this.processController.getReport();
            ApplicationId appId = report.getApplicationId();
            LOG.info("Application {} with id {} submitted", (Object)this.appName, (Object)appId);
            YarnApplicationState state = report.getYarnApplicationState();
            Stopwatch stopWatch = new Stopwatch().start();
            LOG.debug("Checking yarn application status for {} {}", (Object)this.appName, (Object)appId);
            while (!this.hasRun(state) && stopWatch.elapsedTime(this.startTimeoutUnit) < this.startTimeout) {
                report = (YarnApplicationReport)this.processController.getReport();
                state = report.getYarnApplicationState();
                LOG.debug("Yarn application status for {} {}: {}", new Object[]{this.appName, appId, state});
                TimeUnit.SECONDS.sleep(1L);
            }
            LOG.info("Yarn application {} {} is in state {}", new Object[]{this.appName, appId, state});
            if (state != YarnApplicationState.RUNNING) {
                LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", (Object)this.appName, (Object)appId);
                this.forceShutDown();
            }
            this.currentAttemptId = report.getCurrentApplicationAttemptId();
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    protected synchronized void doShutDown() {
        FinalApplicationStatus finalStatus;
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        this.stopPollStatus();
        long timeoutMillis = this.getTerminationTimeoutMillis(60L, TimeUnit.SECONDS);
        try {
            Uninterruptibles.getUninterruptibly((Future)this.getStopMessageFuture(), (long)timeoutMillis, (TimeUnit)TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            LOG.error("Failed to wait for stop message being processed.", (Throwable)e);
            this.kill();
        }
        try (ProcessController<YarnApplicationReport> processController = this.processController;){
            Stopwatch stopWatch = new Stopwatch().start();
            YarnApplicationReport report = (YarnApplicationReport)processController.getReport();
            finalStatus = report.getFinalApplicationStatus();
            ApplicationId appId = report.getApplicationId();
            while (finalStatus == FinalApplicationStatus.UNDEFINED && stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < timeoutMillis) {
                LOG.debug("Yarn application final status for {} {}: {}", new Object[]{this.appName, appId, finalStatus});
                TimeUnit.SECONDS.sleep(1L);
                finalStatus = ((YarnApplicationReport)processController.getReport()).getFinalApplicationStatus();
            }
            if (finalStatus == FinalApplicationStatus.UNDEFINED) {
                this.kill();
                finalStatus = FinalApplicationStatus.KILLED;
            }
        }
        catch (Exception e) {
            LOG.warn("Exception while waiting for application report: {}", (Object)e.getMessage(), (Object)e);
            this.kill();
            finalStatus = FinalApplicationStatus.KILLED;
        }
        super.doShutDown();
        if (finalStatus == FinalApplicationStatus.FAILED) {
            this.setTerminationStatus(ServiceController.TerminationStatus.FAILED);
            throw new RuntimeException(String.format("Yarn application completed with failure %s, %s.", this.appName, this.getRunId()));
        }
        this.setTerminationStatus(finalStatus == FinalApplicationStatus.SUCCEEDED ? ServiceController.TerminationStatus.SUCCEEDED : ServiceController.TerminationStatus.KILLED);
    }

    public void kill() {
        if (this.processController != null) {
            YarnApplicationReport report = (YarnApplicationReport)this.processController.getReport();
            LOG.info("Killing application {} {}", (Object)this.appName, (Object)report.getApplicationId());
            this.processController.cancel();
        } else {
            LOG.warn("No process controller for application that is not submitted.");
        }
    }

    protected void instanceNodeUpdated(NodeData nodeData) {
        ApplicationMasterLiveNodeData data = ApplicationMasterLiveNodeDecoder.decode(nodeData);
        if (data != null) {
            this.amLiveNodeData = data;
        }
    }

    protected void instanceNodeFailed(Throwable cause) {
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        YarnApplicationReport report = (YarnApplicationReport)this.processController.getReport();
        LOG.info("Failed to access application {} {} live node in ZK, resort to polling. Failure reason: {}", new Object[]{this.appName, report.getApplicationId(), cause == null ? "Unknown" : cause.getMessage()});
        this.startPollStatus(report.getApplicationId());
    }

    private synchronized void startPollStatus(ApplicationId appId) {
        if (this.statusPollingThread == null) {
            this.statusPollingThread = new Thread(this.createStatusPollingRunnable(), String.format("%s-%s-yarn-poller", this.appName, appId));
            this.statusPollingThread.setDaemon(true);
            this.statusPollingThread.start();
        }
    }

    private synchronized void stopPollStatus() {
        if (this.statusPollingThread != null) {
            this.statusPollingThread.interrupt();
            this.statusPollingThread = null;
        }
    }

    private Runnable createStatusPollingRunnable() {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                YarnApplicationReport report = (YarnApplicationReport)YarnTwillController.this.processController.getReport();
                ApplicationId appId = report.getApplicationId();
                boolean shutdown = false;
                boolean watchInstanceNode = false;
                try {
                    LOG.debug("Polling status from Yarn for {} {}.", (Object)YarnTwillController.this.appName, (Object)appId);
                    while (!Thread.currentThread().isInterrupted()) {
                        block15: {
                            if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                                shutdown = true;
                                break;
                            }
                            ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
                            if (YarnTwillController.this.currentAttemptId.compareTo(attemptId) != 0) {
                                LOG.info("Application attempt ID change from {} to {}", (Object)YarnTwillController.this.currentAttemptId, (Object)attemptId);
                                YarnTwillController.this.currentAttemptId = attemptId;
                                YarnTwillController.this.resetLogHandler();
                            }
                            try {
                                Stat stat = (Stat)YarnTwillController.this.zkClient.exists(YarnTwillController.this.getInstancePath()).get(5L, TimeUnit.SECONDS);
                                if (stat == null) break block15;
                                watchInstanceNode = true;
                                break;
                            }
                            catch (ExecutionException e) {
                                LOG.debug("Failed in exists call on ZK path {}.", (Object)YarnTwillController.this.getInstancePath(), (Object)e);
                            }
                            catch (TimeoutException e) {
                                LOG.debug("Timeout in exists call on ZK path {}.", (Object)YarnTwillController.this.getInstancePath(), (Object)e);
                            }
                        }
                        TimeUnit.SECONDS.sleep(1L);
                        report = (YarnApplicationReport)YarnTwillController.this.processController.getReport();
                    }
                }
                catch (InterruptedException e) {
                    LOG.debug("Status polling thread interrupted for application {} {}", (Object)YarnTwillController.this.appName, (Object)appId);
                }
                LOG.debug("Stop polling status from Yarn for {} {}.", (Object)YarnTwillController.this.appName, (Object)appId);
                if (shutdown) {
                    LOG.info("Yarn application {} {} completed. Shutting down controller.", (Object)YarnTwillController.this.appName, (Object)appId);
                    YarnTwillController.this.forceShutDown();
                } else if (watchInstanceNode) {
                    LOG.info("Rewatch instance node for {} {} at {}", new Object[]{YarnTwillController.this.appName, appId, YarnTwillController.this.getInstancePath()});
                    YarnTwillController yarnTwillController = YarnTwillController.this;
                    synchronized (yarnTwillController) {
                        YarnTwillController.this.statusPollingThread = null;
                        YarnTwillController.this.watchInstanceNode();
                    }
                }
            }
        };
    }

    private boolean hasRun(YarnApplicationState state) {
        switch (state) {
            case RUNNING: 
            case FINISHED: 
            case FAILED: 
            case KILLED: {
                return true;
            }
        }
        return false;
    }

    public ResourceReport getResourceReport() {
        if (this.state() != Service.State.RUNNING) {
            return null;
        }
        ResourceReportClient resourcesClient = this.getResourcesClient();
        return resourcesClient == null ? null : resourcesClient.get();
    }

    @Nullable
    private ResourceReportClient getResourcesClient() {
        YarnApplicationReport report = (YarnApplicationReport)this.processController.getReport();
        ArrayList<URL> urls = new ArrayList<URL>(2);
        for (String url : Arrays.asList(report.getTrackingUrl(), report.getOriginalTrackingUrl())) {
            if (url == null || url.equals("N/A")) continue;
            try {
                URL trackingUrl = new URL(url);
                String path = trackingUrl.getPath();
                if (path.endsWith("/")) {
                    path = path.substring(0, path.length() - 1);
                }
                urls.add(new URL(trackingUrl.getProtocol(), trackingUrl.getHost(), trackingUrl.getPort(), path + "/resources"));
            }
            catch (MalformedURLException e) {
                LOG.debug("Invalid tracking URL {} from YARN application report for {}:{}", new Object[]{url, this.appName, this.getRunId()});
            }
        }
        if (urls.isEmpty()) {
            return null;
        }
        return new ResourceReportClient(urls);
    }
}

