/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiNodeSorter<N extends SchedulerNode>
extends AbstractService {
    private MultiNodeLookupPolicy<N> multiNodePolicy;
    private static final Logger LOG = LoggerFactory.getLogger(MultiNodeSorter.class);
    private ScheduledExecutorService ses;
    private ScheduledFuture<?> handler;
    private volatile boolean stopped;
    private RMContext rmContext;
    private MultiNodePolicySpec policySpec;

    public MultiNodeSorter(RMContext rmContext, MultiNodePolicySpec policy) {
        super("MultiNodeLookupPolicy");
        this.rmContext = rmContext;
        this.policySpec = policy;
    }

    @VisibleForTesting
    public synchronized MultiNodeLookupPolicy<N> getMultiNodeLookupPolicy() {
        return this.multiNodePolicy;
    }

    public void serviceInit(Configuration conf) throws Exception {
        LOG.info("Initializing MultiNodeSorter=" + this.policySpec.getPolicyName() + ", with sorting interval=" + this.policySpec.getSortingInterval());
        this.initPolicy(this.policySpec.getPolicyName());
        super.serviceInit(conf);
    }

    void initPolicy(String policyName) throws YarnException {
        Class<?> policyClass;
        try {
            policyClass = Class.forName(policyName);
        }
        catch (ClassNotFoundException e) {
            throw new YarnException("Invalid policy name:" + policyName + e.getMessage());
        }
        this.multiNodePolicy = (MultiNodeLookupPolicy)ReflectionUtils.newInstance(policyClass, null);
    }

    public void serviceStart() throws Exception {
        LOG.info("Starting SchedulingMonitor=" + this.getName());
        assert (!this.stopped) : "starting when already stopped";
        this.ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(MultiNodeSorter.this.getName());
                return t;
            }
        });
        if (this.policySpec.getSortingInterval() != 0L) {
            this.handler = this.ses.scheduleAtFixedRate(new SortingThread(), 0L, this.policySpec.getSortingInterval(), TimeUnit.MILLISECONDS);
        }
        super.serviceStart();
    }

    public void serviceStop() throws Exception {
        this.stopped = true;
        if (this.handler != null) {
            LOG.info("Stop " + this.getName());
            this.handler.cancel(true);
            this.ses.shutdown();
        }
        super.serviceStop();
    }

    @VisibleForTesting
    public void reSortClusterNodes() {
        HashSet<String> nodeLabels = new HashSet<String>();
        nodeLabels.addAll(this.rmContext.getNodeLabelManager().getClusterNodeLabelNames());
        nodeLabels.add("");
        for (String label : nodeLabels) {
            HashMap nodesByPartition = new HashMap();
            List nodes = ((AbstractYarnScheduler)this.rmContext.getScheduler()).getNodeTracker().getNodesPerPartition(label);
            if (nodes == null || nodes.isEmpty()) continue;
            nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
            this.multiNodePolicy.addAndRefreshNodesSet(nodesByPartition.values(), label);
        }
    }

    public boolean isSorterThreadRunning() {
        return this.handler != null;
    }

    private class SortingThread
    implements Runnable {
        private SortingThread() {
        }

        @Override
        public void run() {
            try {
                MultiNodeSorter.this.reSortClusterNodes();
            }
            catch (Throwable t) {
                LOG.error("Exception raised while executing multinode sorter, skip this run..., exception=", t);
            }
        }
    }
}

