/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestCapacitySchedulerAsyncScheduling {
    // Presumably the number of megabytes in a gigabyte; no method visible in this class reads it.
    private final int GB = 1024;
    // Base configuration; re-created by setUp() before every test.
    private YarnConfiguration conf;
    // Node-labels manager created in setUp(); passed to the MockRM in testReleaseOutdatedReservedContainer().
    RMNodeLabelsManager mgr;
    // Heartbeat thread owned by keepNMHeartbeat()/pauseNMHeartbeat(); null while no thread is running.
    private NMHeartbeatThread nmHeartbeatThread = null;

    /**
     * Prepares the per-test fixture: a fresh {@link YarnConfiguration} that selects
     * {@link CapacityScheduler} and enables asynchronous scheduling, plus a
     * {@link NullRMNodeLabelsManager} initialised from that configuration.
     *
     * @throws Exception if the node-labels manager fails to initialise
     */
    @Before
    public void setUp() throws Exception {
        YarnConfiguration testConf = new YarnConfiguration();
        this.conf = testConf;
        testConf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        testConf.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", true);

        NullRMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
        this.mgr = labelsManager;
        labelsManager.init(testConf);
    }

    /**
     * Runs the shared async-allocation scenario with the maximum number of
     * asynchronous scheduling threads set to one.
     */
    @Test(timeout=300000L)
    public void testSingleThreadAsyncContainerAllocation() throws Exception {
        final int schedulingThreads = 1;
        testAsyncContainerAllocation(schedulingThreads);
    }

    /**
     * Runs the shared async-allocation scenario with the maximum number of
     * asynchronous scheduling threads set to two.
     */
    @Test(timeout=300000L)
    public void testTwoThreadsAsyncContainerAllocation() throws Exception {
        final int schedulingThreads = 2;
        testAsyncContainerAllocation(schedulingThreads);
    }

    /**
     * Runs the shared async-allocation scenario with the maximum number of
     * asynchronous scheduling threads set to three.
     */
    @Test(timeout=300000L)
    public void testThreeThreadsAsyncContainerAllocation() throws Exception {
        final int schedulingThreads = 3;
        testAsyncContainerAllocation(schedulingThreads);
    }

    /**
     * Shared body of the async-allocation tests.
     *
     * <p>Starts a MockRM with the multi-queue configuration, registers ten 20 GB
     * nodes that are kept heartbeating by a background thread, submits one
     * application to each of the queues "a", "b" and "c", and has the i-th
     * application ask for 20 * (i + 1) containers of 1 GB. The test then waits
     * (up to 15 s) for the root queue's allocated memory to reach the total that
     * was asked for, and checks for a further 2 s that it stays at exactly that
     * value.
     *
     * <p>The heartbeat thread is stopped and the RM is closed in a
     * {@code finally} block so a failing assertion does not leak them into
     * later tests.
     *
     * @param numThreads value written to the
     *        {@code schedule-asynchronously.maximum-threads} setting
     * @throws Exception if the RM, an AM or a node interaction fails
     */
    public void testAsyncContainerAllocation(int numThreads) throws Exception {
        this.conf.setInt("yarn.scheduler.capacity.schedule-asynchronously.maximum-threads", numThreads);
        this.conf.setInt("yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms", 0);
        final NullRMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
        labelsManager.init(this.conf);
        MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(this.conf)) {

            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return labelsManager;
            }
        };
        try {
            rm.getRMContext().setNodeLabelManager(labelsManager);
            rm.start();

            List<MockNM> nms = new ArrayList<MockNM>();
            for (int i = 0; i < 10; ++i) {
                nms.add(rm.registerNode("127.0.0." + i + ":1234", 20480));
            }
            this.keepNMHeartbeat(nms, 1000);

            List<MockAM> ams = new ArrayList<MockAM>();
            // Three applications are submitted below with 1024 MB each, so the
            // expected total starts at 3072 MB before any container is requested.
            int totalAsked = 3072;
            for (int i = 0; i < 3; ++i) {
                // (char)(i % 34 + 97) yields the queue names "a", "b", "c".
                RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, Character.toString((char)(i % 34 + 97)), 1, null, null, false);
                MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
                am.registerAppAttempt();
                ams.add(am);
            }
            for (int i = 0; i < 3; ++i) {
                int numContainers = 20 * (i + 1);
                ams.get(i).allocate("*", 1024, numContainers, new ArrayList<ContainerId>());
                totalAsked += numContainers * 1024;
            }

            // Poll until everything that was asked for has been allocated (15 s budget).
            for (int waitTime = 15000; waitTime > 0 && rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB() != (long)totalAsked; waitTime -= 50) {
                Thread.sleep(50L);
            }
            Assert.assertEquals((long)totalAsked, (long)rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB());

            // Keep watching for 2 s: the allocated amount must not move past the ask.
            for (int waitTime = 2000; waitTime > 0; waitTime -= 50) {
                Assert.assertEquals((long)totalAsked, (long)rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB());
                Thread.sleep(50L);
            }
        } finally {
            // Stop the background heartbeats before tearing the RM down.
            this.pauseNMHeartbeat();
            rm.close();
        }
    }

    /**
     * Checks that a reservation proposal created for an application attempt that
     * has already been removed from the scheduler is not applied.
     *
     * <p>Flow: with async scheduling disabled, an application gets its AM on nm1
     * and one 5 GB container on nm2. The attempt is then removed with state
     * KILLED, the test waits for nm1's running-container list to change and for
     * a container to run on nm1 again, and finally hand-builds a reserve
     * proposal on nm2 for the removed attempt and commits it. The node must end
     * up with no reserved container.
     *
     * <p>The RM is stopped in a {@code finally} block so a failed assertion does
     * not leave it running.
     */
    @Test(timeout=30000L)
    public void testCommitProposalForFailedAppAttempt() throws Exception {
        Configuration disableAsyncConf = new Configuration(this.conf);
        disableAsyncConf.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", false);
        MockRM rm = new MockRM(disableAsyncConf);
        try {
            rm.start();
            MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9216);
            MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9216);
            CapacityScheduler scheduler = (CapacityScheduler)rm.getRMContext().getScheduler();

            // Wait until the scheduler's node tracker reports both nodes
            // (the loop itself is unbounded; the test timeout is the limit).
            while (scheduler.getNodeTracker().nodeCount() < 2) {
                Thread.sleep(10L);
            }
            Assert.assertEquals(2L, (long)scheduler.getNodeTracker().nodeCount());

            SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
            SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId());
            RMApp app = rm.submitApp(200, "app", "user", null, false, "default", 2, null, null, true, true);
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = scheduler.getApplicationAttempt(am.getApplicationAttemptId());
            this.allocateAndLaunchContainers(am, nm2, rm, 1, Resources.createResource(5120), 0, 2);
            Assert.assertEquals(1L, (long)sn1.getNumContainers());
            Assert.assertEquals(1L, (long)sn2.getNumContainers());

            // Remove the attempt from the scheduler.
            scheduler.handle(new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(), RMAppAttemptState.KILLED, true));

            // Wait for nm1 to stop reporting exactly one running container ...
            while (sn1.getCopiedListOfRunningContainers().size() == 1) {
                Thread.sleep(100L);
            }
            // ... then heartbeat nm1 until a container is running on it again.
            while (sn1.getCopiedListOfRunningContainers().size() == 0) {
                nm1.nodeHeartbeat(true);
                Thread.sleep(100L);
            }

            // Build a reserve proposal on nm2 that still refers to the removed attempt.
            Resource reservedResource = Resources.createResource(5120);
            Container container = Container.newInstance(ContainerId.newContainerId(am.getApplicationAttemptId(), (long)3L), sn2.getNodeID(), sn2.getHttpAddress(), reservedResource, Priority.newInstance(0), null);
            RMContainerImpl rmContainer = new RMContainerImpl(container, SchedulerRequestKey.create(ResourceRequest.newInstance(Priority.newInstance(0), "*", reservedResource, 1)), am.getApplicationAttemptId(), sn2.getNodeID(), "user", rm.getRMContext());
            SchedulerContainer reservedContainer = new SchedulerContainer((SchedulerApplicationAttempt)schedulerApp, (SchedulerNode)scheduler.getNode(sn2.getNodeID()), (RMContainer)rmContainer, "", false);
            ContainerAllocationProposal reservedForAttempt1Proposal = new ContainerAllocationProposal(reservedContainer, null, reservedContainer, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, reservedResource);
            ArrayList<ContainerAllocationProposal> reservedProposals = new ArrayList<ContainerAllocationProposal>();
            reservedProposals.add(reservedForAttempt1Proposal);
            ResourceCommitRequest request = new ResourceCommitRequest(null, reservedProposals, null);

            scheduler.tryCommit(scheduler.getClusterResource(), request, true);
            Assert.assertNull("Outdated proposal should not be accepted!", sn2.getReservedContainer());
        } finally {
            rm.stop();
        }
    }

    /**
     * Exercises committing a reservation proposal after the reservation it was
     * computed against has been replaced.
     *
     * <p>Setup (async scheduling disabled): two 9216 MB nodes; two applications
     * with their AMs launched via nm1; the first application additionally gets
     * a 5120 MB container on each node, then asks for one more 5120 MB
     * container, which is expected to end up as a reservation on nm1.
     *
     * <p>A Mockito spy of the scheduler intercepts {@code tryCommit}. On the
     * first intercepted request that contains reservations, the answer kills
     * the container running on nm2, triggers node updates so that nm1's
     * reservation disappears and a new one (after the second application's ask)
     * appears, and only then forwards the intercepted request to the real
     * scheduler. Any exception from that forwarded commit fails the test.
     *
     * <p>NOTE(review): after the final wait loop nothing asserts that
     * {@code isChecked} became true, so the test appears to pass even if the
     * intercepted branch never ran - confirm whether an assertion is intended.
     * NOTE(review): {@code rm.stop()} is not reached when an earlier assertion
     * fails.
     */
    @Test(timeout=30000L)
    public void testCommitOutdatedReservedProposal() throws Exception {
        int waitTime;
        Configuration disableAsyncConf = new Configuration((Configuration)this.conf);
        disableAsyncConf.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", false);
        MockRM rm = new MockRM(disableAsyncConf);
        rm.start();
        MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9216);
        MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9216);
        // Give the scheduler up to ~1 s to report both registered nodes.
        for (waitTime = 1000; waitTime > 0 && ((AbstractYarnScheduler)rm.getRMContext().getScheduler()).getNodeTracker().nodeCount() < 2; waitTime -= 10) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)2L, (long)((AbstractYarnScheduler)rm.getRMContext().getScheduler()).getNodeTracker().nodeCount());
        ResourceScheduler scheduler = rm.getRMContext().getScheduler();
        final SchedulerNode sn1 = ((CapacityScheduler)scheduler).getSchedulerNode(nm1.getNodeId());
        final SchedulerNode sn2 = ((CapacityScheduler)scheduler).getSchedulerNode(nm2.getNodeId());
        RMApp app = rm.submitApp(200, "app", "user", null, "default");
        MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
        RMApp app2 = rm.submitApp(200, "app", "user", null, "default");
        final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
        // First application: container #2 is awaited on nm1, container #3 on nm2 (5120 MB each).
        this.allocateAndLaunchContainers(am, nm1, rm, 1, Resources.createResource((int)5120), 0, 2);
        this.allocateAndLaunchContainers(am, nm2, rm, 1, Resources.createResource((int)5120), 0, 3);
        Assert.assertEquals((long)3L, (long)sn1.getNumContainers());
        Assert.assertEquals((long)1L, (long)sn2.getNumContainers());
        // One more 5120 MB ask from the first application; wait for it to be reserved on nm1.
        ResourceRequest rr2 = ResourceRequest.newInstance((Priority)Priority.newInstance((int)0), (String)"*", (Resource)Resources.createResource((int)5120), (int)1);
        am.allocate(Arrays.asList(rr2), null);
        nm1.nodeHeartbeat(true);
        for (waitTime = 1000; waitTime > 0 && sn1.getReservedContainer() == null; waitTime -= 10) {
            Thread.sleep(10L);
        }
        Assert.assertNotNull((Object)sn1.getReservedContainer());
        final CapacityScheduler cs = (CapacityScheduler)scheduler;
        CapacityScheduler spyCs = (CapacityScheduler)Mockito.spy((Object)cs);
        // isFirstReserve lets only the first reservation-carrying commit take the special path;
        // isChecked records that this path ran to completion.
        final AtomicBoolean isFirstReserve = new AtomicBoolean(true);
        final AtomicBoolean isChecked = new AtomicBoolean(false);
        ((CapacityScheduler)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                ResourceCommitRequest request = (ResourceCommitRequest)invocation.getArguments()[1];
                if (request.getContainersToReserve().size() > 0 && isFirstReserve.compareAndSet(true, false)) {
                    int waitTime;
                    // Kill the container currently running on nm2 to free its resources.
                    RMContainer killableContainer = (RMContainer)sn2.getCopiedListOfRunningContainers().get(0);
                    cs.completedContainer(killableContainer, ContainerStatus.newInstance((ContainerId)killableContainer.getContainerId(), (ContainerState)ContainerState.COMPLETE, (String)"", (int)-106), RMContainerEventType.KILL);
                    Assert.assertEquals((long)0L, (long)sn2.getCopiedListOfRunningContainers().size());
                    // Node update on nm2 through the real scheduler; wait for a container to run there again.
                    cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(sn2.getRMNode()));
                    for (waitTime = 1000; waitTime > 0 && sn2.getCopiedListOfRunningContainers().size() == 0; waitTime -= 10) {
                        Thread.sleep(10L);
                    }
                    Assert.assertEquals((long)1L, (long)sn2.getCopiedListOfRunningContainers().size());
                    // The reservation that existed on nm1 must be gone at this point.
                    Assert.assertNull((Object)sn1.getReservedContainer());
                    // Second application asks for 5120 MB; wait for a new reservation on nm1.
                    ResourceRequest rr3 = ResourceRequest.newInstance((Priority)Priority.newInstance((int)0), (String)"*", (Resource)Resources.createResource((int)5120), (int)1);
                    am2.allocate(Arrays.asList(rr3), null);
                    cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(sn1.getRMNode()));
                    for (waitTime = 1000; waitTime > 0 && sn1.getReservedContainer() == null; waitTime -= 10) {
                        Thread.sleep(10L);
                    }
                    Assert.assertNotNull((Object)sn1.getReservedContainer());
                    // Now forward the originally intercepted request; it must not throw.
                    try {
                        cs.tryCommit((Resource)invocation.getArguments()[0], (ResourceCommitRequest)invocation.getArguments()[1], true);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail();
                    }
                    isChecked.set(true);
                } else {
                    // Every other commit is passed straight through to the real scheduler.
                    cs.tryCommit((Resource)invocation.getArguments()[0], (ResourceCommitRequest)invocation.getArguments()[1], true);
                }
                return null;
            }
        }).when((Object)spyCs)).tryCommit((Resource)Mockito.any(Resource.class), (ResourceCommitRequest)Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
        // Drive one node update through the spy so its tryCommit stub is invoked.
        spyCs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(sn1.getRMNode()));
        // Wait up to ~1 s for the intercepted branch to finish (result is not asserted).
        for (waitTime = 1000; waitTime > 0 && !isChecked.get(); waitTime -= 10) {
            Thread.sleep(10L);
        }
        rm.stop();
    }

    /**
     * Checks that committing more allocation proposals than a node can hold
     * does not over-allocate the node.
     *
     * <p>With async scheduling disabled, an application's AM is launched via
     * nm1 (9216 MB). The test then hand-builds two allocation proposals of
     * 5120 MB each (container ids 2 and 3), both targeting nm1, and commits
     * them one after another. Afterwards nm1 must still report a positive
     * amount of unallocated memory.
     *
     * <p>The RM is stopped in a {@code finally} block so a failed assertion does
     * not leave it running.
     */
    @Test(timeout=30000L)
    public void testNodeResourceOverAllocated() throws Exception {
        Configuration disableAsyncConf = new Configuration(this.conf);
        disableAsyncConf.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", false);
        MockRM rm = new MockRM(disableAsyncConf);
        try {
            rm.start();
            MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9216);
            // The second node is registered only so that two nodes are present.
            rm.registerNode("127.0.0.2:1234", 9216);
            CapacityScheduler scheduler = (CapacityScheduler)rm.getRMContext().getScheduler();

            // Wait until the scheduler's node tracker reports both nodes
            // (the loop itself is unbounded; the test timeout is the limit).
            while (scheduler.getNodeTracker().nodeCount() < 2) {
                Thread.sleep(10L);
            }
            Assert.assertEquals(2L, (long)scheduler.getNodeTracker().nodeCount());

            SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
            RMApp app = rm.submitApp(200, "app", "user", null, false, "default", 2, null, null, true, true);
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = scheduler.getApplicationAttempt(am.getApplicationAttemptId());

            Resource containerResource = Resources.createResource(5120);
            am.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(0), "*", containerResource, 2)), null);

            // Commit two separate 5120 MB allocation proposals against nm1.
            for (int containerNo = 2; containerNo <= 3; ++containerNo) {
                Container container = Container.newInstance(ContainerId.newContainerId(am.getApplicationAttemptId(), (long)containerNo), sn1.getNodeID(), sn1.getHttpAddress(), containerResource, Priority.newInstance(0), null);
                RMContainerImpl rmContainer = new RMContainerImpl(container, SchedulerRequestKey.create(ResourceRequest.newInstance(Priority.newInstance(0), "*", containerResource, 1)), am.getApplicationAttemptId(), sn1.getNodeID(), "user", rm.getRMContext());
                SchedulerContainer newContainer = new SchedulerContainer((SchedulerApplicationAttempt)schedulerApp, (SchedulerNode)scheduler.getNode(sn1.getNodeID()), (RMContainer)rmContainer, "", true);
                ContainerAllocationProposal newContainerProposal = new ContainerAllocationProposal(newContainer, null, null, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, containerResource);
                ArrayList<ContainerAllocationProposal> newProposals = new ArrayList<ContainerAllocationProposal>();
                newProposals.add(newContainerProposal);
                ResourceCommitRequest request = new ResourceCommitRequest(newProposals, null, null);
                scheduler.tryCommit(scheduler.getClusterResource(), request, true);
            }

            Assert.assertTrue("Node resource is Over-allocated!", sn1.getUnallocatedResource().getMemorySize() > 0L);
        } finally {
            rm.stop();
        }
    }

    /**
     * Checks that the asynchronous scheduler only places containers on nodes
     * that have heartbeated recently.
     *
     * <p>Ten 20 GB nodes are registered and kept heartbeating every 100 ms while
     * three applications (queues "a", "b", "c") launch their AMs. Heartbeats
     * are then stopped, the test sleeps for three heartbeat intervals, the
     * applications request containers, and only the first five nodes send one
     * more heartbeat. After two seconds, each of the first five nodes must hold
     * at least one non-AM container and every remaining node must hold none.
     *
     * <p>The verification loop covers every registered node, and the heartbeat
     * thread and RM are released in a {@code finally} block.
     */
    @Test
    public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception {
        int heartbeatInterval = 100;
        this.conf.setInt("yarn.scheduler.capacity.schedule-asynchronously.maximum-threads", 1);
        this.conf.setInt("yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms", 100);
        this.conf.setInt("yarn.resourcemanager.nodemanagers.heartbeat-interval-ms", heartbeatInterval);
        final NullRMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
        labelsManager.init(this.conf);
        MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(this.conf)) {

            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return labelsManager;
            }
        };
        try {
            CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
            rm.getRMContext().setNodeLabelManager(labelsManager);
            rm.start();

            List<MockNM> nms = new ArrayList<MockNM>();
            for (int i = 0; i < 10; ++i) {
                nms.add(rm.registerNode("127.0.0." + i + ":1234", 20480));
            }
            List<MockAM> ams = new ArrayList<MockAM>();
            this.keepNMHeartbeat(nms, heartbeatInterval);
            for (int i = 0; i < 3; ++i) {
                // (char)(i % 34 + 97) yields the queue names "a", "b", "c".
                RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, Character.toString((char)(i % 34 + 97)), 1, null, null, false);
                MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
                am.registerAppAttempt();
                ams.add(am);
            }

            // Stop all heartbeats and let three intervals elapse before asking for containers.
            this.pauseNMHeartbeat();
            Thread.sleep(heartbeatInterval * 3);
            for (int i = 0; i < 3; ++i) {
                ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<ContainerId>());
            }

            // Only the first five nodes heartbeat again.
            int heartbeatingNodes = 5;
            for (int i = 0; i < heartbeatingNodes; ++i) {
                nms.get(i).nodeHeartbeat(true);
            }
            Thread.sleep(2000L);

            // Check every node, including the last one.
            for (int i = 0; i < nms.size(); ++i) {
                int nonAMContainers = this.checkNumNonAMContainersOnNode(cs, nms.get(i));
                if (i < heartbeatingNodes) {
                    Assert.assertTrue(nonAMContainers > 0);
                } else {
                    Assert.assertTrue(nonAMContainers == 0);
                }
            }
        } finally {
            // No-op when the heartbeat thread was already stopped above.
            this.pauseNMHeartbeat();
            rm.close();
        }
    }

    /**
     * Checks that committing the same allocate-from-reserved proposal several
     * times allocates the container only once.
     *
     * <p>With async scheduling disabled, an application's AM (1024 MB) and one
     * 6144 MB container are placed on nm1 (8192 MB), leaving 1024 MB. A further
     * ask for 2048 MB containers then produces one reservation. Every container
     * on nm1 except container id 1 is killed, which frees memory up to
     * 7168 MB. A Mockito spy intercepts {@code tryCommit}: when the request
     * allocates from a reserved container, the same request is committed three
     * times against the real scheduler, after which nm1 must show exactly two
     * running containers and 5120 MB unallocated.
     *
     * <p>The RM is stopped in a {@code finally} block so a failed assertion does
     * not leave it running.
     */
    @Test(timeout=30000L)
    public void testCommitDuplicatedAllocateFromReservedProposals() throws Exception {
        Configuration disableAsyncConf = new Configuration(this.conf);
        disableAsyncConf.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", false);
        MockRM rm = new MockRM(disableAsyncConf);
        try {
            rm.start();
            MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8192);
            rm.registerNode("192.168.0.2:2234", 8192);
            final CapacityScheduler cs = (CapacityScheduler)rm.getRMContext().getScheduler();

            // Wait until the scheduler's node tracker reports both nodes
            // (the loop itself is unbounded; the test timeout is the limit).
            while (cs.getNodeTracker().nodeCount() < 2) {
                Thread.sleep(10L);
            }
            Assert.assertEquals(2L, (long)cs.getNodeTracker().nodeCount());

            final SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
            RMApp app = rm.submitApp(1024, "app", "user", null, false, "default", 2, null, null, true, true);
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(am.getApplicationAttemptId());
            this.allocateAndLaunchContainers(am, nm1, rm, 1, Resources.createResource(6144), 0, 2);
            Assert.assertEquals(2L, (long)sn1.getNumContainers());
            Assert.assertEquals(1024L, sn1.getUnallocatedResource().getMemorySize());

            // Ask for 2048 MB containers; a node update should leave one reservation.
            am.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(0), "*", Resources.createResource(2048), 5)), null);
            cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
            Assert.assertEquals(1L, (long)schedulerApp.getReservedContainers().size());

            // Kill everything on nm1 except container id 1.
            for (RMContainer rmContainer : sn1.getCopiedListOfRunningContainers()) {
                if (rmContainer.getContainerId().getContainerId() == 1L) {
                    continue;
                }
                cs.completedContainer(rmContainer, ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", -106), RMContainerEventType.KILL);
            }
            Assert.assertEquals(7168L, sn1.getUnallocatedResource().getMemorySize());

            CapacityScheduler spyCs = Mockito.spy(cs);
            Mockito.doAnswer(new Answer<Object>() {

                public Boolean answer(InvocationOnMock invocation) throws Exception {
                    ResourceCommitRequest request = (ResourceCommitRequest)invocation.getArguments()[1];
                    if (request.getFirstAllocatedOrReservedContainer().getAllocateFromReservedContainer() != null) {
                        // Commit the very same request three times; only one may take effect.
                        for (int i = 0; i < 3; ++i) {
                            cs.tryCommit((Resource)invocation.getArguments()[0], (ResourceCommitRequest)invocation.getArguments()[1], ((Boolean)invocation.getArguments()[2]).booleanValue());
                        }
                        Assert.assertEquals(2L, (long)sn1.getCopiedListOfRunningContainers().size());
                        Assert.assertEquals(5120L, sn1.getUnallocatedResource().getMemorySize());
                    }
                    return true;
                }
            }).when(spyCs).tryCommit(Mockito.any(Resource.class), Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());

            // Drive one node update through the spy so its tryCommit stub is invoked.
            spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
        } finally {
            rm.stop();
        }
    }

    /**
     * Checks that a reserved container can be released by only one
     * allocate-from-reserved proposal.
     *
     * <p>An application with a 4096 MB AM on nm1 (8192 MB) asks for three
     * 5120 MB containers; one node update on nm1 yields a single reservation
     * and 9216 MB of queue usage. Two proposals are then built that both name
     * that same reserved container as the container to release: the first
     * allocates on nm2 and must be accepted, the second allocates on nm3 and
     * must be rejected.
     *
     * <p>The RM is closed in a {@code finally} block so a failed assertion does
     * not leave it running.
     */
    @Test(timeout=60000L)
    public void testReleaseOutdatedReservedContainer() throws Exception {
        MockRM rm1 = new MockRM();
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM nm1 = rm1.registerNode("h1:1234", 8192);
            MockNM nm2 = rm1.registerNode("h2:1234", 8192);
            MockNM nm3 = rm1.registerNode("h3:1234", 8192);
            rm1.drainEvents();

            CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
            RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
            LeafQueue defaultQueue = (LeafQueue)cs.getQueue("default");
            SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
            SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
            SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId());

            RMApp app1 = rm1.submitApp(4096, "app", "user", null, "default");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
            Resource allocateResource = Resources.createResource(5120);
            am1.allocate("*", (int)allocateResource.getMemorySize(), 3, 0, new ArrayList<ContainerId>(), "");
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());

            // One node update on nm1: expect a single reservation (4096 MB AM + 5120 MB reserved = 9216 MB used).
            cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
            Assert.assertEquals(1L, (long)schedulerApp1.getReservedContainers().size());
            Assert.assertEquals(9216L, defaultQueue.getQueueResourceUsage().getUsed().getMemorySize());
            RMContainer reservedContainer = schedulerApp1.getReservedContainers().get(0);

            // First proposal releasing the reservation: allocate on nm2, must succeed.
            ResourceCommitRequest allocateFromSameReservedContainerProposal1 = this.createAllocateFromReservedProposal(3, allocateResource, schedulerApp1, sn2, sn1, cs.getRMContext(), reservedContainer);
            boolean tryCommitResult = cs.tryCommit(cs.getClusterResource(), allocateFromSameReservedContainerProposal1, true);
            Assert.assertTrue(tryCommitResult);

            // Second proposal naming the same reservation: allocate on nm3, must be rejected.
            ResourceCommitRequest allocateFromSameReservedContainerProposal2 = this.createAllocateFromReservedProposal(4, allocateResource, schedulerApp1, sn3, sn1, cs.getRMContext(), reservedContainer);
            tryCommitResult = cs.tryCommit(cs.getClusterResource(), allocateFromSameReservedContainerProposal2, true);
            Assert.assertFalse("This proposal should be rejected because it try to release an outdated reserved container", tryCommitResult);
        } finally {
            rm1.close();
        }
    }

    /**
     * Builds a commit request holding a single allocation proposal that
     * allocates a new container on {@code allocateNode} and lists
     * {@code reservedContainer} (on {@code reservedNode}) as the container to
     * release.
     *
     * @param containerId sequence number used for the new container's id
     * @param allocateResource resource of the new container and of the proposal
     * @param schedulerApp application attempt owning both containers
     * @param allocateNode node on which the new container is allocated
     * @param reservedNode node holding the reservation to release
     * @param rmContext context passed to the new {@link RMContainerImpl}
     * @param reservedContainer the reserved container to release
     * @return a request with one allocation proposal and no reserve or release lists
     */
    private ResourceCommitRequest createAllocateFromReservedProposal(int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp, SchedulerNode allocateNode, SchedulerNode reservedNode, RMContext rmContext, RMContainer reservedContainer) {
        // The container that would be allocated on the target node.
        ContainerId newId = ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(), (long)containerId);
        Container newContainer = Container.newInstance(newId, allocateNode.getNodeID(), allocateNode.getHttpAddress(), allocateResource, Priority.newInstance(0), null);
        ResourceRequest offSwitchAsk = ResourceRequest.newInstance(Priority.newInstance(0), "*", allocateResource, 1);
        RMContainerImpl newRmContainer = new RMContainerImpl(newContainer, SchedulerRequestKey.create(offSwitchAsk), schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(), "user", rmContext);
        SchedulerContainer toAllocate = new SchedulerContainer((SchedulerApplicationAttempt)schedulerApp, allocateNode, (RMContainer)newRmContainer, "", true);

        // The reservation that this proposal gives up.
        ArrayList<SchedulerContainer> releaseList = new ArrayList<SchedulerContainer>();
        releaseList.add(new SchedulerContainer((SchedulerApplicationAttempt)schedulerApp, reservedNode, reservedContainer, "", false));

        ArrayList<ContainerAllocationProposal> proposals = new ArrayList<ContainerAllocationProposal>();
        proposals.add(new ContainerAllocationProposal(toAllocate, releaseList, null, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource));
        return new ResourceCommitRequest(proposals, null, null);
    }

    /**
     * Starts a background thread that heartbeats the given nodes, first asking
     * any previously started heartbeat thread to stop.
     *
     * @param mockNMs nodes to heartbeat
     * @param interval sleep between heartbeat rounds, passed to {@link NMHeartbeatThread}
     */
    private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
        NMHeartbeatThread previous = this.nmHeartbeatThread;
        if (previous != null) {
            previous.setShouldStop();
        }
        NMHeartbeatThread replacement = new NMHeartbeatThread(mockNMs, interval);
        this.nmHeartbeatThread = replacement;
        replacement.start();
    }

    /**
     * Asks the current heartbeat thread, if any, to stop and forgets it.
     * Does nothing when no heartbeat thread is registered.
     */
    private void pauseNMHeartbeat() {
        NMHeartbeatThread running = this.nmHeartbeatThread;
        if (running == null) {
            return;
        }
        running.setShouldStop();
        this.nmHeartbeatThread = null;
    }

    /**
     * Counts the running containers on a node that are not AM containers.
     *
     * @param cs scheduler used to look up the node
     * @param nm mock node manager whose node id identifies the node
     * @return number of running containers for which {@code isAMContainer()} is false
     */
    private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) {
        List<RMContainer> running = cs.getNode(nm.getNodeId()).getCopiedListOfRunningContainers();
        int amContainers = 0;
        for (int idx = 0; idx < running.size(); idx++) {
            if (running.get(idx).isAMContainer()) {
                amContainers++;
            }
        }
        // Everything that is not an AM container is what the caller asked for.
        return running.size() - amContainers;
    }

    /**
     * Requests {@code nContainer} containers for the AM, waits until the last
     * expected container id reaches ALLOCATED, pulls the allocation, then sends
     * a LAUNCHED event to each container and waits for it to become RUNNING.
     *
     * @param am application master issuing the request
     * @param nm node passed to {@code rm.waitForState} while waiting
     * @param rm resource manager under test
     * @param nContainer number of containers to request
     * @param resource size of each container
     * @param priority2 request priority
     * @param startContainerId container sequence number of the first expected container
     * @throws Exception if an RM/AM interaction fails
     */
    private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, Resource resource, int priority2, int startContainerId) throws Exception {
        ResourceRequest ask = ResourceRequest.newInstance(Priority.newInstance(priority2), "*", resource, nContainer);
        am.allocate(Arrays.asList(ask), null);

        // Ids are expected to be consecutive, so waiting for the last one covers the batch.
        int endContainerId = startContainerId + nContainer;
        ContainerId lastContainerId = ContainerId.newContainerId(am.getApplicationAttemptId(), (long)(endContainerId - 1));
        Assert.assertTrue(rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED));
        am.allocate(null, null);

        CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
        int cId = startContainerId;
        while (cId < endContainerId) {
            ContainerId containerId = ContainerId.newContainerId(am.getApplicationAttemptId(), (long)cId);
            RMContainer launched = cs.getRMContainer(containerId);
            if (launched == null) {
                Assert.fail("Cannot find RMContainer");
            }
            launched.handle(new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
            rm.waitForState(nm, containerId, RMContainerState.RUNNING);
            ++cId;
        }
    }

    /**
     * Background thread that repeatedly calls {@code nodeHeartbeat(true)} on
     * every supplied {@link MockNM}, sleeping {@code interval} milliseconds
     * between rounds, until {@link #setShouldStop()} is called or the thread is
     * interrupted.
     */
    public static class NMHeartbeatThread
    extends Thread {
        private final List<MockNM> mockNMS;
        private final int interval;
        // volatile so a stop request from the test thread is seen by run().
        private volatile boolean shouldStop = false;

        /**
         * @param mockNMs nodes to heartbeat on every round
         * @param interval sleep between rounds, in milliseconds
         */
        public NMHeartbeatThread(List<MockNM> mockNMs, int interval) {
            this.mockNMS = mockNMs;
            this.interval = interval;
        }

        /**
         * Heartbeats all nodes, sleeps, and repeats. A failing heartbeat is
         * reported and the round continues (best effort). An interrupt during
         * the sleep ends the loop with the interrupt flag restored.
         */
        @Override
        public void run() {
            while (!this.shouldStop) {
                for (MockNM nm : this.mockNMS) {
                    try {
                        nm.nodeHeartbeat(true);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(this.interval);
                }
                catch (InterruptedException e) {
                    // Treat interruption as a stop request instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Requests the loop to end; takes effect the next time the loop
         * condition is evaluated (after the current round and sleep).
         */
        public void setShouldStop() {
            this.shouldStop = true;
        }
    }
}

