/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.appmaster;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Table;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.ContainerLiveNodeData;
import org.apache.twill.internal.DefaultResourceReport;
import org.apache.twill.internal.DefaultTwillRunResources;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.TwillContainerController;
import org.apache.twill.internal.TwillContainerLauncher;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.container.TwillContainerMain;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RunningContainers {
    private static final Logger LOG = LoggerFactory.getLogger(RunningContainers.class);
    private static final Function<BitSet, Integer> BITSET_CARDINALITY = new Function<BitSet, Integer>(){

        public Integer apply(BitSet input) {
            return input.cardinality();
        }
    };
    private final Table<String, String, TwillContainerController> containers = HashBasedTable.create();
    private final Map<String, BitSet> runnableInstances = Maps.newHashMap();
    private final Map<String, Integer> completedContainerCount = Maps.newHashMap();
    private final DefaultResourceReport resourceReport;
    private final Deque<String> startSequence = Lists.newLinkedList();
    private final Lock containerLock = new ReentrantLock();
    private final Condition containerChange = this.containerLock.newCondition();
    private final ZKClient zkClient;
    private final Multimap<String, ContainerInfo> containerStats;
    private final Location applicationLocation;
    private final Set<String> runnableNames;
    private final Map<String, Map<String, String>> logLevels;
    private final Map<String, Integer> maxRetries;
    private final Map<String, Map<Integer, AtomicInteger>> numRetries;
    private final EventHandler eventHandler;

    RunningContainers(TwillRuntimeSpecification twillRuntimeSpec, String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient, Location applicationLocation, Map<String, RuntimeSpecification> runnables, EventHandler eventHandler) {
        this.resourceReport = new DefaultResourceReport(appId, appMasterResources);
        this.zkClient = zookeeperClient;
        this.containerStats = HashMultimap.create();
        this.applicationLocation = applicationLocation;
        this.runnableNames = runnables.keySet();
        this.logLevels = new TreeMap<String, Map<String, String>>();
        this.maxRetries = Maps.newHashMap((Map)twillRuntimeSpec.getMaxRetries());
        this.numRetries = Maps.newHashMap();
        this.eventHandler = eventHandler;
    }

    boolean isEmpty() {
        this.containerLock.lock();
        try {
            boolean bl = this.runnableInstances.isEmpty();
            return bl;
        }
        finally {
            this.containerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
        this.containerLock.lock();
        try {
            int instanceId = this.getStartInstanceId(runnableName);
            RunId runId = this.getRunId(runnableName, instanceId);
            TwillContainerController controller = launcher.start(runId, instanceId, TwillContainerMain.class, "$HADOOP_CONF_DIR", this.saveLogLevels());
            this.containers.put((Object)runnableName, (Object)containerInfo.getId(), (Object)controller);
            DynamicTwillRunResources resources = new DynamicTwillRunResources(instanceId, containerInfo.getId(), containerInfo.getVirtualCores(), containerInfo.getMemoryMB(), launcher.getMaxHeapMemoryMB(), containerInfo.getHost().getHostName(), controller);
            this.resourceReport.addRunResources(runnableName, (TwillRunResources)resources);
            this.containerStats.put((Object)runnableName, (Object)containerInfo);
            if (this.startSequence.isEmpty() || !runnableName.equals(this.startSequence.peekLast())) {
                this.startSequence.addLast(runnableName);
            }
            this.containerChange.signalAll();
            this.eventHandler.containerLaunched(runnableName, instanceId, containerInfo.getId());
        }
        finally {
            this.containerLock.unlock();
        }
    }

    void addWatcher(String path) {
        ZKOperations.watchChildren((ZKClient)this.zkClient, (String)path, (ZKOperations.ChildrenCallback)new ZKOperations.ChildrenCallback(){

            public void updated(NodeChildren nodeChildren) {
                RunningContainers.this.resourceReport.setServices(nodeChildren.getChildren());
            }
        });
    }

    ResourceReport getResourceReport() {
        return this.resourceReport;
    }

    Collection<ContainerInfo> getContainerInfo(String runnableName) {
        return this.containerStats.get((Object)runnableName);
    }

    void stopLastAndWait(String runnableName) {
        int maxInstanceId;
        this.containerLock.lock();
        try {
            maxInstanceId = this.getMaxInstanceId(runnableName);
            if (maxInstanceId < 0) {
                LOG.warn("No running container found for {}", (Object)runnableName);
                return;
            }
        }
        finally {
            this.containerLock.unlock();
        }
        this.stopByIdAndWait(runnableName, maxInstanceId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stopByIdAndWait(String runnableName, int instanceId) {
        String containerId = null;
        TwillContainerController controller = null;
        this.containerLock.lock();
        try {
            for (Map.Entry entry : this.containers.row((Object)runnableName).entrySet()) {
                if (((TwillContainerController)entry.getValue()).getInstanceId() != instanceId) continue;
                containerId = (String)entry.getKey();
                controller = (TwillContainerController)entry.getValue();
                break;
            }
            Preconditions.checkState((containerId != null ? 1 : 0) != 0, (String)"No container found for {} with instanceId = {}", (Object[])new Object[]{runnableName, instanceId});
            Preconditions.checkState((controller != null ? 1 : 0) != 0, (String)"Null controller found for {} with instanceId = {}", (Object[])new Object[]{runnableName, instanceId});
        }
        finally {
            this.containerLock.unlock();
        }
        LOG.info("Stopping service: {} {}", (Object)runnableName, (Object)controller.getRunId());
        Futures.getUnchecked((Future)controller.terminate());
        this.containerLock.lock();
        try {
            if (this.removeContainerInfo(containerId)) {
                this.containers.remove((Object)runnableName, (Object)containerId);
                this.removeInstanceId(runnableName, instanceId);
                this.numRetries.get(runnableName).remove(instanceId);
                this.resourceReport.removeRunnableResources(runnableName, containerId);
                this.containerChange.signalAll();
                this.eventHandler.containerStopped(runnableName, instanceId, containerId, -100);
            }
        }
        finally {
            this.containerLock.unlock();
        }
    }

    void waitForCount(String runnableName, int count) throws InterruptedException {
        this.containerLock.lock();
        try {
            while (this.getRunningInstances(runnableName) != count) {
                this.containerChange.await();
            }
        }
        finally {
            this.containerLock.unlock();
        }
    }

    int count(String runnableName) {
        this.containerLock.lock();
        try {
            int n = this.getRunningInstances(runnableName);
            return n;
        }
        finally {
            this.containerLock.unlock();
        }
    }

    Map<String, Integer> countAll() {
        this.containerLock.lock();
        try {
            ImmutableMap immutableMap = ImmutableMap.copyOf((Map)Maps.transformValues(this.runnableInstances, BITSET_CARDINALITY));
            return immutableMap;
        }
        finally {
            this.containerLock.unlock();
        }
    }

    Map<String, Integer> getCompletedContainerCount() {
        this.containerLock.lock();
        try {
            ImmutableMap immutableMap = ImmutableMap.copyOf(this.completedContainerCount);
            return immutableMap;
        }
        finally {
            this.containerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendToAll(Message message, Runnable completion) {
        this.containerLock.lock();
        try {
            for (String runnableName : this.runnableNames) {
                this.checkAndUpdateLogLevels(message, runnableName);
            }
            if (this.containers.isEmpty()) {
                completion.run();
            }
            AtomicInteger count = new AtomicInteger(this.containers.size());
            for (Map.Entry entry : this.containers.rowMap().entrySet()) {
                for (TwillContainerController controller : ((Map)entry.getValue()).values()) {
                    this.sendMessage((String)entry.getKey(), message, controller, count, completion);
                }
            }
        }
        finally {
            this.containerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendToRunnable(String runnableName, Message message, Runnable completion) {
        ArrayList controllers;
        this.containerLock.lock();
        try {
            this.checkAndUpdateLogLevels(message, runnableName);
            controllers = new ArrayList(this.containers.row((Object)runnableName).values());
        }
        finally {
            this.containerLock.unlock();
        }
        if (controllers.isEmpty()) {
            completion.run();
        }
        AtomicInteger count = new AtomicInteger(controllers.size());
        for (TwillContainerController controller : controllers) {
            this.sendMessage(runnableName, message, controller, count, completion);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stopAll(long terminationTimeoutMillis) {
        this.containerLock.lock();
        LinkedList reverseRunnables = new LinkedList();
        try {
            Iterators.addAll(reverseRunnables, this.startSequence.descendingIterator());
        }
        finally {
            this.containerLock.unlock();
        }
        for (String string : reverseRunnables) {
            LOG.info("Stopping all instances of " + string);
            ArrayList<Future> futures = new ArrayList<Future>();
            this.containerLock.lock();
            try {
                for (TwillContainerController controller : this.containers.row((Object)string).values()) {
                    futures.add(controller.terminate(terminationTimeoutMillis, TimeUnit.MILLISECONDS));
                }
            }
            finally {
                this.containerLock.unlock();
            }
            for (Future future : futures) {
                try {
                    future.get(terminationTimeoutMillis, TimeUnit.MILLISECONDS);
                }
                catch (Exception e) {
                    LOG.warn("Exception raised when terminating a container for runnable {}", (Object)string, (Object)e);
                }
            }
            LOG.info("Terminated all instances of " + string);
        }
        this.containerLock.lock();
        try {
            for (Map.Entry entry : this.containers.rowMap().entrySet()) {
                String runnableName = (String)entry.getKey();
                Collection containerInfos = this.containerStats.get((Object)runnableName);
                block15: for (Map.Entry containerControllerEntry : ((Map)entry.getValue()).entrySet()) {
                    for (ContainerInfo containerInfo : containerInfos) {
                        if (!containerInfo.getId().equals(containerControllerEntry.getKey())) continue;
                        this.eventHandler.containerStopped(runnableName, ((TwillContainerController)containerControllerEntry.getValue()).getInstanceId(), (String)containerControllerEntry.getKey(), -100);
                        continue block15;
                    }
                }
            }
            this.containers.clear();
            this.runnableInstances.clear();
            this.numRetries.clear();
            this.containerStats.clear();
        }
        finally {
            this.containerLock.unlock();
        }
    }

    Set<String> getContainerIds() {
        this.containerLock.lock();
        try {
            ImmutableSet immutableSet = ImmutableSet.copyOf((Collection)this.containers.columnKeySet());
            return immutableSet;
        }
        finally {
            this.containerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleCompleted(YarnContainerStatus status, Multiset<String> restartRunnables) {
        this.containerLock.lock();
        String containerId = status.getContainerId();
        int exitStatus = status.getExitStatus();
        ContainerState state = status.getState();
        try {
            this.removeContainerInfo(containerId);
            Map lookup = this.containers.column((Object)containerId);
            if (lookup.isEmpty()) {
                return;
            }
            if (lookup.size() != 1) {
                LOG.warn("More than one controller found for container {}", (Object)containerId);
            }
            boolean containerStopped = false;
            String runnableName = (String)lookup.keySet().iterator().next();
            int instanceId = 0;
            for (Map.Entry completedEntry : lookup.entrySet()) {
                TwillContainerController controller = (TwillContainerController)completedEntry.getValue();
                instanceId = controller.getInstanceId();
                containerStopped = containerStopped || this.isControllerStopped(controller);
                controller.completed(exitStatus);
                if (exitStatus == 0) {
                    if (!this.completedContainerCount.containsKey(runnableName)) {
                        this.completedContainerCount.put(runnableName, 0);
                    }
                    this.completedContainerCount.put(runnableName, this.completedContainerCount.get(runnableName) + 1);
                }
                this.removeInstanceId(runnableName, controller.getInstanceId());
                this.resourceReport.removeRunnableResources(runnableName, containerId);
                this.eventHandler.containerStopped(runnableName, instanceId, containerId, exitStatus);
            }
            if (exitStatus != 0) {
                LOG.warn("Container {} exited abnormally with state {}, exit code {}.", new Object[]{containerId, state, exitStatus});
                if (!containerStopped && this.shouldRetry(runnableName, instanceId, exitStatus)) {
                    LOG.info("Re-request the container {} for exit code {}.", (Object)containerId, (Object)exitStatus);
                    restartRunnables.add((Object)runnableName);
                } else if (containerStopped) {
                    LOG.info("Container {} is being stopped, will not re-request", (Object)containerId);
                }
            } else {
                LOG.info("Container {} exited normally with state {}", (Object)containerId, (Object)state);
            }
            lookup.clear();
            this.containerChange.signalAll();
        }
        finally {
            this.containerLock.unlock();
        }
    }

    private boolean shouldRetry(String runnableName, int instanceId, int exitCode) {
        boolean possiblyRetry;
        boolean bl = possiblyRetry = exitCode != 0 && exitCode != -101 && exitCode != 10;
        if (possiblyRetry) {
            int max = this.getMaxRetries(runnableName);
            if (max == Integer.MAX_VALUE) {
                return true;
            }
            int retryCount = this.getRetryCount(runnableName, instanceId);
            if (retryCount == max) {
                LOG.info("Retries exhausted for instance {} of runnable {}.", (Object)instanceId, (Object)runnableName);
                return false;
            }
            LOG.info("Attempting {} of {} retries for instance {} of runnable {}.", new Object[]{retryCount + 1, max, instanceId, runnableName});
            return true;
        }
        return false;
    }

    private boolean isControllerStopped(TwillContainerController controller) {
        return controller.state() == Service.State.STOPPING || controller.state() == Service.State.TERMINATED;
    }

    private void sendMessage(final String runnableName, final Message message, final TwillContainerController controller, final AtomicInteger count, final Runnable completion) {
        Futures.addCallback((ListenableFuture)controller.sendMessage(message), (FutureCallback)new FutureCallback<Message>(){

            public void onSuccess(Message result) {
                if (count.decrementAndGet() == 0) {
                    completion.run();
                }
            }

            public void onFailure(Throwable t) {
                try {
                    LOG.error("Failed to send message. Runnable: {}, RunId: {}, Message: {}.", new Object[]{runnableName, controller.getRunId(), message, t});
                }
                finally {
                    if (count.decrementAndGet() == 0) {
                        completion.run();
                    }
                }
            }
        });
    }

    private int getStartInstanceId(String runnableName) {
        BitSet instances = this.runnableInstances.get(runnableName);
        if (instances == null) {
            instances = new BitSet();
            this.runnableInstances.put(runnableName, instances);
        }
        int instanceId = 0;
        int maxRetries = this.getMaxRetries(runnableName);
        while (this.getRetryCount(runnableName, instanceId = instances.nextClearBit(instanceId)) == maxRetries) {
            ++instanceId;
        }
        this.incrementRetryCount(runnableName, instanceId);
        instances.set(instanceId);
        return instanceId;
    }

    private void removeInstanceId(String runnableName, int instanceId) {
        BitSet instances = this.runnableInstances.get(runnableName);
        if (instances == null) {
            return;
        }
        instances.clear(instanceId);
        if (instances.isEmpty()) {
            this.runnableInstances.remove(runnableName);
        }
    }

    private int getMaxInstanceId(String runnableName) {
        BitSet instances = this.runnableInstances.get(runnableName);
        if (instances == null || instances.isEmpty()) {
            return -1;
        }
        return instances.length() - 1;
    }

    private int getRunningInstances(String runableName) {
        BitSet instances = this.runnableInstances.get(runableName);
        return instances == null ? 0 : instances.cardinality();
    }

    private int getMaxRetries(String runnableName) {
        int max = Integer.MAX_VALUE;
        if (this.maxRetries.containsKey(runnableName)) {
            max = this.maxRetries.get(runnableName);
        }
        return max;
    }

    private int getRetryCount(String runnableName, int instanceId) {
        AtomicInteger cnt;
        HashMap runnableNumRetries = this.numRetries.get(runnableName);
        if (runnableNumRetries == null) {
            runnableNumRetries = Maps.newHashMap();
            this.numRetries.put(runnableName, runnableNumRetries);
        }
        if ((cnt = (AtomicInteger)runnableNumRetries.get(instanceId)) == null) {
            cnt = new AtomicInteger(-1);
            runnableNumRetries.put(instanceId, cnt);
        }
        return cnt.get();
    }

    private void incrementRetryCount(String runnableName, int instanceId) {
        this.numRetries.get(runnableName).get(instanceId).incrementAndGet();
    }

    private RunId getRunId(String runnableName, int instanceId) {
        RunId baseId;
        Collection controllers = this.containers.row((Object)runnableName).values();
        if (controllers.isEmpty()) {
            baseId = RunIds.generate();
        } else {
            String id = ((TwillContainerController)controllers.iterator().next()).getRunId().getId();
            baseId = RunIds.fromString((String)id.substring(0, id.lastIndexOf(45)));
        }
        return RunIds.fromString((String)(baseId.getId() + '-' + instanceId));
    }

    private boolean removeContainerInfo(String containerId) {
        for (ContainerInfo containerInfo : this.containerStats.values()) {
            if (!containerInfo.getId().equals(containerId)) continue;
            this.containerStats.values().remove(containerInfo);
            return true;
        }
        return false;
    }

    private void checkAndUpdateLogLevels(Message message, String runnableName) {
        String command = message.getCommand().getCommand();
        if (message.getType() != Message.Type.SYSTEM || !"setLogLevels".equals(command) && !"resetLogLevels".equals(command)) {
            return;
        }
        Map messageOptions = message.getCommand().getOptions();
        Map<String, String> runnableLogLevels = this.logLevels.get(runnableName);
        if ("resetLogLevels".equals(command)) {
            if (runnableLogLevels != null) {
                if (messageOptions.isEmpty()) {
                    this.logLevels.remove(runnableName);
                } else {
                    runnableLogLevels.keySet().removeAll(messageOptions.keySet());
                }
            }
            return;
        }
        if (runnableLogLevels == null) {
            runnableLogLevels = new TreeMap<String, String>();
            this.logLevels.put(runnableName, runnableLogLevels);
        }
        runnableLogLevels.putAll(messageOptions);
    }

    private Location saveLogLevels() {
        LOG.debug("save the log level file");
        try {
            Gson gson = new GsonBuilder().serializeNulls().create();
            String jsonStr = gson.toJson(this.logLevels);
            String fileName = Hashing.md5().hashString((CharSequence)jsonStr) + "." + "logLevel.json";
            Location location = this.applicationLocation.append(fileName);
            if (!location.exists()) {
                try (OutputStreamWriter writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);){
                    writer.write(jsonStr);
                }
            }
            LOG.debug("Done saving the log level file");
            return location;
        }
        catch (IOException e) {
            LOG.error("Failed to save the log level file.");
            return null;
        }
    }

    private static final class DynamicTwillRunResources
    extends DefaultTwillRunResources {
        private static final Function<String, LogEntry.Level> LOG_LEVEL_CONVERTER = new Function<String, LogEntry.Level>(){

            @Nullable
            public LogEntry.Level apply(@Nullable String logLevel) {
                return logLevel == null ? null : LogEntry.Level.valueOf((String)logLevel);
            }
        };
        private final TwillContainerController controller;
        private Integer dynamicDebugPort = null;

        private DynamicTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB, String host, TwillContainerController controller) {
            super(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, null);
            this.controller = controller;
        }

        public synchronized Integer getDebugPort() {
            ContainerLiveNodeData liveData;
            if (this.dynamicDebugPort == null && (liveData = this.controller.getLiveNodeData()) != null && liveData.getDebugPort() != null) {
                try {
                    this.dynamicDebugPort = Integer.parseInt(liveData.getDebugPort());
                }
                catch (NumberFormatException e) {
                    LOG.warn("Live data for {} has debug port of '{}' which cannot be parsed as a number", (Object)this.getContainerId(), (Object)liveData.getDebugPort());
                }
            }
            return this.dynamicDebugPort;
        }

        public synchronized Map<String, LogEntry.Level> getLogLevels() {
            ContainerLiveNodeData liveData = this.controller.getLiveNodeData();
            if (liveData != null) {
                return Maps.transformValues((Map)liveData.getLogLevels(), LOG_LEVEL_CONVERTER);
            }
            return Collections.emptyMap();
        }
    }
}

