/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregator;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class AppLogAggregatorImpl
implements AppLogAggregator {
    private static final Log LOG = LogFactory.getLog(AppLogAggregatorImpl.class);
    private static final int THREAD_SLEEP_TIME = 1000;
    private static final String TMP_FILE_SUFFIX = ".tmp";
    private final LocalDirsHandlerService dirsHandler;
    private final Dispatcher dispatcher;
    private final ApplicationId appId;
    private final String applicationId;
    private boolean logAggregationDisabled = false;
    private final Configuration conf;
    private final DeletionService delService;
    private final UserGroupInformation userUgi;
    private final Path remoteNodeLogFileForApp;
    private final Path remoteNodeTmpLogFileForApp;
    private final ContainerLogsRetentionPolicy retentionPolicy;
    private final BlockingQueue<ContainerId> pendingContainers;
    private final AtomicBoolean appFinishing = new AtomicBoolean();
    private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
    private final Map<ApplicationAccessType, String> appAcls;
    private AggregatedLogFormat.LogWriter writer = null;

    public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, ApplicationId appId, UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, Map<ApplicationAccessType, String> appAcls) {
        this.dispatcher = dispatcher;
        this.conf = conf;
        this.delService = deletionService;
        this.appId = appId;
        this.applicationId = ConverterUtils.toString((ApplicationId)appId);
        this.userUgi = userUgi;
        this.dirsHandler = dirsHandler;
        this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
        this.remoteNodeTmpLogFileForApp = this.getRemoteNodeTmpLogFileForApp();
        this.retentionPolicy = retentionPolicy;
        this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
        this.appAcls = appAcls;
    }

    private void uploadLogsForContainer(ContainerId containerId) {
        if (this.logAggregationDisabled) {
            return;
        }
        if (this.writer == null) {
            LOG.info((Object)("Starting aggregate log-file for app " + this.applicationId + " at " + this.remoteNodeTmpLogFileForApp));
            try {
                this.writer = new AggregatedLogFormat.LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, this.userUgi);
                this.writer.writeApplicationACLs(this.appAcls);
                this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
            }
            catch (IOException e) {
                LOG.error((Object)("Cannot create writer for app " + this.applicationId + ". Disabling log-aggregation for this app."), (Throwable)e);
                this.logAggregationDisabled = true;
                return;
            }
        }
        LOG.info((Object)("Uploading logs for container " + containerId + ". Current good log dirs are " + StringUtils.join((CharSequence)",", this.dirsHandler.getLogDirs())));
        AggregatedLogFormat.LogKey logKey = new AggregatedLogFormat.LogKey(containerId);
        AggregatedLogFormat.LogValue logValue = new AggregatedLogFormat.LogValue(this.dirsHandler.getLogDirs(), containerId);
        try {
            this.writer.append(logKey, logValue);
        }
        catch (IOException e) {
            LOG.error((Object)("Couldn't upload logs for " + containerId + ". Skipping this container."));
        }
    }

    @Override
    public void run() {
        ContainerId containerId;
        while (!this.appFinishing.get()) {
            try {
                containerId = (ContainerId)this.pendingContainers.poll();
                if (containerId == null) {
                    Thread.sleep(1000L);
                    continue;
                }
                this.uploadLogsForContainer(containerId);
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"PendingContainers queue is interrupted");
            }
        }
        while ((containerId = (ContainerId)this.pendingContainers.poll()) != null) {
            this.uploadLogsForContainer(containerId);
        }
        List<String> rootLogDirs = this.dirsHandler.getLogDirs();
        Path[] localAppLogDirs = new Path[rootLogDirs.size()];
        int index = 0;
        for (String rootLogDir : rootLogDirs) {
            localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
            ++index;
        }
        this.delService.delete(this.userUgi.getShortUserName(), null, localAppLogDirs);
        if (this.writer != null) {
            this.writer.closeWriter();
            LOG.info((Object)("Finished aggregate log-file for app " + this.applicationId));
        }
        try {
            this.userUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Object>(){

                @Override
                public Object run() throws Exception {
                    FileSystem remoteFS = FileSystem.get((Configuration)AppLogAggregatorImpl.this.conf);
                    remoteFS.rename(AppLogAggregatorImpl.this.remoteNodeTmpLogFileForApp, AppLogAggregatorImpl.this.remoteNodeLogFileForApp);
                    return null;
                }
            });
        }
        catch (Exception e) {
            LOG.error((Object)("Failed to move temporary log file to final location: [" + this.remoteNodeTmpLogFileForApp + "] to [" + this.remoteNodeLogFileForApp + "]"), (Throwable)e);
        }
        this.dispatcher.getEventHandler().handle((Event)new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
        this.appAggregationFinished.set(true);
    }

    private Path getRemoteNodeTmpLogFileForApp() {
        return new Path(this.remoteNodeLogFileForApp.getParent(), this.remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX);
    }

    private boolean shouldUploadLogs(ContainerId containerId, boolean wasContainerSuccessful) {
        if (this.retentionPolicy.equals((Object)ContainerLogsRetentionPolicy.ALL_CONTAINERS)) {
            return true;
        }
        if (this.retentionPolicy.equals((Object)ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) {
            return containerId.getId() == 1;
        }
        if (this.retentionPolicy.equals((Object)ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) {
            if (containerId.getId() == 1) {
                return true;
            }
            return !wasContainerSuccessful;
        }
        return false;
    }

    @Override
    public void startContainerLogAggregation(ContainerId containerId, boolean wasContainerSuccessful) {
        if (this.shouldUploadLogs(containerId, wasContainerSuccessful)) {
            LOG.info((Object)("Considering container " + containerId + " for log-aggregation"));
            this.pendingContainers.add(containerId);
        }
    }

    @Override
    public void finishLogAggregation() {
        LOG.info((Object)("Application just finished : " + this.applicationId));
        this.appFinishing.set(true);
    }

    @Override
    public void join() {
        this.finishLogAggregation();
        while (!this.appAggregationFinished.get()) {
            LOG.info((Object)("Waiting for aggregation to complete for " + this.applicationId));
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Join interrupted. Some logs may not have been aggregated!!");
                break;
            }
        }
    }
}

