/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestFairSchedulerPreemption
extends FairSchedulerTestBase {
    private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
    private static final int GB = 1024;
    private final ControlledClock clock = new ControlledClock();
    private static final int NODE_CAPACITY_MULTIPLE = 4;
    private final boolean fairsharePreemption;
    private final boolean drf;
    private FSAppAttempt greedyApp;
    private FSAppAttempt starvingApp;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList({"MinSharePreemption", 0}, {"MinSharePreemptionWithDRF", 1}, {"FairSharePreemption", 2}, {"FairSharePreemptionWithDRF", 3});
    }

    public TestFairSchedulerPreemption(String name, int mode) throws IOException {
        this.fairsharePreemption = mode > 1;
        this.drf = mode % 2 == 1;
        this.writeAllocFile();
    }

    @Before
    public void setup() throws IOException {
        this.createConfiguration();
        this.conf.set("yarn.scheduler.fair.allocation.file", ALLOC_FILE.getAbsolutePath());
        this.conf.setBoolean("yarn.scheduler.fair.preemption", true);
        this.conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", 0.0f);
        this.conf.setInt("yarn.scheduler.fair.waitTimeBeforeKill", 0);
        this.conf.setLong("yarn.scheduler.fair.update-interval-ms", 60000L);
        this.setupCluster();
    }

    @After
    public void teardown() {
        ALLOC_FILE.delete();
        this.conf = null;
        if (this.resourceManager != null) {
            this.resourceManager.stop();
            this.resourceManager = null;
        }
    }

    private void writeAllocFile() {
        AllocationFileWriter allocationFileWriter = this.fairsharePreemption ? AllocationFileWriter.create().addQueue(new AllocationFileQueue.Builder("root").subQueue(new AllocationFileQueue.Builder("preemptable").fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).subQueue(new AllocationFileQueue.Builder("child-1").build()).subQueue(new AllocationFileQueue.Builder("child-2").build()).build()).subQueue(new AllocationFileQueue.Builder("preemptable-sibling").fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).build()).subQueue(new AllocationFileQueue.Builder("nonpreemptable").allowPreemptionFrom(false).fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).subQueue(new AllocationFileQueue.Builder("child-1").build()).subQueue(new AllocationFileQueue.Builder("child-2").build()).build()).build()) : AllocationFileWriter.create().addQueue(new AllocationFileQueue.Builder("root").subQueue(new AllocationFileQueue.Builder("preemptable").minSharePreemptionTimeout(0).subQueue(new AllocationFileQueue.Builder("child-1").minResources("4096mb,4vcores").build()).subQueue(new AllocationFileQueue.Builder("child-2").minResources("4096mb,4vcores").build()).build()).subQueue(new AllocationFileQueue.Builder("preemptable-sibling").minSharePreemptionTimeout(0).build()).subQueue(new AllocationFileQueue.Builder("nonpreemptable").allowPreemptionFrom(false).minSharePreemptionTimeout(0).subQueue(new AllocationFileQueue.Builder("child-1").minResources("4096mb,4vcores").build()).subQueue(new AllocationFileQueue.Builder("child-2").minResources("4096mb,4vcores").build()).build()).build());
        if (this.drf) {
            allocationFileWriter.drfDefaultQueueSchedulingPolicy();
        }
        allocationFileWriter.writeToFile(ALLOC_FILE.getAbsolutePath());
        Assert.assertTrue((String)"Allocation file does not exist, not running the test", (boolean)ALLOC_FILE.exists());
    }

    private void setupCluster() throws IOException {
        this.resourceManager = new MockRM(this.conf);
        this.scheduler = (FairScheduler)this.resourceManager.getResourceScheduler();
        this.clock.setTime(SystemClock.getInstance().getTime());
        this.scheduler.setClock((Clock)this.clock);
        this.resourceManager.start();
        this.addNode(4096, 12);
        this.addNode(4096, 12);
        this.scheduler.reinitialize(this.conf, this.resourceManager.getRMContext());
        FSQueue child1 = this.scheduler.getQueueManager().getQueue("nonpreemptable.child-1");
        Assert.assertFalse((boolean)child1.isPreemptable());
        FSQueue child2 = this.scheduler.getQueueManager().getQueue("nonpreemptable.child-2");
        Assert.assertFalse((boolean)child2.isPreemptable());
    }

    private void sendEnoughNodeUpdatesToAssignFully() {
        for (RMNode node : this.rmNodes) {
            NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = new NodeUpdateSchedulerEvent(node);
            for (int i = 0; i < 4; ++i) {
                this.scheduler.handle((SchedulerEvent)nodeUpdateSchedulerEvent);
            }
        }
    }

    private void takeAllResources(String queueName) {
        ApplicationAttemptId appAttemptId = this.createSchedulingRequest(1024, 1, queueName, "default", 4 * this.rmNodes.size());
        this.greedyApp = this.scheduler.getSchedulerApp(appAttemptId);
        this.scheduler.update();
        this.sendEnoughNodeUpdatesToAssignFully();
        Assert.assertEquals((long)8L, (long)this.greedyApp.getLiveContainers().size());
        Assert.assertTrue((this.scheduler.getQueueManager().getQueue(queueName).isPreemptable() == this.greedyApp.isPreemptable() ? 1 : 0) != 0);
    }

    private void preemptHalfResources(String queueName) throws InterruptedException {
        ApplicationAttemptId appAttemptId = this.createSchedulingRequest(2048, 2, queueName, "default", 4 * this.rmNodes.size() / 2);
        this.starvingApp = this.scheduler.getSchedulerApp(appAttemptId);
        this.clock.tickSec(1);
        this.scheduler.update();
    }

    private void submitApps(String queue1, String queue2) throws InterruptedException {
        this.takeAllResources(queue1);
        this.preemptHalfResources(queue2);
    }

    private void verifyPreemption(int numStarvedAppContainers, int numGreedyAppContainers) throws InterruptedException {
        FSSchedulerNode node;
        for (int i = 0; i < 1000 && this.greedyApp.getLiveContainers().size() != numGreedyAppContainers; ++i) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)"Incorrect # of containers on the greedy app", (long)numGreedyAppContainers, (long)this.greedyApp.getLiveContainers().size());
        Assert.assertEquals((String)"Incorrect # of preempted containers in QueueMetrics", (long)(8 - numGreedyAppContainers), (long)this.greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
        for (RMNode rmNode : this.rmNodes) {
            node = (FSSchedulerNode)this.scheduler.getNodeTracker().getNode(rmNode.getNodeID());
            if (node.getContainersForPreemption().size() <= 0) continue;
            Assert.assertTrue((String)"node should be reserved for the starvingApp", (boolean)node.getPreemptionList().keySet().contains(this.starvingApp));
        }
        this.sendEnoughNodeUpdatesToAssignFully();
        Assert.assertEquals((String)"Starved app is not assigned the right # of containers", (long)numStarvedAppContainers, (long)this.starvingApp.getLiveContainers().size());
        for (RMNode rmNode : this.rmNodes) {
            node = (FSSchedulerNode)this.scheduler.getNodeTracker().getNode(rmNode.getNodeID());
            if (node.getContainersForPreemption().size() <= 0) continue;
            Assert.assertFalse((boolean)node.getPreemptionList().keySet().contains(this.starvingApp));
        }
    }

    private void verifyNoPreemption() throws InterruptedException {
        for (int i = 0; i < 100 && this.greedyApp.getLiveContainers().size() == 8; ++i) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)8L, (long)this.greedyApp.getLiveContainers().size());
    }

    @Test
    public void testPreemptionWithinSameLeafQueue() throws Exception {
        String queue = "root.preemptable.child-1";
        this.submitApps(queue, queue);
        if (this.fairsharePreemption) {
            this.verifyPreemption(2, 4);
        } else {
            this.verifyNoPreemption();
        }
    }

    @Test
    public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
        this.submitApps("root.preemptable.child-1", "root.preemptable.child-2");
        this.verifyPreemption(2, 4);
    }

    @Test
    public void testPreemptionBetweenNonSiblingQueues() throws Exception {
        this.submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
        this.verifyPreemption(2, 4);
    }

    @Test
    public void testNoPreemptionFromDisallowedQueue() throws Exception {
        this.submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
        this.verifyNoPreemption();
    }

    private void setNumAMContainersPerNode(int numAMContainersPerNode) {
        List potentialNodes = this.scheduler.getNodeTracker().getNodesByResourceName("*");
        for (FSSchedulerNode node : potentialNodes) {
            List containers = node.getCopiedListOfRunningContainers();
            for (int i = 0; i < numAMContainersPerNode; ++i) {
                ((RMContainerImpl)containers.get(i)).setAMContainer(true);
            }
        }
    }

    @Test
    public void testPreemptionSelectNonAMContainer() throws Exception {
        this.takeAllResources("root.preemptable.child-1");
        this.setNumAMContainersPerNode(2);
        this.preemptHalfResources("root.preemptable.child-2");
        this.verifyPreemption(2, 4);
        ArrayList containers = (ArrayList)this.starvingApp.getLiveContainers();
        String host0 = ((RMContainer)containers.get(0)).getNodeId().getHost();
        String host1 = ((RMContainer)containers.get(1)).getNodeId().getHost();
        Assert.assertTrue((String)"Preempted containers should come from two different nodes.", (!host0.equals(host1) ? 1 : 0) != 0);
    }

    @Test
    public void testAppNotPreemptedBelowFairShare() throws Exception {
        this.takeAllResources("root.preemptable.child-1");
        this.tryPreemptMoreThanFairShare("root.preemptable.child-2");
    }

    private void tryPreemptMoreThanFairShare(String queueName) throws InterruptedException {
        ApplicationAttemptId appAttemptId = this.createSchedulingRequest(3072, 3, queueName, "default", 4 * this.rmNodes.size() / 2);
        this.starvingApp = this.scheduler.getSchedulerApp(appAttemptId);
        this.verifyPreemption(1, 5);
    }

    @Test
    public void testDisableAMPreemption() {
        this.takeAllResources("root.preemptable.child-1");
        this.setNumAMContainersPerNode(2);
        RMContainer container = this.greedyApp.getLiveContainers().stream().filter(rmContainer -> rmContainer.isAMContainer()).findFirst().get();
        this.greedyApp.setEnableAMPreemption(false);
        Assert.assertFalse((boolean)this.greedyApp.canContainerBePreempted(container, null));
    }

    @Test
    public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() throws InterruptedException {
        if (!this.fairsharePreemption) {
            return;
        }
        this.takeAllResources("root.preemptable.child-1");
        this.preemptHalfResources("root.preemptable-sibling");
        this.verifyPreemption(2, 4);
        this.preemptHalfResources("root.preemptable.child-2");
        this.verifyPreemption(1, 2);
    }

    @Test
    public void testRelaxLocalityPreemptionWithLessAMInRemainingNodes() throws Exception {
        this.takeAllResources("root.preemptable.child-1");
        RMNode node1 = (RMNode)this.rmNodes.get(0);
        this.setAllAMContainersOnNode(node1.getNodeID());
        ApplicationAttemptId greedyAppAttemptId = this.getGreedyAppAttemptIdOnNode(node1.getNodeID());
        this.updateRelaxLocalityRequestSchedule(node1, 1024, 4);
        this.verifyRelaxLocalityPreemption(node1.getNodeID(), greedyAppAttemptId, 4);
    }

    @Test
    public void testRelaxLocalityPreemptionWithNoLessAMInRemainingNodes() throws Exception {
        this.takeAllResources("root.preemptable.child-1");
        RMNode node1 = (RMNode)this.rmNodes.get(0);
        this.setNumAMContainersOnNode(3, node1.getNodeID());
        RMNode node2 = (RMNode)this.rmNodes.get(1);
        this.setAllAMContainersOnNode(node2.getNodeID());
        ApplicationAttemptId greedyAppAttemptId = this.getGreedyAppAttemptIdOnNode(node2.getNodeID());
        this.updateRelaxLocalityRequestSchedule(node1, 2048, 1);
        this.verifyRelaxLocalityPreemption(node2.getNodeID(), greedyAppAttemptId, 6);
    }

    private void setAllAMContainersOnNode(NodeId nodeId) {
        this.setNumAMContainersOnNode(Integer.MAX_VALUE, nodeId);
    }

    private void setNumAMContainersOnNode(int num, NodeId nodeId) {
        int count = 0;
        SchedulerNode node = this.scheduler.getNodeTracker().getNode(nodeId);
        for (RMContainer container : node.getCopiedListOfRunningContainers()) {
            if (++count > num) break;
            ((RMContainerImpl)container).setAMContainer(true);
        }
    }

    private ApplicationAttemptId getGreedyAppAttemptIdOnNode(NodeId nodeId) {
        SchedulerNode node = this.scheduler.getNodeTracker().getNode(nodeId);
        return ((RMContainer)node.getCopiedListOfRunningContainers().get(0)).getApplicationAttemptId();
    }

    private void updateRelaxLocalityRequestSchedule(RMNode node, int nodeMemory, int numNodeContainers) {
        ResourceRequest nodeRequest = this.createResourceRequest(nodeMemory, node.getHostName(), 1, numNodeContainers, true);
        ResourceRequest rackRequest = this.createResourceRequest(10240, node.getRackName(), 1, 1, true);
        ResourceRequest anyRequest = this.createResourceRequest(10240, "*", 1, 1, true);
        List<ResourceRequest> resourceRequests = Arrays.asList(nodeRequest, rackRequest, anyRequest);
        ApplicationAttemptId starvedAppAttemptId = this.createSchedulingRequest("root.preemptable.child-2", "default", resourceRequests);
        this.starvingApp = this.scheduler.getSchedulerApp(starvedAppAttemptId);
        this.clock.tickSec(1);
        this.scheduler.update();
    }

    private void verifyRelaxLocalityPreemption(NodeId notBePreemptedNodeId, ApplicationAttemptId greedyAttemptId, int numGreedyAppContainers) throws Exception {
        this.verifyPreemption(0, numGreedyAppContainers);
        SchedulerNode node = this.scheduler.getNodeTracker().getNode(notBePreemptedNodeId);
        for (RMContainer container : node.getCopiedListOfRunningContainers()) {
            assert (container.isAMContainer());
            assert (container.getApplicationAttemptId().equals((Object)greedyAttemptId));
        }
    }
}

