/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.appmaster;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.DiscreteDomains;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ranges;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.twill.api.Command;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.EventHandlerContext;
import org.apache.twill.api.EventHandlerSpecification;
import org.apache.twill.api.LocalFile;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.DefaultTwillRunResources;
import org.apache.twill.internal.JvmOptions;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.TwillContainerLauncher;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.appmaster.AllocationSpecification;
import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
import org.apache.twill.internal.appmaster.BasicEventHandlerContext;
import org.apache.twill.internal.appmaster.ExpectedContainers;
import org.apache.twill.internal.appmaster.PlacementPolicyManager;
import org.apache.twill.internal.appmaster.ProvisionRequest;
import org.apache.twill.internal.appmaster.RunnableContainerRequest;
import org.apache.twill.internal.appmaster.RunningContainers;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.utils.Instances;
import org.apache.twill.internal.utils.Resources;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.internal.yarn.YarnContainerInfo;
import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ApplicationMasterService
extends AbstractYarnTwillService
implements Supplier<ResourceReport> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
    /** Shared Gson instance: serializes nulls and (de)serializes {@link LocalFile} through {@link LocalFileCodec}. */
    private static final Gson GSON = new GsonBuilder().serializeNulls().registerTypeAdapter(LocalFile.class, (Object)new LocalFileCodec()).create();
    /** Token kind that {@code createCredentials()} strips from the credentials it builds. */
    private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
    private final RunId runId;
    private final ZKClient zkClient;
    /** Taken from {@code twillRuntimeSpec.getTwillSpecification()} in the constructor. */
    private final TwillSpecification twillSpec;
    /** Object returned by {@code getLiveNodeData()}; also the source of the local files passed to each launched container. */
    private final ApplicationMasterLiveNodeData amLiveNode;
    private final RunningContainers runningContainers;
    private final ExpectedContainers expectedContainers;
    private final YarnAMClient amClient;
    /** Loaded from {@code runtime.config.jar/jvm.opts.json}; defaults to empty options when that file is absent. */
    private final JvmOptions jvmOpts;
    /** Never null: a no-op handler is used when the specification declares no event handler. */
    private final EventHandler eventHandler;
    /** Deleted recursively by {@code cleanupDir()} during {@code doStop}. */
    private final Location applicationLocation;
    private final PlacementPolicyManager placementPolicyManager;
    /** Per-runnable environment variables, keyed by runnable name; loaded from {@code runtime.config.jar/environments.json}. */
    private final Map<String, Map<String, String>> environments;
    private final TwillRuntimeSpecification twillRuntimeSpec;
    /**
     * Reason the run loop ended. Set to COMPLETED in {@code doRun} and to ABORTED in
     * {@code checkProvisionTimeout}; when still null at {@code doStop} the application is reported as killed.
     */
    private volatile StopStatus stopStatus;
    /** Set by {@code triggerShutdown()}; checked at the top of every {@code doRun} loop iteration. */
    private volatile boolean stopped;
    /** Pending container requests; assigned in {@code doStart} from {@code initContainerRequests()}. */
    private Queue<RunnableContainerRequest> runnableContainerRequests;
    /** Single-thread executor created in {@code doStart}; runs instance-count changes and is shut down in {@code doStop}. */
    private ExecutorService instanceChangeExecutor;

    public ApplicationMasterService(RunId runId, ZKClient zkClient, TwillRuntimeSpecification twillRuntimeSpec, YarnAMClient amClient, Configuration config, Location applicationLocation) throws Exception {
        super(zkClient, runId, config, applicationLocation);
        this.runId = runId;
        this.twillRuntimeSpec = twillRuntimeSpec;
        this.zkClient = zkClient;
        this.applicationLocation = applicationLocation;
        this.amClient = amClient;
        this.credentials = this.createCredentials();
        this.jvmOpts = this.loadJvmOptions();
        this.twillSpec = twillRuntimeSpec.getTwillSpecification();
        this.placementPolicyManager = new PlacementPolicyManager(this.twillSpec.getPlacementPolicies());
        this.environments = this.getEnvironments();
        this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv("YARN_APP_ID")), Long.parseLong(System.getenv("YARN_APP_ID_CLUSTER_TIME")), amClient.getContainerId().toString(), this.getLocalizeFiles(), twillRuntimeSpec.getKafkaZKConnect());
        this.expectedContainers = new ExpectedContainers(this.twillSpec);
        this.eventHandler = this.createEventHandler(this.twillSpec);
        this.runningContainers = this.createRunningContainers(amClient.getContainerId(), amClient.getHost());
    }

    /**
     * Reads the JVM options from {@code runtime.config.jar/jvm.opts.json}.
     *
     * @return the parsed options, or empty options with debugging disabled when the file does not exist
     * @throws IOException if the file exists but cannot be read
     */
    private JvmOptions loadJvmOptions() throws IOException {
        File optionsFile = new File("runtime.config.jar", "jvm.opts.json");
        if (optionsFile.exists()) {
            try (BufferedReader in = Files.newBufferedReader(optionsFile.toPath(), StandardCharsets.UTF_8)) {
                return (JvmOptions) GSON.fromJson((Reader) in, JvmOptions.class);
            }
        }
        // No options file was shipped: run with defaults.
        return new JvmOptions("", Collections.emptyMap(), JvmOptions.DebugOptions.NO_DEBUG);
    }

    /**
     * Creates the {@link EventHandler} used by this application master.
     *
     * <p>When the specification declares no handler, a no-op handler is returned. Otherwise the declared
     * class is loaded and instantiated, and wrapped so that exceptions thrown by its callbacks are logged
     * instead of propagated. {@code initialize} is the one exception: its failures are propagated to the caller.
     *
     * @param twillSpec specification that may declare an event handler
     * @return a non-null event handler
     * @throws ClassNotFoundException if the declared handler class cannot be loaded
     * @throws IllegalArgumentException if the declared class is not an {@link EventHandler}
     */
    private EventHandler createEventHandler(TwillSpecification twillSpec) throws ClassNotFoundException {
        EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
        if (handlerSpec == null) {
            return new EventHandler(){};
        }
        Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName());
        // Guava's Preconditions only substitutes %s placeholders; "{}" would be emitted literally.
        Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass), "Class %s does not implements %s", handlerClass, EventHandler.class.getName());
        final EventHandler delegate = (EventHandler)Instances.newInstance(handlerClass);
        return new EventHandler(){

            public void initialize(EventHandlerContext context) {
                // Not guarded: a handler that fails to initialize fails the caller.
                delegate.initialize(context);
            }

            public void started() {
                try {
                    delegate.started();
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.started()", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public void containerLaunched(String runnableName, int instanceId, String containerId) {
                try {
                    delegate.containerLaunched(runnableName, instanceId, containerId);
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.containerLaunched(String, int, String)", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public void containerStopped(String runnableName, int instanceId, String containerId, int exitStatus) {
                try {
                    delegate.containerStopped(runnableName, instanceId, containerId, exitStatus);
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.containerStopped(String, int, String, int)", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public void completed() {
                try {
                    delegate.completed();
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.completed()", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public void killed() {
                try {
                    delegate.killed();
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.killed()", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public void aborted() {
                try {
                    delegate.aborted();
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.aborted()", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public void destroy() {
                try {
                    delegate.destroy();
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.destroy()", (Object)delegate.getClass().getName(), (Object)t);
                }
            }

            public EventHandler.TimeoutAction launchTimeout(Iterable<EventHandler.TimeoutEvent> timeoutEvents) {
                try {
                    return delegate.launchTimeout(timeoutEvents);
                }
                catch (Throwable t) {
                    LOG.warn("Exception raised when calling {}.launchTimeout(Iterable<TimeoutEvent>)", (Object)delegate.getClass().getName(), (Object)t);
                    // Fall back to the base class behaviour when the delegate fails.
                    return super.launchTimeout(timeoutEvents);
                }
            }
        };
    }

    /**
     * Creates the tracker for running containers, seeded with the resources of the application master itself.
     *
     * <p>The container memory and virtual cores are read from the {@code YARN_CONTAINER_MEMORY_MB} and
     * {@code YARN_CONTAINER_VIRTUAL_CORES} environment variables.
     *
     * @param appMasterContainerId container id of the application master
     * @param appMasterHost host the application master runs on
     * @return a new {@link RunningContainers} instance
     * @throws Exception if the tracker cannot be created
     */
    private RunningContainers createRunningContainers(ContainerId appMasterContainerId, String appMasterHost) throws Exception {
        int memoryMB = Integer.parseInt(System.getenv("YARN_CONTAINER_MEMORY_MB"));
        int heapMB = Resources.computeMaxHeapSize((int)memoryMB, (int)this.twillRuntimeSpec.getAMReservedMemory(), (double)this.twillRuntimeSpec.getAMMinHeapRatio());

        String amContainerId = appMasterContainerId.toString();
        int virtualCores = Integer.parseInt(System.getenv("YARN_CONTAINER_VIRTUAL_CORES"));
        DefaultTwillRunResources amResources = new DefaultTwillRunResources(0, amContainerId, virtualCores, memoryMB, heapMB, appMasterHost, null);

        String applicationId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
        return new RunningContainers(this.twillRuntimeSpec, applicationId, (TwillRunResources)amResources, this.zkClient, this.applicationLocation, this.twillSpec.getRunnables(), this.eventHandler);
    }

    /**
     * Supplies the current resource report.
     *
     * @return the report produced by the running-containers tracker
     */
    public ResourceReport get() {
        return runningContainers.getResourceReport();
    }

    /**
     * Starts the application master: initializes the event handler, creates the executor used for
     * instance-count changes, ensures the {@code /<runId>/runnables} ZooKeeper node exists, and builds the
     * initial queue of container requests.
     *
     * @throws Exception if the ZooKeeper node cannot be created for a reason other than already existing
     */
    protected void doStart() throws Exception {
        LOG.info("Start application master with spec: {}", (Object)TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec));

        EventHandlerContext handlerContext = new BasicEventHandlerContext(twillRuntimeSpec);
        eventHandler.initialize(handlerContext);
        eventHandler.started();

        instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));

        // Create the parent node for runnables; an already existing node is not an error.
        String runnablesPath = "/" + runId.getId() + "/runnables";
        ZKOperations.ignoreError((OperationFuture)zkClient.create(runnablesPath, null, CreateMode.PERSISTENT), KeeperException.NodeExistsException.class, null).get();

        runningContainers.addWatcher("/discoverable");
        runnableContainerRequests = initContainerRequests();
    }

    /**
     * Stops the application master.
     *
     * <p>Shuts down the instance-change executor, stops all running containers while a background poller
     * keeps heart-beating to the resource manager, deletes the application directory, and finally
     * notifies the event handler of the outcome ({@code killed}, {@code completed} or {@code aborted})
     * followed by {@code destroy}.
     *
     * @param terminationTimeoutMillis timeout passed to {@code RunningContainers.stopAll}
     */
    protected void doStop(long terminationTimeoutMillis) {
        // Clears the interrupt flag of the calling thread; the returned value is deliberately ignored.
        Thread.interrupted();
        LOG.info("Stop application master with spec: {}", (Object)TwillRuntimeSpecificationAdapter.create().toJson(this.twillRuntimeSpec));
        this.instanceChangeExecutor.shutdownNow();

        // Ids of containers that have not yet been reported as completed. Only touched by the poller thread
        // once the poller has been started.
        final Set<Object> ids = Sets.<Object>newHashSet(this.runningContainers.getContainerIds());
        final YarnAMClient.AllocateHandler handler = new YarnAMClient.AllocateHandler(){

            @Override
            public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
                // Nothing is launched while stopping.
            }

            @Override
            public void completed(List<YarnContainerStatus> completed) {
                if (completed.isEmpty()) {
                    return;
                }
                // Process the whole batch exactly once, then mark each container as done.
                ApplicationMasterService.this.handleCompleted(completed);
                for (YarnContainerStatus status : completed) {
                    ids.remove(status.getContainerId());
                }
            }
        };

        // Keep heart-beating while containers are being stopped so that completions are received.
        ExecutorService stopPoller = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory((String)"stopPoller"));
        stopPoller.execute(new Runnable(){

            @Override
            public void run() {
                while (!ids.isEmpty()) {
                    try {
                        ApplicationMasterService.this.amClient.allocate(0.0f, handler);
                        if (!ids.isEmpty()) {
                            TimeUnit.SECONDS.sleep(1L);
                        }
                    }
                    catch (InterruptedException e) {
                        // shutdownNow() was called: stop polling instead of looping on the interrupt.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    catch (Exception e) {
                        LOG.error("Got exception while getting heartbeat", (Throwable)e);
                    }
                }
            }
        });
        this.runningContainers.stopAll(terminationTimeoutMillis);
        stopPoller.shutdownNow();
        this.cleanupDir();

        // No stop status recorded means the run loop did not end on its own.
        if (this.stopStatus == null) {
            this.eventHandler.killed();
        } else {
            switch (this.stopStatus) {
                case COMPLETED: {
                    this.eventHandler.completed();
                    break;
                }
                case ABORTED: {
                    this.eventHandler.aborted();
                    break;
                }
                default: {
                    LOG.error("Unsupported FinalStatus '{}'", (Object)this.stopStatus.name());
                }
            }
        }
        this.eventHandler.destroy();
    }

    /**
     * Returns the live node data of this application master.
     *
     * @return the {@link ApplicationMasterLiveNodeData} built in the constructor
     */
    protected Object getLiveNodeData() {
        return amLiveNode;
    }

    /**
     * Returns the Gson instance for the live node data.
     *
     * @return the shared {@code GSON} instance of this class
     */
    protected Gson getLiveNodeGson() {
        return ApplicationMasterService.GSON;
    }

    /**
     * Handles a message sent to the application master.
     *
     * <p>Handlers are tried in a fixed order: secure store update, set instances, restart runnable
     * instances, log level change. A message none of them claims is forwarded by scope to all runnables
     * or to a single runnable. Anything else is ignored.
     *
     * @param messageId id of the message
     * @param message the message
     * @return a future tied to the completion callback for handled messages, or an already completed
     *         future carrying {@code messageId} when the message is ignored
     */
    public ListenableFuture<String> onReceived(String messageId, Message message) {
        LOG.debug("Message received: {} {}.", messageId, message);

        SettableFuture<String> pending = SettableFuture.create();
        Runnable onDone = getMessageCompletion(messageId, pending);

        // A secure store update is also broadcast to every container.
        if (handleSecureStoreUpdate(message)) {
            runningContainers.sendToAll(message, onDone);
            return pending;
        }

        // Short-circuit evaluation keeps the handlers in order and stops at the first one that accepts.
        boolean handled = handleSetInstances(message, onDone)
            || handleRestartRunnablesInstances(message, onDone)
            || handleLogLevelMessages(message, onDone);
        if (handled) {
            return pending;
        }

        if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
            runningContainers.sendToAll(message, onDone);
            return pending;
        }
        if (message.getScope() == Message.Scope.RUNNABLE) {
            runningContainers.sendToRunnable(message.getRunnableName(), message, onDone);
            return pending;
        }

        LOG.info("Message ignored. {}", message);
        return Futures.immediateFuture(messageId);
    }

    /**
     * Requests the run loop to end by raising the {@code stopped} flag, which {@code doRun} checks on
     * every iteration.
     */
    protected void triggerShutdown() {
        stopped = true;
    }

    /**
     * Recursively deletes the application directory. Failures are logged and never propagated.
     */
    private void cleanupDir() {
        try {
            boolean deleted = applicationLocation.delete(true);
            if (!deleted) {
                LOG.warn("Failed to cleanup directory {}.", applicationLocation);
                return;
            }
            LOG.info("Application directory deleted: {}", applicationLocation);
        }
        catch (Exception e) {
            LOG.warn("Exception while cleanup directory {}.", applicationLocation, e);
        }
    }

    /**
     * Main loop of the application master.
     *
     * <p>Once per second it heartbeats to the resource manager, then, when nothing is being provisioned,
     * takes the next ready request from {@code runnableContainerRequests} and submits container requests
     * for it. The loop ends when {@code stopped} is set, or when there is nothing left to provision,
     * request or run, in which case {@code stopStatus} is set to COMPLETED.
     *
     * @throws Exception if the loop fails; an interrupt during the one second sleep propagates to the caller
     */
    protected void doRun() throws Exception {
        // Allocation taken from the head of the request queue that has not been submitted yet.
        Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
        // Requests submitted to the resource manager and still waiting for containers.
        final LinkedList provisioning = Lists.newLinkedList();
        YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler(){

            @Override
            public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
                ApplicationMasterService.this.launchRunnable(launchers, provisioning);
            }

            @Override
            public void completed(List<YarnContainerStatus> completed) {
                ApplicationMasterService.this.handleCompleted(completed);
            }
        };
        long requestStartTime = 0L;
        boolean isRequestRelaxed = false;
        // First provision timeout check happens 30 seconds from now.
        long nextTimeoutCheck = System.currentTimeMillis() + 30000L;
        while (!this.stopped) {
            TimeUnit.SECONDS.sleep(1L);
            try {
                // Heartbeat; acquired and completed containers are delivered through allocateHandler.
                this.amClient.allocate(0.0f, allocateHandler);
            }
            catch (Exception e) {
                LOG.warn("Exception raised when making heartbeat to RM. Will be retried in next heartbeat.", (Throwable)e);
            }
            // Nothing in flight, nothing queued and nothing running: the application is done.
            if (provisioning.isEmpty() && this.runnableContainerRequests.isEmpty() && this.runningContainers.isEmpty()) {
                LOG.info("All containers completed. Shutting down application master.");
                this.stopStatus = StopStatus.COMPLETED;
                break;
            }
            // Pick the next allocation only when the previous one has been fully provisioned.
            if (provisioning.isEmpty() && currentRequest == null && !this.runnableContainerRequests.isEmpty()) {
                RunnableContainerRequest containerRequest = this.runnableContainerRequests.peek();
                if (!containerRequest.isReadyToBeProvisioned()) {
                    LOG.debug("Request not ready: {}", (Object)containerRequest);
                    // Move the head to the tail and retry on the next iteration.
                    // Note: this 'continue' also skips the provision timeout check for this iteration.
                    this.runnableContainerRequests.add(this.runnableContainerRequests.poll());
                    continue;
                }
                currentRequest = containerRequest.takeRequest();
                if (currentRequest == null) {
                    // The head request has nothing more to hand out; drop it.
                    this.runnableContainerRequests.poll();
                    continue;
                }
            }
            if (provisioning.isEmpty() && currentRequest != null) {
                this.manageBlacklist(currentRequest);
                this.addContainerRequests(((AllocationSpecification)currentRequest.getKey()).getResource(), (Collection)currentRequest.getValue(), provisioning, currentRequest.getKey().getType());
                currentRequest = null;
                requestStartTime = System.currentTimeMillis();
                isRequestRelaxed = false;
            }
            // After 5 seconds without completing the provisioning, clear the blacklist once for this request.
            if (!provisioning.isEmpty() && !isRequestRelaxed && System.currentTimeMillis() - requestStartTime > 5000L) {
                LOG.info("Relaxing provisioning constraints for request {}", (Object)((ProvisionRequest)provisioning.peek()).getRequestId());
                this.amClient.clearBlacklist();
                isRequestRelaxed = true;
            }
            nextTimeoutCheck = this.checkProvisionTimeout(nextTimeoutCheck);
        }
    }

    /**
     * Resets the host blacklist and, for a one-instance-at-a-time allocation of a runnable with a
     * DISTRIBUTED placement policy, blacklists every host (and host:port) that already runs a container
     * of any runnable named by that policy.
     *
     * @param request the allocation about to be submitted
     */
    private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
        amClient.clearBlacklist();

        AllocationSpecification spec = request.getKey();
        boolean oneAtATime = spec.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME);
        if (!oneAtATime) {
            return;
        }

        TwillSpecification.PlacementPolicy policy = placementPolicyManager.getPlacementPolicy(spec.getRunnableName());
        boolean distributed = policy != null && policy.getType() == TwillSpecification.PlacementPolicy.Type.DISTRIBUTED;
        if (!distributed) {
            return;
        }

        for (String policyRunnable : policy.getNames()) {
            for (ContainerInfo running : runningContainers.getContainerInfo(policyRunnable)) {
                String hostName = running.getHost().getHostName();
                LOG.debug("Adding {} to host blacklist", hostName);
                amClient.addToBlacklist(hostName);
                amClient.addToBlacklist(hostName + ":" + running.getPort());
            }
        }
    }

    /**
     * Processes a batch of completed containers and queues a new container request for every runnable
     * that the running-containers tracker marks for restart.
     *
     * @param completedContainersStatuses statuses reported by the resource manager
     */
    private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
        // Runnable name -> number of instances to request again; filled in by the tracker.
        Multiset<String> toRestart = HashMultiset.create();

        for (YarnContainerStatus completedStatus : completedContainersStatuses) {
            LOG.info("Container {} completed with {}:{}.", new Object[]{completedStatus.getContainerId(), completedStatus.getState(), completedStatus.getDiagnostics()});
            runningContainers.handleCompleted(completedStatus, toRestart);
        }

        for (Multiset.Entry<String> restart : toRestart.entrySet()) {
            LOG.info("Re-request container for {} with {} instances.", restart.getElement(), restart.getCount());
            runnableContainerRequests.add(createRunnableContainerRequest(restart.getElement(), restart.getCount()));
        }

        expectedContainers.updateRequestTime(toRestart.elementSet());
    }

    /**
     * Checks whether any runnable has fewer running plus completed containers than expected and, if so,
     * asks the event handler what to do.
     *
     * <p>A non-negative timeout from the handler schedules the next check that many milliseconds after
     * {@code nextTimeoutCheck}; a negative timeout aborts the application.
     *
     * @param nextTimeoutCheck timestamp in milliseconds before which no check is made
     * @return the timestamp of the next check
     */
    private long checkProvisionTimeout(long nextTimeoutCheck) {
        if (System.currentTimeMillis() < nextTimeoutCheck) {
            return nextTimeoutCheck;
        }

        Map<String, ExpectedContainers.ExpectedCount> expectedByRunnable = expectedContainers.getAll();
        Map<String, Integer> runningByRunnable = runningContainers.countAll();
        Map<String, Integer> completedByRunnable = runningContainers.getCompletedContainerCount();

        List<EventHandler.TimeoutEvent> events = new ArrayList<EventHandler.TimeoutEvent>();
        for (Map.Entry<String, ExpectedContainers.ExpectedCount> expected : expectedByRunnable.entrySet()) {
            String name = expected.getKey();
            ExpectedContainers.ExpectedCount wanted = expected.getValue();

            int running = 0;
            if (runningByRunnable.containsKey(name)) {
                running = runningByRunnable.get(name);
            }
            int completed = 0;
            if (completedByRunnable.containsKey(name)) {
                completed = completedByRunnable.get(name);
            }

            int wantedCount = wanted.getCount();
            if (wantedCount > running + completed) {
                events.add(new EventHandler.TimeoutEvent(name, wantedCount, running, wanted.getTimestamp()));
            }
        }

        if (events.isEmpty()) {
            return nextTimeoutCheck + 30000L;
        }

        EventHandler.TimeoutAction action = eventHandler.launchTimeout(events);
        try {
            long timeout = action.getTimeout();
            if (timeout >= 0L) {
                return nextTimeoutCheck + timeout;
            }
            // A negative timeout means abort.
            stopStatus = StopStatus.ABORTED;
            stop();
        }
        catch (Throwable t) {
            LOG.warn("Exception when handling TimeoutAction.", t);
        }
        return nextTimeoutCheck + 30000L;
    }

    /**
     * Builds the credentials handed to launched containers.
     *
     * <p>With security disabled the result is empty. Otherwise it is a copy of the current user's
     * credentials with every token of kind {@code YARN_AM_RM_TOKEN} removed.
     *
     * @return the credentials; empty when security is off or the current user cannot be determined
     */
    private Credentials createCredentials() {
        Credentials result = new Credentials();
        if (UserGroupInformation.isSecurityEnabled()) {
            try {
                result.addAll(UserGroupInformation.getCurrentUser().getCredentials());
                // Drop the AM-RM token(s) from the copy.
                for (Iterator<Token<?>> tokens = result.getAllTokens().iterator(); tokens.hasNext(); ) {
                    Token<?> candidate = tokens.next();
                    if (candidate.getKind().equals(AMRM_TOKEN_KIND_NAME)) {
                        tokens.remove();
                    }
                }
            }
            catch (IOException e) {
                LOG.warn("Failed to get current user. No credentials will be provided to containers.", (Throwable)e);
            }
        }
        return result;
    }

    /**
     * Builds the initial queue of container requests, one {@link RunnableContainerRequest} per order in
     * the specification.
     *
     * <p>Runnables that the placement policy manager reports as distributed get one allocation per
     * instance; all other runnables of the order get a single allocation for their capability.
     *
     * @return a new queue holding the requests in order
     */
    private Queue<RunnableContainerRequest> initContainerRequests() {
        Queue<RunnableContainerRequest> queue = new ConcurrentLinkedQueue<RunnableContainerRequest>();

        for (TwillSpecification.Order order : this.twillSpec.getOrders()) {
            Set<String> distributed = Sets.intersection(this.placementPolicyManager.getDistributedRunnables(), order.getNames());
            Set<String> nonDistributed = Sets.difference(order.getNames(), distributed);
            Map<AllocationSpecification, Collection<RuntimeSpecification>> allocations = Maps.newHashMap();

            // Distributed runnables: one allocation per instance id.
            for (String name : distributed) {
                RuntimeSpecification spec = (RuntimeSpecification)this.twillSpec.getRunnables().get(name);
                Resource resource = this.createCapability(spec.getResourceSpecification());
                int instance = 0;
                while (instance < spec.getResourceSpecification().getInstances()) {
                    AllocationSpecification perInstance = new AllocationSpecification(resource, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, name, instance);
                    this.addAllocationSpecification(perInstance, allocations, spec);
                    ++instance;
                }
            }

            // Remaining runnables: one allocation per capability.
            for (String name : nonDistributed) {
                RuntimeSpecification spec = (RuntimeSpecification)this.twillSpec.getRunnables().get(name);
                Resource resource = this.createCapability(spec.getResourceSpecification());
                this.addAllocationSpecification(new AllocationSpecification(resource), allocations, spec);
            }

            queue.add(new RunnableContainerRequest(order.getType(), allocations));
        }
        return queue;
    }

    /**
     * Appends a runtime specification to the collection stored under the given allocation
     * specification, creating that collection when the key is not yet present.
     *
     * @param allocationSpecification key to add under
     * @param map allocation specification to runtime specifications, modified in place
     * @param runtimeSpec value to append
     */
    private void addAllocationSpecification(AllocationSpecification allocationSpecification, Map<AllocationSpecification, Collection<RuntimeSpecification>> map, RuntimeSpecification runtimeSpec) {
        Collection<RuntimeSpecification> bucket;
        if (map.containsKey(allocationSpecification)) {
            bucket = map.get(allocationSpecification);
        } else {
            bucket = Lists.newLinkedList();
            map.put(allocationSpecification, bucket);
        }
        bucket.add(runtimeSpec);
    }

    /**
     * Submits container requests for every runnable that runs fewer containers than expected and records
     * each submitted request in the provisioning queue.
     *
     * @param capability resource capability requested for each container
     * @param runtimeSpecs runnables to provision
     * @param provisioning queue that receives one {@link ProvisionRequest} per submitted request
     * @param allocationType allocation type; one-instance-at-a-time limits each request to one container
     */
    private void addContainerRequests(Resource capability, Collection<RuntimeSpecification> runtimeSpecs, Queue<ProvisionRequest> provisioning, AllocationSpecification.Type allocationType) {
        for (RuntimeSpecification spec : runtimeSpecs) {
            String runnable = spec.getName();
            int missing = expectedContainers.getExpected(runnable) - runningContainers.count(runnable);
            if (missing <= 0) {
                // Already at or above the expected count.
                continue;
            }

            int toRequest = allocationType.equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME) ? 1 : missing;
            LOG.info("Request {} containers with capability {} for runnable {}", new Object[]{toRequest, capability, runnable});

            YarnAMClient.ContainerRequestBuilder requestBuilder = amClient.addContainerRequest(capability, toRequest);
            requestBuilder.setPriority(0);

            TwillSpecification.PlacementPolicy policy = placementPolicyManager.getPlacementPolicy(runnable);
            if (policy != null) {
                requestBuilder.addHosts(policy.getHosts()).addRacks(policy.getRacks());
            }

            String submittedId = requestBuilder.apply();
            provisioning.add(new ProvisionRequest(spec, submittedId, toRequest, allocationType));
        }
    }

    /**
     * Starts a runnable in each acquired container, always serving the provision request at the head of
     * the queue. Containers acquired while the queue is empty are only logged.
     *
     * @param launchers launchers for the containers just acquired
     * @param provisioning outstanding provision requests; the head is removed once its runnable is fully
     *                     provisioned or when it is a one-instance-at-a-time request
     */
    private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers, Queue<ProvisionRequest> provisioning) {
        for (ProcessLauncher<YarnContainerInfo> acquired : launchers) {
            YarnContainerInfo info = acquired.getContainerInfo();
            Object container = info.getContainer();
            LOG.info("Container allocated: {}", container);

            ProvisionRequest head = provisioning.peek();
            if (head != null) {
                String name = head.getRuntimeSpec().getName();
                LOG.info("Starting runnable {} in {}", name, container);

                int expectedInstances = this.expectedContainers.getExpected(name);

                // Environment for the container: only the entries configured for this runnable.
                LinkedHashMap<String, String> containerEnv = new LinkedHashMap<String, String>();
                if (this.environments.containsKey(name)) {
                    containerEnv.putAll(this.environments.get(name));
                }

                ProcessLauncher.PrepareLaunchContext context = acquired.prepareLaunch(containerEnv, this.amLiveNode.getLocalFiles(), (Object)this.credentials);
                TwillContainerLauncher containerLauncher = new TwillContainerLauncher((RuntimeSpecification)this.twillSpec.getRunnables().get(name), (ContainerInfo)info, context, ZKClients.namespace((ZKClient)this.zkClient, (String)this.getZKNamespace(name)), expectedInstances, this.jvmOpts, this.twillRuntimeSpec.getReservedMemory(name), this.twillRuntimeSpec.getMinHeapRatio(name), this.getSecureStoreLocation());
                this.runningContainers.start(name, (ContainerInfo)info, containerLauncher);

                // Tell the resource manager once the request has received all of its containers.
                if (head.containerAcquired()) {
                    this.amClient.completeContainerRequest(head.getRequestId());
                }

                boolean reachedExpected = this.expectedContainers.getExpected(name) == this.runningContainers.count(name);
                if (reachedExpected || head.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
                    provisioning.poll();
                }

                if (this.expectedContainers.getExpected(name) == this.runningContainers.count(name)) {
                    LOG.info("Runnable {} fully provisioned with {} instances.", name, expectedInstances);
                }
            }
        }
    }

    /**
     * Reads the list of files to localize from {@code localizeFiles.json} in the working directory.
     *
     * @return the deserialized list of {@link LocalFile}s
     * @throws IOException if the file cannot be opened or read
     */
    private List<LocalFile> getLocalizeFiles() throws IOException {
        Path source = Paths.get("localizeFiles.json");
        try (BufferedReader in = Files.newBufferedReader(source, StandardCharsets.UTF_8)) {
            Gson localFileGson = new GsonBuilder().registerTypeAdapter(LocalFile.class, (Object)new LocalFileCodec()).create();
            List<LocalFile> files = localFileGson.fromJson((Reader)in, new TypeToken<List<LocalFile>>(){}.getType());
            return files;
        }
    }

    /**
     * Loads the per-runnable environment variables from
     * {@code runtime.config.jar/environments.json}.
     *
     * <p>The result is never {@code null}: a missing file, an empty file, or a
     * file containing only the JSON {@code null} literal all yield an empty map.
     *
     * @return a map from runnable name to that runnable's environment variables
     * @throws IOException if the file exists but cannot be opened or read
     */
    private Map<String, Map<String, String>> getEnvironments() throws IOException {
        Path envFile = Paths.get("runtime.config.jar", "environments.json");
        if (!Files.exists(envFile)) {
            return new HashMap<String, Map<String, String>>();
        }
        try (BufferedReader reader = Files.newBufferedReader(envFile, StandardCharsets.UTF_8)) {
            Map<String, Map<String, String>> environments =
                new Gson().fromJson(reader, new TypeToken<Map<String, Map<String, String>>>(){}.getType());
            if (environments == null) {
                // Gson returns null when the document is empty (EOF) or is the JSON null literal;
                // callers invoke containsKey() on the result, so hand back an empty map instead.
                return new HashMap<String, Map<String, String>>();
            }
            return environments;
        }
    }

    /**
     * Builds the ZooKeeper namespace string for a runnable, in the form
     * {@code /<run id>/runnables/<runnable name>}.
     *
     * @param runnableName name of the runnable
     * @return the namespace path for the given runnable under this application's run id
     */
    private String getZKNamespace(String runnableName) {
        return "/" + this.runId.getId() + "/runnables/" + runnableName;
    }

    /**
     * Handles a "change number of instances" request for a single runnable.
     *
     * <p>The message is accepted only when it is a {@code SYSTEM} message scoped to
     * a {@code RUNNABLE}, carries the {@code "instances"} command with a
     * {@code "count"} option, names a runnable known to the Twill specification,
     * and the count parses as a non-negative integer. When the requested count
     * equals the currently expected count the completion runs immediately;
     * otherwise the change is handed to the instance-change executor.
     *
     * @param message the received message
     * @param completion callback to run once the message has been fully processed
     * @return {@code true} if the message was recognized and handled (or scheduled),
     *         {@code false} if it is not a valid set-instances request
     */
    private boolean handleSetInstances(Message message, Runnable completion) {
        if (message.getType() != Message.Type.SYSTEM || message.getScope() != Message.Scope.RUNNABLE) {
            return false;
        }
        Command command = message.getCommand();
        Map<String, String> options = command.getOptions();
        if (!"instances".equals(command.getCommand()) || !options.containsKey("count")) {
            return false;
        }
        String runnableName = message.getRunnableName();
        if (runnableName == null || runnableName.isEmpty() || !this.twillSpec.getRunnables().containsKey(runnableName)) {
            LOG.info("Unknown runnable {}", runnableName);
            return false;
        }
        String requestedCount = options.get("count");
        int newCount;
        try {
            newCount = Integer.parseInt(requestedCount);
        } catch (NumberFormatException e) {
            // The count comes from the message; a malformed (or null) value must not
            // escape as an unchecked exception from the message handler.
            LOG.warn("Invalid instance count '{}' in change instances request for {}", requestedCount, runnableName);
            return false;
        }
        if (newCount < 0) {
            // A negative target would make the scale-down loop stop more containers than exist.
            LOG.warn("Negative instance count '{}' in change instances request for {}", requestedCount, runnableName);
            return false;
        }
        int oldCount = this.expectedContainers.getExpected(runnableName);
        LOG.info("Received change instances request for {}, from {} to {}.", new Object[]{runnableName, oldCount, newCount});
        if (newCount == oldCount) {
            // Nothing to change; acknowledge the message right away.
            completion.run();
            return true;
        }
        this.instanceChangeExecutor.execute(this.createSetInstanceRunnable(message, completion, oldCount, newCount));
        return true;
    }

    /**
     * Creates the task that carries out a change in the number of instances of a runnable.
     *
     * <p>When run, the task waits on the running-container count for {@code oldCount},
     * records {@code newCount} as the expected count, and then either stops the
     * surplus instances one at a time (scale down) or queues a container request
     * for the additional instances (scale up). In both cases the original message
     * is forwarded to the runnable together with the completion callback.
     *
     * <p>If the task is interrupted, the request is discarded: the completion
     * callback is run and the thread's interrupt status is restored so that the
     * owner of the thread can still observe the interruption.
     *
     * @param message the set-instances message; its runnable name selects the target
     * @param completion callback signalling that message processing has finished
     * @param oldCount the instance count expected to be running before the change
     * @param newCount the requested instance count
     * @return a runnable performing the instance change
     */
    private Runnable createSetInstanceRunnable(final Message message, final Runnable completion, final int oldCount, final int newCount) {
        return new Runnable(){

            @Override
            public void run() {
                String runnableName = message.getRunnableName();
                LOG.info("Processing change instance request for {}, from {} to {}.", new Object[]{runnableName, oldCount, newCount});
                try {
                    ApplicationMasterService.this.runningContainers.waitForCount(runnableName, oldCount);
                    LOG.info("Confirmed {} containers running for {}.", oldCount, runnableName);
                    ApplicationMasterService.this.expectedContainers.setExpected(runnableName, newCount);
                    try {
                        if (newCount < oldCount) {
                            // Scale down: stop the surplus instances one by one.
                            for (int i = 0; i < oldCount - newCount; ++i) {
                                ApplicationMasterService.this.runningContainers.stopLastAndWait(runnableName);
                            }
                        } else {
                            // Scale up: request containers for the additional instances.
                            ApplicationMasterService.this.runnableContainerRequests.add(ApplicationMasterService.this.createRunnableContainerRequest(runnableName, newCount - oldCount));
                        }
                    }
                    finally {
                        // Always forward the message to the runnable, even if the change above failed.
                        ApplicationMasterService.this.runningContainers.sendToRunnable(runnableName, message, completion);
                        LOG.info("Change instances request completed. From {} to {}.", oldCount, newCount);
                    }
                }
                catch (InterruptedException e) {
                    // Interrupted: discard the request but still signal completion, then
                    // restore the interrupt status that catching the exception cleared.
                    try {
                        completion.run();
                    }
                    finally {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
    }

    /**
     * Convenience overload of the three-argument
     * {@code createRunnableContainerRequest} that passes {@code true} for its
     * {@code isProvisioned} flag.
     *
     * @param runnableName name of the runnable to request containers for
     * @param numberOfInstances number of instances to request
     * @return the container request for the runnable
     */
    private RunnableContainerRequest createRunnableContainerRequest(String runnableName, int numberOfInstances) {
        final boolean provisioned = true;
        return this.createRunnableContainerRequest(runnableName, numberOfInstances, provisioned);
    }

    /**
     * Builds a container request for the given runnable.
     *
     * <p>The request takes its order type from the first order in the Twill
     * specification that lists the runnable. For a runnable reported as
     * distributed by the placement policy manager, one
     * {@code ALLOCATE_ONE_INSTANCE_AT_A_TIME} allocation is registered per
     * instance id in {@code [0, numberOfInstances)}. Otherwise a single allocation
     * is registered: capability-only when more than one instance is requested,
     * or a one-at-a-time allocation for instance id 0.
     *
     * @param runnableName name of the runnable to request containers for
     * @param numberOfInstances number of instances to request
     * @param isProvisioned flag passed through to the {@link RunnableContainerRequest}
     * @return the assembled container request
     * @throws java.util.NoSuchElementException if no order lists the runnable
     *         (thrown by {@code Iterables.find})
     */
    private RunnableContainerRequest createRunnableContainerRequest(final String runnableName, int numberOfInstances, boolean isProvisioned) {
        Predicate<TwillSpecification.Order> listsRunnable = new Predicate<TwillSpecification.Order>(){

            @Override
            public boolean apply(TwillSpecification.Order candidate) {
                return candidate.getNames().contains(runnableName);
            }
        };
        TwillSpecification.Order order = Iterables.find(this.twillSpec.getOrders(), listsRunnable);
        RuntimeSpecification runtimeSpec = this.twillSpec.getRunnables().get(runnableName);
        Resource capability = this.createCapability(runtimeSpec.getResourceSpecification());
        // Left raw on purpose: the element types are dictated by addAllocationSpecification,
        // whose signature lives outside this part of the file.
        Map requestsMap = Maps.newHashMap();

        boolean distributed = this.placementPolicyManager.getDistributedRunnables().contains(runnableName);
        if (!distributed) {
            AllocationSpecification singleSpec;
            if (numberOfInstances > 1) {
                singleSpec = new AllocationSpecification(capability);
            } else {
                singleSpec = new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, runnableName, 0);
            }
            this.addAllocationSpecification(singleSpec, requestsMap, runtimeSpec);
            return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned);
        }

        // Distributed runnables get one allocation per instance id.
        for (int id = 0; id < numberOfInstances; id++) {
            AllocationSpecification perInstanceSpec = new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, runnableName, id);
            this.addAllocationSpecification(perInstanceSpec, requestsMap, runtimeSpec);
        }
        return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned);
    }

    /**
     * Creates a callback that completes the given future with the message id.
     *
     * @param messageId id of the message being processed
     * @param future future to set with {@code messageId} when the callback runs
     * @return a runnable that sets {@code future} to {@code messageId}
     */
    private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
        Runnable completeWithMessageId = new Runnable(){

            @Override
            public void run() {
                future.set(messageId);
            }
        };
        return completeWithMessageId;
    }

    /**
     * Translates a Twill resource specification into a YARN {@link Resource}.
     *
     * @param resourceSpec specification supplying the virtual core count and memory size
     * @return a new resource record carrying the memory size and, when
     *         {@code YarnUtils.setVirtualCores} reports success, the virtual cores
     */
    private Resource createCapability(ResourceSpecification resourceSpec) {
        Resource resource = Records.newRecord(Resource.class);
        boolean coresApplied = YarnUtils.setVirtualCores(resource, resourceSpec.getVirtualCores());
        if (!coresApplied) {
            LOG.debug("Virtual cores limit not supported.");
        }
        resource.setMemory(resourceSpec.getMemorySize());
        return resource;
    }

    /**
     * Handles requests to restart instances of one or more runnables.
     *
     * <p>The message is accepted only when it is a {@code SYSTEM} message scoped to
     * {@code RUNNABLE} or {@code RUNNABLES} and carries either the
     * {@code "restartAllRunnableInstances"} or the {@code "restartRunnablesInstances"}
     * command. A {@code RUNNABLE}-scoped message with a non-empty runnable name
     * restarts every instance of that runnable. Otherwise each command option is
     * read as a runnable name mapped to a JSON-encoded set of instance ids to restart.
     *
     * @param message the received message
     * @param completion callback passed on to each scheduled restart
     * @return {@code true} if the message was a restart request and restarts were
     *         scheduled, {@code false} otherwise
     */
    private boolean handleRestartRunnablesInstances(Message message, Runnable completion) {
        LOG.debug("Check if it should process a restart runnable instances.");
        if (message.getType() != Message.Type.SYSTEM) {
            return false;
        }
        Message.Scope messageScope = message.getScope();
        boolean runnableScoped = messageScope == Message.Scope.RUNNABLE || messageScope == Message.Scope.RUNNABLES;
        if (!runnableScoped) {
            return false;
        }
        Command requestCommand = message.getCommand();
        String commandName = requestCommand.getCommand();
        boolean restartCommand = "restartAllRunnableInstances".equals(commandName)
            || "restartRunnablesInstances".equals(commandName);
        if (!restartCommand) {
            return false;
        }
        LOG.debug("Processing restart runnable instances message {}.", message);

        String targetRunnable = message.getRunnableName();
        if (!Strings.isNullOrEmpty(targetRunnable) && message.getScope() == Message.Scope.RUNNABLE) {
            // A null instance-id set means "restart every instance" of the runnable.
            LOG.debug("Start restarting all runnable {} instances.", targetRunnable);
            this.restartRunnableInstances(targetRunnable, null, completion);
            return true;
        }

        // Each option maps a runnable name to a JSON array of instance ids.
        for (Map.Entry<String, String> option : requestCommand.getOptions().entrySet()) {
            String runnableName = option.getKey();
            Set<Integer> restartedInstanceIds = GSON.fromJson(option.getValue(), new TypeToken<Set<Integer>>(){}.getType());
            LOG.debug("Start restarting runnable {} instances {}", runnableName, restartedInstanceIds);
            this.restartRunnableInstances(runnableName, restartedInstanceIds, completion);
        }
        return true;
    }

    /**
     * Schedules, on the instance-change executor, a restart of instances of a runnable.
     *
     * <p>The scheduled task first queues a container request for the replacement
     * instances with its provisioned flag set to {@code false}, then stops each
     * selected instance (failures to stop are logged and skipped), and only then
     * marks the request as ready to be provisioned, updates the request time of
     * the runnable and runs the completion callback.
     *
     * @param runnableName name of the runnable whose instances are restarted
     * @param instanceIds ids of the instances to restart, or {@code null} to restart
     *        ids {@code 0} up to (excluding) the current running count
     * @param completion callback run after all selected instances have been processed
     */
    private void restartRunnableInstances(final String runnableName, final @Nullable Set<Integer> instanceIds, final Runnable completion) {
        Runnable restartTask = new Runnable(){

            @Override
            public void run() {
                LOG.debug("Begin restart runnable {} instances.", runnableName);
                int runningCount = ApplicationMasterService.this.runningContainers.count(runnableName);

                Set<Integer> instancesToRemove;
                if (instanceIds != null) {
                    instancesToRemove = ImmutableSet.copyOf(instanceIds);
                } else {
                    // No explicit ids: target every instance id in [0, runningCount).
                    instancesToRemove = Ranges.closedOpen(0, runningCount).asSet(DiscreteDomains.integers());
                }
                LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);

                // Queue the replacement request before stopping anything; it is created with
                // the provisioned flag off and only released after the stop loop below.
                RunnableContainerRequest containerRequest = ApplicationMasterService.this.createRunnableContainerRequest(runnableName, instancesToRemove.size(), false);
                ApplicationMasterService.this.runnableContainerRequests.add(containerRequest);

                for (int instanceId : instancesToRemove) {
                    LOG.debug("Stop instance {} for runnable {}", instanceId, runnableName);
                    try {
                        ApplicationMasterService.this.runningContainers.stopByIdAndWait(runnableName, instanceId);
                    }
                    catch (Exception ex) {
                        // Best effort: a failure here is treated as "already stopped".
                        LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId);
                    }
                }

                LOG.info("All instances in {} for runnable {} are stopped. Ready to provision", instancesToRemove, runnableName);
                containerRequest.setReadyToBeProvisioned();
                ApplicationMasterService.this.expectedContainers.updateRequestTime(Collections.singleton(runnableName));
                completion.run();
            }
        };
        this.instanceChangeExecutor.execute(restartTask);
    }

    /**
     * Handles log-level change messages by forwarding them to running containers.
     *
     * <p>The message is accepted only when it is a {@code SYSTEM} message scoped to
     * {@code RUNNABLE} or {@code ALL_RUNNABLE} and its command is
     * {@code "setLogLevels"} or {@code "resetLogLevels"}. An {@code ALL_RUNNABLE}
     * message is sent to all containers; a {@code RUNNABLE} message is sent only
     * to the named runnable, which must exist in the Twill specification.
     *
     * @param message the received message
     * @param completion callback handed to the container messaging call
     * @return {@code true} if the message was forwarded, {@code false} if it is not
     *         a log-level message or names an unknown runnable
     */
    private boolean handleLogLevelMessages(Message message, Runnable completion) {
        Message.Scope scope = message.getScope();
        boolean systemMessage = message.getType() == Message.Type.SYSTEM;
        boolean allRunnables = scope == Message.Scope.ALL_RUNNABLE;
        if (!systemMessage || !(allRunnables || scope == Message.Scope.RUNNABLE)) {
            return false;
        }

        String command = message.getCommand().getCommand();
        boolean logLevelCommand = command.equals("setLogLevels") || command.equals("resetLogLevels");
        if (!logLevelCommand) {
            return false;
        }

        if (allRunnables) {
            this.runningContainers.sendToAll(message, completion);
            return true;
        }

        String runnableName = message.getRunnableName();
        if (runnableName == null || !this.twillSpec.getRunnables().containsKey(runnableName)) {
            LOG.info("Unknown runnable {}", runnableName);
            return false;
        }
        this.runningContainers.sendToRunnable(runnableName, message, completion);
        return true;
    }

    private static enum StopStatus {
        COMPLETED,
        ABORTED;

    }
}

