/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.job.yarn;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.ClusterResourceManager;
import org.apache.samza.clustermanager.FaultDomain;
import org.apache.samza.clustermanager.ProcessorLaunchException;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaResource;
import org.apache.samza.clustermanager.SamzaResourceRequest;
import org.apache.samza.clustermanager.SamzaResourceStatus;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.yarn.FileSystemImplConfig;
import org.apache.samza.job.yarn.JobContext;
import org.apache.samza.job.yarn.LocalizerResourceConfig;
import org.apache.samza.job.yarn.LocalizerResourceMapper;
import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
import org.apache.samza.job.yarn.SamzaYarnAppMasterLifecycle;
import org.apache.samza.job.yarn.SamzaYarnAppMasterService;
import org.apache.samza.job.yarn.YarnAppState;
import org.apache.samza.job.yarn.YarnContainer;
import org.apache.samza.job.yarn.YarnJobUtil;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.util.Util;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnClusterResourceManager
extends ClusterResourceManager
implements AMRMClientAsync.CallbackHandler,
NMClientAsync.CallbackHandler {
    private static final int PREFERRED_HOST_PRIORITY = 0;
    private static final int ANY_HOST_PRIORITY = 1;
    private static final String INVALID_PROCESSOR_ID = "-1";
    private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
    private final YarnConfiguration yarnConfiguration;
    private final YarnAppState state;
    private final SamzaYarnAppMasterLifecycle lifecycle;
    private final SamzaYarnAppMasterService service;
    private final YarnConfig yarnConfig;
    private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap();
    private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap();
    private final SamzaAppMasterMetrics metrics;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Object lock = new Object();
    private final NMClientAsync nmClientAsync;
    private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
    private final Config config;

    YarnClusterResourceManager(AMRMClientAsync amClientAsync, NMClientAsync nmClientAsync, ClusterResourceManager.Callback callback, YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle lifecycle, SamzaYarnAppMasterService service, SamzaAppMasterMetrics metrics, YarnConfiguration yarnConfiguration, Config config) {
        super(callback);
        this.yarnConfiguration = yarnConfiguration;
        this.metrics = metrics;
        this.yarnConfig = new YarnConfig(config);
        this.config = config;
        this.amClient = amClientAsync;
        this.state = yarnAppState;
        this.lifecycle = lifecycle;
        this.service = service;
        this.nmClientAsync = nmClientAsync;
    }

    public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
        super(callback);
        YarnConfig yarnConfig;
        this.yarnConfiguration = new YarnConfiguration();
        this.yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
        FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
        fsImplConfig.getSchemes().forEach(scheme -> fsImplConfig.getSchemeConfig((String)scheme).forEach((confKey, confValue) -> this.yarnConfiguration.set(confKey, confValue)));
        MetricsRegistryMap registry = new MetricsRegistryMap();
        this.metrics = new SamzaAppMasterMetrics(config, samzaAppState, (ReadableMetricsRegistry)registry);
        String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
        ContainerId containerId = ConverterUtils.toContainerId((String)containerIdStr);
        String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
        String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
        String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());
        int nodePort = Integer.parseInt(nodePortString);
        int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
        this.yarnConfig = yarnConfig = new YarnConfig(config);
        this.config = config;
        int interval = yarnConfig.getAMPollIntervalMs();
        this.amClient = AMRMClientAsync.createAMRMClientAsync((int)interval, (AMRMClientAsync.CallbackHandler)this);
        this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
        log.info("Initialized YarnAppState: {}", (Object)this.state.toString());
        this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, (ReadableMetricsRegistry)registry, this.yarnConfiguration);
        log.info("Container ID: {}, Nodehost:  {} , Nodeport : {} , NodeHttpport: {}", new Object[]{containerIdStr, nodeHostString, nodePort, nodeHttpPort});
        ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
        this.lifecycle = new SamzaYarnAppMasterLifecycle(clusterManagerConfig.getContainerMemoryMb(), clusterManagerConfig.getNumCores(), samzaAppState, this.state, this.amClient, new JobConfig(config).getApplicationMasterHighAvailabilityEnabled());
        this.nmClientAsync = NMClientAsync.createNMClientAsync((NMClientAsync.CallbackHandler)this);
    }

    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            log.info("Attempting to start an already started YarnClusterResourceManager");
            return;
        }
        this.metrics.start();
        this.service.onInit();
        log.info("Starting YarnClusterResourceManager.");
        this.amClient.init((Configuration)this.yarnConfiguration);
        this.amClient.start();
        this.nmClientAsync.init((Configuration)this.yarnConfiguration);
        this.nmClientAsync.start();
        Set<ContainerId> previousAttemptsContainers = this.lifecycle.onInit();
        if (new JobConfig(this.config).getApplicationMasterHighAvailabilityEnabled()) {
            log.info("Received running containers from previous attempt. Invoking launch success for them.");
            previousAttemptsContainers.forEach(this::handleOnContainerStarted);
        }
        if (this.lifecycle.shouldShutdown()) {
            this.clusterManagerCallback.onError((Throwable)new SamzaException("Invalid resource request."));
        }
        log.info("Finished starting YarnClusterResourceManager");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void requestResources(SamzaResourceRequest resourceRequest) {
        AMRMClient.ContainerRequest issuedRequest;
        String processorId = resourceRequest.getProcessorId();
        String requestId = resourceRequest.getRequestId();
        String preferredHost = resourceRequest.getPreferredHost();
        Object[] racks = (String[])resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
        int memoryMb = resourceRequest.getMemoryMB();
        int cpuCores = resourceRequest.getNumCores();
        Resource capability = Resource.newInstance((int)memoryMb, (int)cpuCores);
        String nodeLabelsExpression = this.yarnConfig.getContainerLabel();
        if (preferredHost.equals("ANY_HOST")) {
            Priority priority = Priority.newInstance((int)1);
            boolean relaxLocality = true;
            log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}", new Object[]{processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression});
            issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
        } else {
            Object[] nodes = new String[]{preferredHost};
            Priority priority = Priority.newInstance((int)0);
            boolean relaxLocality = false;
            log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}", new Object[]{processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression});
            issuedRequest = new AMRMClient.ContainerRequest(capability, (String[])nodes, (String[])racks, priority, relaxLocality, nodeLabelsExpression);
        }
        Object object = this.lock;
        synchronized (object) {
            this.requestsMap.put(resourceRequest, issuedRequest);
            this.amClient.addContainerRequest(issuedRequest);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseResources(SamzaResource resource) {
        log.info("Releasing Container ID: {} on host: {}", (Object)resource.getContainerId(), (Object)resource.getHost());
        Object object = this.lock;
        synchronized (object) {
            Container container = this.allocatedResources.get(resource);
            if (container == null) {
                log.info("Container ID: {} on host: {} was already released.", (Object)resource.getContainerId(), (Object)resource.getHost());
                return;
            }
            this.amClient.releaseAssignedContainer(container.getId());
            this.allocatedResources.remove(resource);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
        String processorId = (String)builder.buildEnvironment().get("SAMZA_CONTAINER_ID");
        String containerId = resource.getContainerId();
        String host = resource.getHost();
        log.info("Starting Processor ID: {} on Container ID: {} on host: {}", new Object[]{processorId, containerId, host});
        Object object = this.lock;
        synchronized (object) {
            try {
                Container container = this.allocatedResources.get(resource);
                if (container == null) {
                    log.info("Container ID: {} on host: {} was already allocated / released.", (Object)containerId, (Object)host);
                    return;
                }
                this.runProcessor(processorId, container, builder);
            }
            catch (Throwable t) {
                log.info("Error starting Processor ID: {} on Container ID: {} on host: {}", new Object[]{processorId, containerId, host, t});
                this.clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopStreamProcessor(SamzaResource resource) {
        Object object = this.lock;
        synchronized (object) {
            Container container = this.allocatedResources.get(resource);
            String containerId = resource.getContainerId();
            String containerHost = resource.getHost();
            if (container != null) {
                log.info("Stopping Container ID: {} on host: {}", (Object)containerId, (Object)containerHost);
                this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
            } else {
                YarnContainer yarnContainer = this.state.runningProcessors.get(this.getRunningProcessorId(containerId));
                if (yarnContainer != null) {
                    log.info("Stopping container from previous attempt with Container ID: {} on host: {}", (Object)containerId, (Object)containerHost);
                    this.amClient.releaseAssignedContainer(yarnContainer.id());
                } else {
                    log.info("No container with Container ID: {} exists. Ignoring the stop request", (Object)containerId);
                }
            }
        }
    }

    private String getRunningProcessorId(String containerId) {
        for (Map.Entry<String, YarnContainer> entry : this.state.runningProcessors.entrySet()) {
            String key = entry.getKey();
            YarnContainer yarnContainer = entry.getValue();
            String yarnContainerId = yarnContainer.id().toString();
            if (!yarnContainerId.equals(containerId)) continue;
            return key;
        }
        return INVALID_PROCESSOR_ID;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelResourceRequest(SamzaResourceRequest request) {
        String processorId = request.getProcessorId();
        String preferredHost = request.getPreferredHost();
        String requestId = request.getRequestId();
        log.info("Cancelling resource request for Processor ID: {} on host: {} with Request ID: {}", new Object[]{processorId, preferredHost, requestId});
        Object object = this.lock;
        synchronized (object) {
            AMRMClient.ContainerRequest containerRequest = this.requestsMap.get(request);
            if (containerRequest == null) {
                log.info("Resource request for Processor ID: {} on host: {} with Request ID: {} already cancelled.", new Object[]{processorId, preferredHost, requestId});
                return;
            }
            this.requestsMap.remove(request);
            this.amClient.removeContainerRequest(containerRequest);
        }
    }

    public void stop(SamzaApplicationState.SamzaAppStatus status2) {
        log.info("Stopping the AM client on shutdown request.");
        this.lifecycle.onShutdown(status2);
        this.amClient.stop();
        log.info("Stopping the NM client on shutdown request.");
        this.nmClientAsync.stop();
        log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
        this.service.onShutdown();
        log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
        this.metrics.stop();
        if (status2 != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
            this.cleanupStagingDir();
        }
    }

    private void cleanupStagingDir() {
        String yarnJobStagingDirectory = this.yarnConfig.getYarnJobStagingDirectory();
        if (yarnJobStagingDirectory != null) {
            JobContext context = new JobContext();
            context.setAppStagingDir(new Path(yarnJobStagingDirectory));
            FileSystem fs = null;
            try {
                fs = FileSystem.get((Configuration)this.yarnConfiguration);
            }
            catch (IOException e) {
                log.error("Unable to clean up file system.", (Throwable)e);
                return;
            }
            if (fs != null) {
                YarnJobUtil.cleanupStagingDir(context, fs);
            }
        }
    }

    public void onContainersCompleted(List<ContainerStatus> statuses) {
        ArrayList<SamzaResourceStatus> samzaResourceStatuses = new ArrayList<SamzaResourceStatus>();
        for (ContainerStatus status2 : statuses) {
            log.info("Got completion notification for Container ID: {} with status: {} and state: {}. Diagnostics information: {}.", new Object[]{status2.getContainerId(), status2.getExitStatus(), status2.getState(), status2.getDiagnostics()});
            SamzaResourceStatus samzaResourceStatus = new SamzaResourceStatus(status2.getContainerId().toString(), status2.getDiagnostics(), status2.getExitStatus());
            samzaResourceStatuses.add(samzaResourceStatus);
            String completedProcessorID = this.getRunningProcessorId(status2.getContainerId().toString());
            log.info("Completed Container ID: {} had Processor ID: {}", (Object)status2.getContainerId(), (Object)completedProcessorID);
            if (completedProcessorID.equals(INVALID_PROCESSOR_ID) || !this.state.runningProcessors.containsKey(completedProcessorID)) continue;
            log.info("Removing Processor ID: {} from YarnClusterResourceManager running processors.", (Object)completedProcessorID);
            this.state.runningProcessors.remove(completedProcessorID);
            if (status2.getExitStatus() == 0) continue;
            this.state.failedContainersStatus.put(status2.getContainerId().toString(), status2);
        }
        this.clusterManagerCallback.onResourcesCompleted(samzaResourceStatuses);
    }

    public void onContainersAllocated(List<Container> containers) {
        ArrayList<SamzaResource> resources = new ArrayList<SamzaResource>();
        for (Container container : containers) {
            log.info("Got allocation notification for Container ID: {} on host: {}", (Object)container.getId(), (Object)container.getNodeId().getHost());
            String containerId = container.getId().toString();
            String host = container.getNodeId().getHost();
            int memory = container.getResource().getMemory();
            int numCores = container.getResource().getVirtualCores();
            SamzaResource resource = new SamzaResource(numCores, memory, host, containerId);
            this.allocatedResources.put(resource, container);
            resources.add(resource);
        }
        this.clusterManagerCallback.onResourcesAvailable(resources);
    }

    public void onShutdownRequest() {
        this.stop(SamzaApplicationState.SamzaAppStatus.FAILED);
    }

    public void onNodesUpdated(List<NodeReport> updatedNodes) {
    }

    public float getProgress() {
        return 0.0f;
    }

    public void onError(Throwable e) {
        log.error("Exception in the Yarn callback", e);
        this.clusterManagerCallback.onError(e);
    }

    public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
        this.handleOnContainerStarted(containerId);
    }

    public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
        log.info("Got status notification for Container ID: {} for Processor ID: {}. Status: {}", new Object[]{containerId, this.getRunningProcessorId(containerId.toString()), containerStatus.getState()});
    }

    public void onContainerStopped(ContainerId containerId) {
        log.info("Got stop notification for Container ID: {} for Processor ID: {}", (Object)containerId, (Object)this.getRunningProcessorId(containerId.toString()));
    }

    public void onStartContainerError(ContainerId containerId, Throwable t) {
        String processorId = this.getPendingProcessorId(containerId);
        if (processorId != null) {
            log.info("Got start error notification for Container ID: {} for Processor ID: {} ", new Object[]{containerId, processorId, t});
            YarnContainer container = this.state.pendingProcessors.remove(processorId);
            SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(), container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
            this.clusterManagerCallback.onStreamProcessorLaunchFailure(resource, (Throwable)new ProcessorLaunchException(t));
        } else {
            log.warn("Did not find the pending Processor ID for the start error notification for Container ID: {}. Ignoring notification", (Object)containerId);
        }
    }

    public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
        log.info("Got status error notification for Container ID: {} for Processor ID: {}", new Object[]{containerId, this.getRunningProcessorId(containerId.toString()), t});
    }

    public void onStopContainerError(ContainerId containerId, Throwable t) {
        String processorId = this.getRunningProcessorId(containerId.toString());
        if (processorId != null) {
            log.info("Got stop error notification for Container ID: {} for Processor ID: {}", new Object[]{containerId, processorId, t});
            YarnContainer container = this.state.runningProcessors.get(processorId);
            SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(), container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
            this.clusterManagerCallback.onStreamProcessorStopFailure(resource, t);
        } else {
            log.warn("Did not find the running Processor ID for the stop error notification for Container ID: {}. Ignoring notification", (Object)containerId);
        }
    }

    public boolean isResourceExpired(SamzaResource resource) {
        Duration yarnAllocatedResourceExpiry = Duration.ofMillis(600000L).minus(Duration.ofSeconds(30L));
        return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
    }

    public void runProcessor(String processorId, Container container, CommandBuilder cmdBuilder) throws IOException {
        String containerIdStr = ConverterUtils.toString((ContainerId)container.getId());
        String cmdPath = "./__package/";
        cmdBuilder.setCommandPath(cmdPath);
        String command = cmdBuilder.buildCommand();
        Map<String, String> env = this.getEscapedEnvironmentVariablesMap(cmdBuilder);
        env.put("EXECUTION_ENV_CONTAINER_ID", Util.envVarEscape((String)container.getId().toString()));
        Path packagePath = new Path(this.yarnConfig.getPackagePath());
        String formattedCommand = this.getFormattedCommand("<LOG_DIR>", command, "stdout", "stderr");
        log.info("Running Processor ID: {} on Container ID: {} on host: {} using command: {} and env: {} and package path: {}", new Object[]{processorId, containerIdStr, container.getNodeHttpAddress(), formattedCommand, env, packagePath});
        this.state.pendingProcessors.put(processorId, new YarnContainer(container));
        this.startContainer(packagePath, container, env, formattedCommand);
        log.info("Made start request for Processor ID: {} on Container ID: {} on host: {} (http://{}/node/containerlogs/{}).", new Object[]{processorId, containerIdStr, container.getNodeId().getHost(), container.getNodeHttpAddress(), containerIdStr});
    }

    private void startContainer(Path packagePath, Container container, Map<String, String> env, final String cmd) throws IOException {
        Map<ApplicationAccessType, String> acls;
        LocalResource packageResource = (LocalResource)Records.newRecord(LocalResource.class);
        URL packageUrl = ConverterUtils.getYarnUrlFromPath((Path)packagePath);
        FileStatus fileStatus = packagePath.getFileSystem((Configuration)this.yarnConfiguration).getFileStatus(packagePath);
        packageResource.setResource(packageUrl);
        log.debug("Set package resource in YarnContainerRunner for {}", (Object)packageUrl);
        packageResource.setSize(fileStatus.getLen());
        packageResource.setTimestamp(fileStatus.getModificationTime());
        packageResource.setType(LocalResourceType.ARCHIVE);
        packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream((DataOutputStream)dob);
        Iterator iter = credentials.getAllTokens().iterator();
        while (iter.hasNext()) {
            TokenIdentifier token = ((Token)iter.next()).decodeIdentifier();
            if (token == null || !token.getKind().equals((Object)AMRMTokenIdentifier.KIND_NAME)) continue;
            iter.remove();
        }
        ByteBuffer allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        HashMap<String, LocalResource> localResourceMap = new HashMap<String, LocalResource>();
        localResourceMap.put("__package", packageResource);
        LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(this.config), this.yarnConfiguration);
        localResourceMap.putAll(resourceMapper.getResourceMap());
        ContainerLaunchContext context = (ContainerLaunchContext)Records.newRecord(ContainerLaunchContext.class);
        context.setEnvironment(env);
        context.setTokens(allTokens.duplicate());
        context.setCommands((List)new ArrayList<String>(){
            {
                this.add(cmd);
            }
        });
        context.setLocalResources(localResourceMap);
        if (UserGroupInformation.isSecurityEnabled() && !(acls = this.yarnConfig.getYarnApplicationAcls()).isEmpty()) {
            context.setApplicationACLs(acls);
        }
        log.debug("Setting localResourceMap to {}", localResourceMap);
        log.debug("Setting context to {}", (Object)context);
        StartContainerRequest startContainerRequest = (StartContainerRequest)Records.newRecord(StartContainerRequest.class);
        startContainerRequest.setContainerLaunchContext(context);
        log.info("Making an async start request for Container ID: {} on host: {} with local resource map: {} and context: {}", new Object[]{container.getId(), container.getNodeHttpAddress(), ((Object)localResourceMap).toString(), context});
        this.nmClientAsync.startContainerAsync(container, context);
    }

    private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
        HashMap<String, String> env = new HashMap<String, String>();
        for (Map.Entry entry : cmdBuilder.buildEnvironment().entrySet()) {
            String escapedValue = Util.envVarEscape((String)((String)entry.getValue()));
            env.put((String)entry.getKey(), escapedValue);
        }
        return env;
    }

    private String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
        return String.format("export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
    }

    private String getPendingProcessorId(ContainerId containerId) {
        for (String pendingProcessorId : this.state.pendingProcessors.keySet()) {
            YarnContainer yarnContainer = this.state.pendingProcessors.get(pendingProcessorId);
            if (yarnContainer == null || !yarnContainer.id().equals((Object)containerId)) continue;
            return pendingProcessorId;
        }
        return null;
    }

    private void handleOnContainerStarted(ContainerId containerId) {
        String processorId = this.getPendingProcessorId(containerId);
        if (processorId != null) {
            log.info("Got start notification for Container ID: {} for Processor ID: {}", (Object)containerId, (Object)processorId);
            YarnContainer container = this.state.pendingProcessors.remove(processorId);
            this.state.runningProcessors.put(processorId, container);
            SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(), container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
            this.clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
        } else {
            log.warn("Did not find the Processor ID for the start notification for Container ID: {}. Ignoring notification.", (Object)containerId);
        }
    }

    @VisibleForTesting
    ConcurrentHashMap<SamzaResource, Container> getAllocatedResources() {
        return this.allocatedResources;
    }
}

