/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ActiveStandbyElectorBasedElectorService;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElector;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestRMEmbeddedElector
extends ClientBaseWithFixes {
    private static final Log LOG = LogFactory.getLog((String)TestRMEmbeddedElector.class.getName());
    private static final String RM1_NODE_ID = "rm1";
    private static final int RM1_PORT_BASE = 10000;
    private static final String RM2_NODE_ID = "rm2";
    private static final int RM2_PORT_BASE = 20000;
    private Configuration conf;
    private AtomicBoolean callbackCalled;

    @Before
    public void setup() throws IOException {
        this.conf = new YarnConfiguration();
        this.conf.setBoolean("yarn.resourcemanager.ha.enabled", true);
        this.conf.setBoolean("yarn.resourcemanager.ha.automatic-failover.enabled", true);
        this.conf.setBoolean("yarn.resourcemanager.ha.automatic-failover.embedded", true);
        this.conf.set("yarn.resourcemanager.cluster-id", "yarn-test-cluster");
        this.conf.set("yarn.resourcemanager.zk-address", this.hostPort);
        this.conf.setInt("yarn.resourcemanager.zk-timeout-ms", 2000);
        this.conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        this.conf.set("yarn.resourcemanager.ha.id", RM1_NODE_ID);
        HATestUtil.setRpcAddressForRM(RM1_NODE_ID, 10000, this.conf);
        HATestUtil.setRpcAddressForRM(RM2_NODE_ID, 20000, this.conf);
        this.conf.setLong("yarn.client.failover-sleep-base-ms", 100L);
        this.callbackCalled = new AtomicBoolean(false);
    }

    @Test(timeout=10000L)
    public void testDeadlockShutdownBecomeActive() throws InterruptedException {
        MockRMWithElector rm = new MockRMWithElector(this.conf, 1000L);
        rm.start();
        LOG.info((Object)"Waiting for callback");
        while (!this.callbackCalled.get()) {
        }
        LOG.info((Object)"Stopping RM");
        rm.stop();
        LOG.info((Object)"Stopped RM");
    }

    @Test
    public void testCallbackSynchronization() throws IOException, InterruptedException {
        this.testCallbackSynchronization(SyncTestType.ACTIVE);
        this.testCallbackSynchronization(SyncTestType.STANDBY);
        this.testCallbackSynchronization(SyncTestType.NEUTRAL);
        this.testCallbackSynchronization(SyncTestType.ACTIVE_TIMING);
        this.testCallbackSynchronization(SyncTestType.STANDBY_TIMING);
    }

    private void testCallbackSynchronization(SyncTestType type) throws IOException, InterruptedException {
        AdminService as = (AdminService)Mockito.mock(AdminService.class);
        RMContext rc = (RMContext)Mockito.mock(RMContext.class);
        ResourceManager rm = (ResourceManager)Mockito.mock(ResourceManager.class);
        Configuration myConf = new Configuration(this.conf);
        myConf.setInt("yarn.resourcemanager.zk-timeout-ms", 50);
        Mockito.when((Object)rm.getRMContext()).thenReturn((Object)rc);
        Mockito.when((Object)rc.getRMAdminService()).thenReturn((Object)as);
        ActiveStandbyElectorBasedElectorService ees = new ActiveStandbyElectorBasedElectorService(rm);
        ees.init(myConf);
        ees.enterNeutralMode();
        switch (type) {
            case ACTIVE: {
                this.testCallbackSynchronizationActive(as, ees);
                break;
            }
            case STANDBY: {
                this.testCallbackSynchronizationStandby(as, ees);
                break;
            }
            case NEUTRAL: {
                this.testCallbackSynchronizationNeutral(as, ees);
                break;
            }
            case ACTIVE_TIMING: {
                this.testCallbackSynchronizationTimingActive(as, ees);
                break;
            }
            case STANDBY_TIMING: {
                this.testCallbackSynchronizationTimingStandby(as, ees);
                break;
            }
            default: {
                Assert.fail((String)("Unknown test type: " + (Object)((Object)type)));
            }
        }
    }

    private void testCallbackSynchronizationActive(AdminService as, ActiveStandbyElectorBasedElectorService ees) throws IOException, InterruptedException {
        ees.becomeActive();
        Thread.sleep(100L);
        ((AdminService)Mockito.verify((Object)as)).transitionToActive((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.never())).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
    }

    private void testCallbackSynchronizationStandby(AdminService as, ActiveStandbyElectorBasedElectorService ees) throws IOException, InterruptedException {
        ees.becomeStandby();
        Thread.sleep(100L);
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.atLeast((int)1))).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.atMost((int)1))).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
    }

    private void testCallbackSynchronizationNeutral(AdminService as, ActiveStandbyElectorBasedElectorService ees) throws IOException, InterruptedException {
        ees.enterNeutralMode();
        Thread.sleep(100L);
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.atLeast((int)1))).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.atMost((int)1))).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testCallbackSynchronizationTimingActive(AdminService as, ActiveStandbyElectorBasedElectorService ees) throws IOException, InterruptedException {
        Object object = ees.zkDisconnectLock;
        synchronized (object) {
            Thread.sleep(100L);
            ees.becomeActive();
        }
        Thread.sleep(50L);
        ((AdminService)Mockito.verify((Object)as)).transitionToActive((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.never())).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testCallbackSynchronizationTimingStandby(AdminService as, ActiveStandbyElectorBasedElectorService ees) throws IOException, InterruptedException {
        Object object = ees.zkDisconnectLock;
        synchronized (object) {
            Thread.sleep(100L);
            ees.becomeStandby();
        }
        Thread.sleep(50L);
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.atLeast((int)1))).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
        ((AdminService)Mockito.verify((Object)as, (VerificationMode)Mockito.atMost((int)1))).transitionToStandby((HAServiceProtocol.StateChangeRequestInfo)Matchers.any());
    }

    private class MockRMWithElector
    extends MockRM {
        private long delayMs;

        MockRMWithElector(Configuration conf) {
            super(conf);
            this.delayMs = 0L;
        }

        MockRMWithElector(Configuration conf, long delayMs) {
            this(conf);
            this.delayMs = delayMs;
        }

        @Override
        protected EmbeddedElector createEmbeddedElector() {
            return new ActiveStandbyElectorBasedElectorService(this){

                public void becomeActive() throws ServiceFailedException {
                    try {
                        TestRMEmbeddedElector.this.callbackCalled.set(true);
                        LOG.info((Object)"Callback called. Sleeping now");
                        Thread.sleep(MockRMWithElector.this.delayMs);
                        LOG.info((Object)"Sleep done");
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    super.becomeActive();
                }
            };
        }
    }

    private static enum SyncTestType {
        ACTIVE,
        STANDBY,
        NEUTRAL,
        ACTIVE_TIMING,
        STANDBY_TIMING;

    }
}

