/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.DataInputStream;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;

public class ContainerLocalizer {
    static final Log LOG = LogFactory.getLog(ContainerLocalizer.class);
    public static final String FILECACHE = "filecache";
    public static final String APPCACHE = "appcache";
    public static final String USERCACHE = "usercache";
    public static final String OUTPUTDIR = "output";
    public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";
    public static final String WORKDIR = "work";
    private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs";
    private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
    private final String user;
    private final String appId;
    private final List<Path> localDirs;
    private final String localizerId;
    private final FileContext lfs;
    private final Configuration conf;
    private final RecordFactory recordFactory;
    private final Map<LocalResource, Future<Path>> pendingResources;
    private final String appCacheDirContextName;

    public ContainerLocalizer(FileContext lfs, String user, String appId, String localizerId, List<Path> localDirs, RecordFactory recordFactory) throws IOException {
        if (null == user) {
            throw new IOException("Cannot initialize for null user");
        }
        if (null == localizerId) {
            throw new IOException("Cannot initialize for null containerId");
        }
        this.lfs = lfs;
        this.user = user;
        this.appId = appId;
        this.localDirs = localDirs;
        this.localizerId = localizerId;
        this.recordFactory = recordFactory;
        this.conf = new Configuration();
        this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
        this.pendingResources = new HashMap<LocalResource, Future<Path>>();
    }

    LocalizationProtocol getProxy(InetSocketAddress nmAddr) {
        YarnRPC rpc = YarnRPC.create((Configuration)this.conf);
        return (LocalizationProtocol)rpc.getProxy(LocalizationProtocol.class, nmAddr, this.conf);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int runLocalization(final InetSocketAddress nmAddr) throws IOException, InterruptedException {
        ContainerLocalizer.initDirs(this.conf, this.user, this.appId, this.lfs, this.localDirs);
        Credentials creds = new Credentials();
        FilterInputStream credFile = null;
        try {
            credFile = this.lfs.open(new Path(String.format(TOKEN_FILE_NAME_FMT, this.localizerId)));
            creds.readTokenStorageStream((DataInputStream)credFile);
        }
        finally {
            if (credFile != null) {
                credFile.close();
            }
        }
        UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser((String)this.user);
        remoteUser.addToken(creds.getToken(LocalizerTokenIdentifier.KIND));
        LocalizationProtocol nodeManager = (LocalizationProtocol)remoteUser.doAs((PrivilegedAction)new PrivilegedAction<LocalizationProtocol>(){

            @Override
            public LocalizationProtocol run() {
                return ContainerLocalizer.this.getProxy(nmAddr);
            }
        });
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser((String)this.user);
        for (Token token : creds.getAllTokens()) {
            ugi.addToken(token);
        }
        ExecutorService exec = null;
        try {
            exec = this.createDownloadThreadPool();
            CompletionService<Path> ecs = this.createCompletionService(exec);
            this.localizeFiles(nodeManager, ecs, ugi);
            int n = 0;
            return n;
        }
        catch (Throwable e) {
            e.printStackTrace(System.out);
            int n = -1;
            return n;
        }
        finally {
            try {
                if (exec != null) {
                    exec.shutdownNow();
                }
                LocalDirAllocator.removeContext((String)this.appCacheDirContextName);
            }
            finally {
                this.closeFileSystems(ugi);
            }
        }
    }

    ExecutorService createDownloadThreadPool() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("ContainerLocalizer Downloader").build());
    }

    CompletionService<Path> createCompletionService(ExecutorService exec) {
        return new ExecutorCompletionService<Path>(exec);
    }

    Callable<Path> download(Path path, LocalResource rsrc, UserGroupInformation ugi) throws IOException {
        DiskChecker.checkDir((File)new File(path.toUri().getRawPath()));
        return new FSDownload(this.lfs, ugi, this.conf, path, rsrc);
    }

    static long getEstimatedSize(LocalResource rsrc) {
        if (rsrc.getSize() < 0L) {
            return -1L;
        }
        switch (rsrc.getType()) {
            case ARCHIVE: 
            case PATTERN: {
                return 5L * rsrc.getSize();
            }
        }
        return rsrc.getSize();
    }

    void sleep(int duration) throws InterruptedException {
        TimeUnit.SECONDS.sleep(duration);
    }

    protected void closeFileSystems(UserGroupInformation ugi) {
        try {
            FileSystem.closeAllForUGI((UserGroupInformation)ugi);
        }
        catch (IOException e) {
            LOG.warn((Object)"Failed to close filesystems: ", (Throwable)e);
        }
    }

    protected void localizeFiles(LocalizationProtocol nodemanager, CompletionService<Path> cs, UserGroupInformation ugi) throws IOException {
        try {
            while (true) {
                LocalizerStatus status = this.createStatus();
                LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
                switch (response.getLocalizerAction()) {
                    case LIVE: {
                        List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
                        for (ResourceLocalizationSpec resourceLocalizationSpec : newRsrcs) {
                            if (this.pendingResources.containsKey(resourceLocalizationSpec.getResource())) continue;
                            this.pendingResources.put(resourceLocalizationSpec.getResource(), cs.submit(this.download(new Path(resourceLocalizationSpec.getDestinationDirectory().getFile()), resourceLocalizationSpec.getResource(), ugi)));
                        }
                        break;
                    }
                    case DIE: {
                        for (Future future : this.pendingResources.values()) {
                            future.cancel(true);
                        }
                        status = this.createStatus();
                        try {
                            nodemanager.heartbeat(status);
                        }
                        catch (YarnException e) {
                            // empty catch block
                        }
                        return;
                    }
                }
                cs.poll(1000L, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException e) {
            return;
        }
        catch (YarnException e) {
            return;
        }
    }

    private LocalizerStatus createStatus() throws InterruptedException {
        ArrayList<LocalResourceStatus> currentResources = new ArrayList<LocalResourceStatus>();
        Iterator<LocalResource> i = this.pendingResources.keySet().iterator();
        while (i.hasNext()) {
            LocalResource rsrc = i.next();
            LocalResourceStatus stat = (LocalResourceStatus)this.recordFactory.newRecordInstance(LocalResourceStatus.class);
            stat.setResource(rsrc);
            Future<Path> fPath = this.pendingResources.get(rsrc);
            if (fPath.isDone()) {
                try {
                    Path localPath = fPath.get();
                    stat.setLocalPath(ConverterUtils.getYarnUrlFromPath((Path)localPath));
                    stat.setLocalSize(FileUtil.getDU((File)new File(localPath.getParent().toUri())));
                    stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
                }
                catch (ExecutionException e) {
                    stat.setStatus(ResourceStatusType.FETCH_FAILURE);
                    stat.setException(SerializedException.newInstance((Throwable)e.getCause()));
                }
                catch (CancellationException e) {
                    stat.setStatus(ResourceStatusType.FETCH_FAILURE);
                    stat.setException(SerializedException.newInstance((Throwable)e));
                }
                i.remove();
            } else {
                stat.setStatus(ResourceStatusType.FETCH_PENDING);
            }
            currentResources.add(stat);
        }
        LocalizerStatus status = (LocalizerStatus)this.recordFactory.newRecordInstance(LocalizerStatus.class);
        status.setLocalizerId(this.localizerId);
        status.addAllResources(currentResources);
        return status;
    }

    public static void main(String[] argv) throws Throwable {
        Thread.setDefaultUncaughtExceptionHandler((Thread.UncaughtExceptionHandler)new YarnUncaughtExceptionHandler());
        try {
            String user = argv[0];
            String appId = argv[1];
            String locId = argv[2];
            InetSocketAddress nmAddr = new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
            String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
            ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
            for (String sLocaldir : sLocaldirs) {
                localDirs.add(new Path(sLocaldir));
            }
            String uid = UserGroupInformation.getCurrentUser().getShortUserName();
            if (!user.equals(uid)) {
                LOG.warn((Object)("Localization running as " + uid + " not " + user));
            }
            ContainerLocalizer localizer = new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, appId, locId, localDirs, RecordFactoryProvider.getRecordFactory(null));
            System.exit(localizer.runLocalization(nmAddr));
        }
        catch (Throwable e) {
            e.printStackTrace(System.out);
            throw e;
        }
    }

    private static void initDirs(Configuration conf, String user, String appId, FileContext lfs, List<Path> localDirs) throws IOException {
        if (null == localDirs || 0 == localDirs.size()) {
            throw new IOException("Cannot initialize without local dirs");
        }
        String[] appsFileCacheDirs = new String[localDirs.size()];
        String[] usersFileCacheDirs = new String[localDirs.size()];
        int n = localDirs.size();
        for (int i = 0; i < n; ++i) {
            Path base = lfs.makeQualified(new Path(new Path(localDirs.get(i), USERCACHE), user));
            Path userFileCacheDir = new Path(base, FILECACHE);
            usersFileCacheDirs[i] = userFileCacheDir.toString();
            lfs.mkdir(userFileCacheDir, null, false);
            Path appBase = new Path(base, new Path(APPCACHE, appId));
            Path appFileCacheDir = new Path(appBase, FILECACHE);
            appsFileCacheDirs[i] = appFileCacheDir.toString();
            lfs.mkdir(appFileCacheDir, null, false);
        }
        conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
        conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
    }
}

