/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.api.client.rpc;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.common.CachedEntity;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.client.DAGClientTimelineImpl;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DagStatusSource;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.TimelineReaderFactory;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
import org.apache.tez.dag.api.records.DAGProtos;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestDAGClient {
    // Client under test; setUp() builds it as a DAGClientImpl and injects the mocks below into its real client.
    private DAGClient dagClient;
    // Mockito mock passed as the application id to every client these tests construct.
    private ApplicationId mockAppId;
    // Mockito mock assigned to the RPC client's appReport field in setUp().
    private ApplicationReport mockAppReport;
    // DAG identifier string; assigned in setUp().
    private String dagIdStr;
    // Mocked AM protocol endpoint; setUp() stubs it to answer with the canned protos below.
    private DAGClientAMProtocolBlockingPB mockProxy;
    // Canned status protos built by setUpData(). Each "WithCounters" variant is a copy of the
    // corresponding "WithoutCounters" proto with a counters section added.
    private DAGProtos.VertexStatusProto vertexStatusProtoWithoutCounters;
    private DAGProtos.VertexStatusProto vertexStatusProtoWithCounters;
    private DAGProtos.DAGStatusProto dagStatusProtoWithoutCounters;
    private DAGProtos.DAGStatusProto dagStatusProtoWithCounters;

    /**
     * Builds the four canned status protos used as stubbed AM responses: one DAG status and one
     * vertex status, each in a plain form and in a form that additionally carries counters.
     */
    private void setUpData() {
        // DAG-level fixtures.
        DAGProtos.ProgressProto overallDagProgress = DAGProtos.ProgressProto.newBuilder()
                .setFailedTaskCount(1)
                .setKilledTaskCount(1)
                .setRunningTaskCount(2)
                .setSucceededTaskCount(2)
                .setTotalTaskCount(6)
                .build();
        DAGProtos.TezCountersProto dagCounters = DAGProtos.TezCountersProto.newBuilder()
                .addCounterGroups(DAGProtos.TezCounterGroupProto.newBuilder()
                        .setName("DAGGroup")
                        .addCounters(DAGProtos.TezCounterProto.newBuilder()
                                .setDisplayName("dag_counter_1")
                                .setValue(99L)))
                .build();
        this.dagStatusProtoWithoutCounters = DAGProtos.DAGStatusProto.newBuilder()
                .addDiagnostics("Diagnostics_0")
                .setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING)
                .setDAGProgress(overallDagProgress)
                .addVertexProgress(DAGProtos.StringProgressPairProto.newBuilder()
                        .setKey("v1")
                        .setProgress(DAGProtos.ProgressProto.newBuilder()
                                .setFailedTaskCount(0)
                                .setSucceededTaskCount(0)
                                .setKilledTaskCount(0)))
                .addVertexProgress(DAGProtos.StringProgressPairProto.newBuilder()
                        .setKey("v2")
                        .setProgress(DAGProtos.ProgressProto.newBuilder()
                                .setFailedTaskCount(1)
                                .setSucceededTaskCount(1)
                                .setKilledTaskCount(1)))
                .build();
        // Same DAG status, plus the counters section.
        this.dagStatusProtoWithCounters = DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithoutCounters)
                .setDagCounters(dagCounters)
                .build();

        // Vertex-level fixtures.
        DAGProtos.ProgressProto singleVertexProgress = DAGProtos.ProgressProto.newBuilder()
                .setFailedTaskCount(1)
                .setKilledTaskCount(0)
                .setRunningTaskCount(0)
                .setSucceededTaskCount(1)
                .build();
        DAGProtos.TezCountersProto vertexCounters = DAGProtos.TezCountersProto.newBuilder()
                .addCounterGroups(DAGProtos.TezCounterGroupProto.newBuilder()
                        .addCounters(DAGProtos.TezCounterProto.newBuilder()
                                .setDisplayName("vertex_counter_1")
                                .setValue(99L)))
                .build();
        this.vertexStatusProtoWithoutCounters = DAGProtos.VertexStatusProto.newBuilder()
                .setId("vertex_1")
                .addDiagnostics("V_Diagnostics_0")
                .setProgress(singleVertexProgress)
                .setState(DAGProtos.VertexStatusStateProto.VERTEX_SUCCEEDED)
                .build();
        // Same vertex status, plus the counters section.
        this.vertexStatusProtoWithCounters = DAGProtos.VertexStatusProto.newBuilder(this.vertexStatusProtoWithoutCounters)
                .setVertexCounters(vertexCounters)
                .build();
    }

    /**
     * Creates the mocks, stubs the AM proxy with the canned status protos and wires both the mocked
     * application report and the mocked proxy into the RPC client behind {@link #dagClient}.
     */
    @Before
    public void setUp() throws YarnException, IOException, TezException, ServiceException {
        this.setUpData();

        this.mockAppId = Mockito.mock(ApplicationId.class);
        this.mockAppReport = Mockito.mock(ApplicationReport.class);
        this.dagIdStr = "dag_9999_0001_1";
        this.mockProxy = Mockito.mock(DAGClientAMProtocolBlockingPB.class);

        DAGClientAMProtocolRPC.GetDAGStatusResponseProto dagPlain =
                DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(this.dagStatusProtoWithoutCounters).build();
        DAGClientAMProtocolRPC.GetDAGStatusResponseProto dagWithCounters =
                DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(this.dagStatusProtoWithCounters).build();
        DAGClientAMProtocolRPC.GetVertexStatusResponseProto vertexPlain =
                DAGClientAMProtocolRPC.GetVertexStatusResponseProto.newBuilder().setVertexStatus(this.vertexStatusProtoWithoutCounters).build();
        DAGClientAMProtocolRPC.GetVertexStatusResponseProto vertexWithCounters =
                DAGClientAMProtocolRPC.GetVertexStatusResponseProto.newBuilder().setVertexStatus(this.vertexStatusProtoWithCounters).build();

        // Order matters: the catch-all stub is registered first and the counter-specific stub second,
        // so that a request asking for counters is answered by the more specific (later) stub.
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any()))
                .thenReturn(dagPlain);
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), Mockito.argThat(new DAGCounterRequestMatcher())))
                .thenReturn(dagWithCounters);
        Mockito.when(this.mockProxy.getVertexStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetVertexStatusRequestProto) Mockito.any()))
                .thenReturn(vertexPlain);
        Mockito.when(this.mockProxy.getVertexStatus((RpcController) Mockito.isNull(), Mockito.argThat(new VertexCounterRequestMatcher())))
                .thenReturn(vertexWithCounters);

        TezConfiguration conf = new TezConfiguration();
        this.dagClient = new DAGClientImpl(this.mockAppId, this.dagIdStr, conf, null, UserGroupInformation.getCurrentUser());

        // Replace the collaborators of the underlying RPC client with the mocks.
        DAGClientRPCImpl rpcClient = (DAGClientRPCImpl) ((DAGClientImpl) this.dagClient).getRealClient();
        rpcClient.appReport = this.mockAppReport;
        rpcClient.proxy = this.mockProxy;
    }

    /**
     * Checks the identifier accessors of the client and that the RPC client hands back the
     * application report injected in {@link #setUp()}.
     */
    @Test(timeout=5000L)
    public void testApp() throws IOException, TezException, ServiceException {
        String executionContext = this.dagClient.getExecutionContext();
        String appIdText = this.mockAppId.toString();
        Assert.assertTrue(executionContext.contains(appIdText));
        Assert.assertEquals(appIdText, this.dagClient.getSessionIdentifierString());
        Assert.assertEquals(this.dagIdStr, this.dagClient.getDagIdentifierString());

        DAGClientImpl clientImpl = (DAGClientImpl) this.dagClient;
        DAGClientRPCImpl rpcClient = (DAGClientRPCImpl) clientImpl.getRealClient();
        Assert.assertEquals(this.mockAppReport, rpcClient.getApplicationReportInternal());
    }

    /**
     * Fetches the DAG status once without options and once with {@code GET_COUNTERS}, verifying for
     * each call the exact request sent to the AM proxy and the status returned to the caller.
     */
    @Test(timeout=5000L)
    public void testDAGStatus() throws Exception {
        // No options: the request carries only the DAG id and a zero timeout.
        DAGStatus resultDagStatus = this.dagClient.getDAGStatus(null);
        DAGClientAMProtocolRPC.GetDAGStatusRequestProto plainRequest = DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
                .setDagId(this.dagIdStr)
                .setTimeout(0L)
                .build();
        Mockito.verify(this.mockProxy, Mockito.times(1)).getDAGStatus(null, plainRequest);
        Assert.assertEquals(new DAGStatus(this.dagStatusProtoWithoutCounters, DagStatusSource.AM), resultDagStatus);
        System.out.println("DAGStatusWithoutCounter:" + resultDagStatus);

        // With counters requested. EnumSet.of yields a properly typed Set<StatusGetOpts> and avoids
        // depending on a Mockito-internal collection helper.
        resultDagStatus = this.dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS));
        DAGClientAMProtocolRPC.GetDAGStatusRequestProto counterRequest = DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
                .setDagId(this.dagIdStr)
                .setTimeout(0L)
                .addStatusOptions(DAGProtos.StatusGetOptsProto.GET_COUNTERS)
                .build();
        Mockito.verify(this.mockProxy, Mockito.times(1)).getDAGStatus(null, counterRequest);
        Assert.assertEquals(new DAGStatus(this.dagStatusProtoWithCounters, DagStatusSource.AM), resultDagStatus);
        System.out.println("DAGStatusWithCounter:" + resultDagStatus);
    }

    /**
     * Fetches the status of vertex "v1" once without options and once with {@code GET_COUNTERS},
     * verifying for each call the exact request sent to the AM proxy and the status returned.
     */
    @Test(timeout=5000L)
    public void testVertexStatus() throws Exception {
        // No options.
        VertexStatus resultVertexStatus = this.dagClient.getVertexStatus("v1", null);
        DAGClientAMProtocolRPC.GetVertexStatusRequestProto plainRequest = DAGClientAMProtocolRPC.GetVertexStatusRequestProto.newBuilder()
                .setDagId(this.dagIdStr)
                .setVertexName("v1")
                .build();
        Mockito.verify(this.mockProxy).getVertexStatus(null, plainRequest);
        Assert.assertEquals(new VertexStatus(this.vertexStatusProtoWithoutCounters), resultVertexStatus);
        System.out.println("VertexWithoutCounter:" + resultVertexStatus);

        // With counters requested. EnumSet.of yields a properly typed Set<StatusGetOpts> and avoids
        // depending on a Mockito-internal collection helper.
        resultVertexStatus = this.dagClient.getVertexStatus("v1", EnumSet.of(StatusGetOpts.GET_COUNTERS));
        DAGClientAMProtocolRPC.GetVertexStatusRequestProto counterRequest = DAGClientAMProtocolRPC.GetVertexStatusRequestProto.newBuilder()
                .setDagId(this.dagIdStr)
                .setVertexName("v1")
                .addStatusOptions(DAGProtos.StatusGetOptsProto.GET_COUNTERS)
                .build();
        Mockito.verify(this.mockProxy).getVertexStatus(null, counterRequest);
        Assert.assertEquals(new VertexStatus(this.vertexStatusProtoWithCounters), resultVertexStatus);
        System.out.println("VertexWithCounter:" + resultVertexStatus);
    }

    /**
     * Verifies that killing the DAG sends exactly one kill request carrying the DAG id to the proxy.
     */
    @Test(timeout=5000L)
    public void testTryKillDAG() throws Exception {
        DAGClientAMProtocolRPC.TryKillDAGRequestProto expectedRequest = DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder()
                .setDagId(this.dagIdStr)
                .build();

        this.dagClient.tryKillDAG();

        Mockito.verify(this.mockProxy, Mockito.times(1)).tryKillDAG(null, expectedRequest);
    }

    /**
     * Stubs the proxy to report the canned (running) status first and a succeeded status second,
     * then checks that waiting for completion queried the proxy exactly twice.
     */
    @Test(timeout=5000L)
    public void testWaitForCompletion() throws Exception {
        DAGClientAMProtocolRPC.GetDAGStatusResponseProto firstResponse = DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder()
                .setDagStatus(this.dagStatusProtoWithoutCounters)
                .build();
        DAGClientAMProtocolRPC.GetDAGStatusResponseProto succeededResponse = DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder()
                .setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithoutCounters)
                        .setState(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED)
                        .build())
                .build();
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any()))
                .thenReturn(firstResponse, succeededResponse);

        this.dagClient.waitForCompletion();

        // The captors only serve as accept-anything matchers; their captured values are not inspected.
        ArgumentCaptor<RpcController> controllerCaptor = ArgumentCaptor.forClass(RpcController.class);
        ArgumentCaptor<DAGClientAMProtocolRPC.GetDAGStatusRequestProto> requestCaptor =
                ArgumentCaptor.forClass(DAGClientAMProtocolRPC.GetDAGStatusRequestProto.class);
        Mockito.verify(this.mockProxy, Mockito.times(2)).getDAGStatus(controllerCaptor.capture(), requestCaptor.capture());
    }

    /**
     * Runs {@code waitForCompletionWithStatusUpdates} twice (without options, then with
     * {@code GET_COUNTERS}). Each run is stubbed with three running responses followed by one
     * succeeded response, and the cumulative number of proxy calls is verified after each run.
     */
    @Test(timeout=5000L)
    public void testWaitForCompletionWithStatusUpdates() throws Exception {
        // Response protos are immutable, so each distinct response is built once and reused.
        DAGClientAMProtocolRPC.GetDAGStatusResponseProto runningWithCounters = DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder()
                .setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters)
                        .setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING)
                        .build())
                .build();
        DAGClientAMProtocolRPC.GetDAGStatusResponseProto succeededWithoutCounters = DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder()
                .setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithoutCounters)
                        .setState(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED)
                        .build())
                .build();
        DAGClientAMProtocolRPC.GetDAGStatusResponseProto succeededWithCounters = DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder()
                .setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters)
                        .setState(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED)
                        .build())
                .build();

        // First run: no status options.
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any()))
                .thenReturn(runningWithCounters)
                .thenReturn(runningWithCounters)
                .thenReturn(runningWithCounters)
                .thenReturn(succeededWithoutCounters);
        ArgumentCaptor<RpcController> controllerCaptor = ArgumentCaptor.forClass(RpcController.class);
        ArgumentCaptor<DAGClientAMProtocolRPC.GetDAGStatusRequestProto> requestCaptor =
                ArgumentCaptor.forClass(DAGClientAMProtocolRPC.GetDAGStatusRequestProto.class);
        this.dagClient.waitForCompletionWithStatusUpdates(null);
        Mockito.verify(this.mockProxy, Mockito.times(4)).getDAGStatus(controllerCaptor.capture(), requestCaptor.capture());

        // Second run: counters requested. EnumSet.of yields a properly typed Set<StatusGetOpts>
        // and avoids depending on a Mockito-internal collection helper.
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any()))
                .thenReturn(runningWithCounters)
                .thenReturn(runningWithCounters)
                .thenReturn(runningWithCounters)
                .thenReturn(succeededWithCounters);
        controllerCaptor = ArgumentCaptor.forClass(RpcController.class);
        requestCaptor = ArgumentCaptor.forClass(DAGClientAMProtocolRPC.GetDAGStatusRequestProto.class);
        this.dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
        // The verification count is cumulative over both runs: 4 + 4.
        Mockito.verify(this.mockProxy, Mockito.times(8)).getDAGStatus(controllerCaptor.capture(), requestCaptor.capture());
    }

    /**
     * Exercises {@code getDAGStatus(options, timeout)} with a 2000 ms timeout and an 800 ms poll
     * interval in three situations: no AM proxy available, an AM proxy that keeps reporting RUNNING,
     * and an AM proxy that reports SUCCEEDED after one second. For each, the elapsed wall-clock time,
     * the number of RM/AM status lookups and the resulting state are asserted.
     *
     * <p>The client is opened in a try-with-resources block, matching the other tests in this class,
     * so it is closed even when an assertion fails.
     */
    @Test(timeout=50000L)
    public void testGetDagStatusWithTimeout() throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.setLong("tez.dag.status.pollinterval-ms", 800L);
        try (DAGClientImplForTest dagClientImpl = new DAGClientImplForTest(this.mockAppId, this.dagIdStr, tezConf, null)) {
            DAGClientRPCImplForTest dagClientRpc = new DAGClientRPCImplForTest(this.mockAppId, this.dagIdStr, tezConf, null);
            dagClientImpl.setRealClient(dagClientRpc);

            // Case 1: no AM proxy, RM reports SUBMITTED.
            dagClientRpc.setAMProxy(null);
            DAGStatus rmDagStatus = new DAGStatus(this.constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_SUBMITTED), DagStatusSource.RM);
            dagClientImpl.setRmDagStatus(rmDagStatus);
            long startTime = System.currentTimeMillis();
            DAGStatus dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
            long diff = System.currentTimeMillis() - startTime;
            Assert.assertTrue("diff is " + diff, diff > 1500L && diff < 2500L);
            Assert.assertEquals(0L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(4L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(DAGStatus.State.SUBMITTED, dagStatus.getState());

            // Case 2: AM proxy sleeps for the requested timeout and reports RUNNING.
            dagClientImpl.resetCounters();
            dagClientRpc.resetCounters();
            rmDagStatus = new DAGStatus(this.constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM);
            dagClientImpl.setRmDagStatus(rmDagStatus);
            dagClientRpc.setAMProxy(this.createMockProxy(DAGProtos.DAGStatusStateProto.DAG_RUNNING, -1L));
            startTime = System.currentTimeMillis();
            dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
            diff = System.currentTimeMillis() - startTime;
            Assert.assertTrue("diff is " + diff, diff > 1500L && diff < 2500L);
            Assert.assertEquals(0L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(2L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(DAGStatus.State.RUNNING, dagStatus.getState());

            // Case 3: AM proxy sleeps one second and reports SUCCEEDED.
            dagClientImpl.resetCounters();
            dagClientRpc.resetCounters();
            rmDagStatus = new DAGStatus(this.constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM);
            dagClientImpl.setRmDagStatus(rmDagStatus);
            dagClientRpc.setAMProxy(this.createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
            startTime = System.currentTimeMillis();
            dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
            diff = System.currentTimeMillis() - startTime;
            Assert.assertTrue("diff is " + diff, diff > 500L && diff < 1500L);
            Assert.assertEquals(0L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(1L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
        }
    }

    /**
     * Checks the client's ATS-enabled flag for four configurations: an empty logging service class,
     * AM history logging off, DAG history logging off, and everything on (where the expectation is
     * whatever {@code DAGClientTimelineImpl.isSupported()} reports).
     */
    @Test(timeout=5000L)
    public void testDagClientTimelineEnabledCondition() throws IOException {
        final String atsLoggingService = "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";

        // Empty logging service class.
        testAtsEnabled(this.mockAppId, this.dagIdStr, false, "", true, true);
        // AM history logging disabled.
        testAtsEnabled(this.mockAppId, this.dagIdStr, false, atsLoggingService, false, true);
        // DAG history logging disabled.
        testAtsEnabled(this.mockAppId, this.dagIdStr, false, atsLoggingService, true, false);
        // Everything enabled.
        testAtsEnabled(this.mockAppId, this.dagIdStr, DAGClientTimelineImpl.isSupported(), atsLoggingService, true, true);
    }

    /**
     * Builds a client from a configuration with the given history-logging settings and asserts
     * that its ATS-enabled flag equals {@code expected}.
     *
     * @param appId application id handed to the client
     * @param dagIdStr DAG id handed to the client
     * @param expected expected value of the client's ATS-enabled flag
     * @param loggingClass value for {@code tez.history.logging.service.class}
     * @param amHistoryLoggingEnabled value for {@code tez.am.history.logging.enabled}
     * @param dagHistoryLoggingEnabled value for {@code tez.dag.history.logging.enabled}
     * @throws IOException if constructing the client fails
     */
    private static void testAtsEnabled(ApplicationId appId, String dagIdStr, boolean expected, String loggingClass, boolean amHistoryLoggingEnabled, boolean dagHistoryLoggingEnabled) throws IOException {
        TezConfiguration conf = new TezConfiguration();
        conf.set("tez.history.logging.service.class", loggingClass);
        conf.setBoolean("tez.am.history.logging.enabled", amHistoryLoggingEnabled);
        conf.setBoolean("tez.dag.history.logging.enabled", dagHistoryLoggingEnabled);

        DAGClientImplForTest client = new DAGClientImplForTest(appId, dagIdStr, conf, null);
        boolean actual = client.getIsATSEnabled();
        Assert.assertEquals(Boolean.valueOf(expected), Boolean.valueOf(actual));
    }

    /**
     * Returns a fresh DAG status builder whose only populated field is the given state.
     *
     * @param stateProto state to set on the builder
     * @return the (unbuilt) builder
     */
    private DAGProtos.DAGStatusProto.Builder constructDagStatusProto(DAGProtos.DAGStatusStateProto stateProto) {
        DAGProtos.DAGStatusProto.Builder statusBuilder = DAGProtos.DAGStatusProto.newBuilder();
        statusBuilder.setState(stateProto);
        return statusBuilder;
    }

    /**
     * Creates a mocked AM proxy whose {@code getDAGStatus} sleeps and then answers with a status in
     * the given state.
     *
     * @param stateProto state reported in every response
     * @param timeout sleep duration in milliseconds before answering; {@code -1} means "sleep for the
     *        timeout carried by the incoming request"
     * @return the stubbed mock
     * @throws ServiceException declared by the stubbed proxy method; not expected during stubbing
     */
    private DAGClientAMProtocolBlockingPB createMockProxy(final DAGProtos.DAGStatusStateProto stateProto, final long timeout) throws ServiceException {
        Answer<Object> delayedStatusAnswer = new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // Argument 0 is the RpcController, argument 1 the status request.
                DAGClientAMProtocolRPC.GetDAGStatusRequestProto request =
                        (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) invocation.getArguments()[1];
                long requestedTimeout = request.getTimeout();
                long sleepTime = (timeout == -1L) ? requestedTimeout : timeout;
                Thread.sleep(sleepTime);
                return DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder()
                        .setDagStatus(TestDAGClient.this.constructDagStatusProto(stateProto))
                        .build();
            }
        };

        DAGClientAMProtocolBlockingPB proxy = Mockito.mock(DAGClientAMProtocolBlockingPB.class);
        Mockito.doAnswer(delayedStatusAnswer)
                .when(proxy)
                .getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any());
        return proxy;
    }

    /**
     * Verifies that closing a {@link DAGClientTimelineImpl} configured for HTTPS stops the
     * background certificate/truststore reloader thread.
     *
     * <p>The thread is located by name among all live threads of the root thread group. Two name
     * fragments are accepted ("Truststore reloader thread" and "SSL Certificates Store Monitor").
     */
    @Test
    public void testTimelineClientCleanup() throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.set("yarn.http.policy", "HTTPS_ONLY");
        File testDir = new File(System.getProperty("java.io.tmpdir"));
        String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestDAGClient.class);
        KeyStoreTestUtil.setupSSLConfig(testDir.getAbsolutePath(), sslConfDir, (Configuration) tezConf, false);

        DAGClientTimelineImpl dagClient = new DAGClientTimelineImpl(this.mockAppId, this.dagIdStr, tezConf, Mockito.mock(FrameworkClient.class), 10000);
        Thread reloaderThread = null;
        try {
            Field field = DAGClientTimelineImpl.class.getDeclaredField("timelineReaderStrategy");
            field.setAccessible(true);
            TimelineReaderFactory.TimelineReaderStrategy strategy = (TimelineReaderFactory.TimelineReaderStrategy) field.get(dagClient);
            // NOTE(review): presumably this call initialises the SSL machinery that starts the
            // reloader thread; the assertions below rely on that thread existing afterwards.
            strategy.getHttpClient();

            // Walk up to the root thread group so that every live thread is considered.
            ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
            while (threadGroup.getParent() != null) {
                threadGroup = threadGroup.getParent();
            }

            // activeCount() is only an estimate: enumerate() reports how many slots it really
            // filled, and the remaining slots stay null, so only that prefix is inspected.
            Thread[] threads = new Thread[threadGroup.activeCount()];
            int enumerated = threadGroup.enumerate(threads);
            for (int i = 0; i < enumerated; i++) {
                String threadName = threads[i].getName();
                if (threadName == null) {
                    continue;
                }
                if (threadName.contains("Truststore reloader thread") || threadName.contains("SSL Certificates Store Monitor")) {
                    reloaderThread = threads[i];
                }
            }
            // Fail with a clear message instead of a NullPointerException when no thread matched.
            Assert.assertNotNull("Reloader thread not found", reloaderThread);
            Assert.assertTrue("Reloader is not alive", reloaderThread.isAlive());
        } finally {
            // Close even if an assertion above failed, so the reloader thread is not leaked.
            dagClient.close();
        }

        // Give the thread up to ten seconds to terminate; join returns as soon as it has died.
        reloaderThread.join(10000L);
        Assert.assertFalse("Reloader is still alive", reloaderThread.isAlive());
    }

    /**
     * Walks the client through a sequence of status lookups to exercise the cached DAG status:
     * a running DAG, a succeeded DAG (which populates the cache), an AM fault while the cache is
     * valid, an AM fault after the cache has been forcibly expired, and finally an AM fault combined
     * with an RM fault. After each step the RM/AM lookup counters and the resulting state are checked.
     */
    @Test(timeout=50000L)
    public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.setLong("tez.dag.status.pollinterval-ms", 800L);
        tezConf.setLong("tez.client.dag.status.cache.timeout-secs", 100000L);
        try (DAGClientImplForTest dagClientImpl = new DAGClientImplForTest(this.mockAppId, this.dagIdStr, tezConf, null)) {
            DAGClientRPCImplForTest dagClientRpc = new DAGClientRPCImplForTest(this.mockAppId, this.dagIdStr, tezConf, null);
            dagClientImpl.setRealClient(dagClientRpc);

            // Step 1: AM proxy sleeps for the requested timeout and reports RUNNING.
            DAGStatus rmDagStatus = new DAGStatus(this.constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM);
            dagClientImpl.setRmDagStatus(rmDagStatus);
            dagClientRpc.setAMProxy(this.createMockProxy(DAGProtos.DAGStatusStateProto.DAG_RUNNING, -1L));
            long startTime = System.currentTimeMillis();
            DAGStatus dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
            long diff = System.currentTimeMillis() - startTime;
            Assert.assertTrue("diff is " + diff, diff > 1500L && diff < 2500L);
            Assert.assertEquals(0L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(2L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(DAGStatus.State.RUNNING, dagStatus.getState());

            // Step 2: AM proxy sleeps one second and reports SUCCEEDED; the result must be cached.
            dagClientImpl.resetCounters();
            dagClientRpc.resetCounters();
            rmDagStatus = new DAGStatus(this.constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM);
            dagClientImpl.setRmDagStatus(rmDagStatus);
            dagClientRpc.setAMProxy(this.createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
            startTime = System.currentTimeMillis();
            dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
            diff = System.currentTimeMillis() - startTime;
            Assert.assertTrue("diff is " + diff, diff > 500L && diff < 1500L);
            Assert.assertEquals(0L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(1L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
            DAGStatus cachedDagStatus = dagClientImpl.getCachedDAGStatus();
            Assert.assertNotNull(cachedDagStatus);
            Assert.assertSame(dagStatus, cachedDagStatus);

            // Step 3: the AM lookup now fails; the very same cached status object is returned
            // and the RM is not consulted.
            dagClientImpl.resetCounters();
            dagClientRpc.resetCounters();
            dagClientRpc.injectAMFault(new TezException("injected Fault"));
            dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
            Assert.assertEquals(0L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(1L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
            Assert.assertSame(dagStatus, cachedDagStatus);

            // Step 4: AM still failing and the cache forcibly expired; the RM status (RUNNING)
            // is returned and the cache is empty afterwards.
            dagClientRpc.setAMProxy(this.createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
            dagClientRpc.injectAMFault(new TezException("injected AM Fault"));
            dagClientImpl.resetCounters();
            dagClientRpc.resetCounters();
            dagClientImpl.enforceExpirationCachedDAGStatus();
            dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
            Assert.assertEquals(1L, (long) dagClientImpl.numGetStatusViaRmInvocations);
            Assert.assertEquals(1L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            Assert.assertEquals(DAGStatus.State.RUNNING, dagStatus.getState());
            Assert.assertNotSame(dagStatus, cachedDagStatus);
            cachedDagStatus = dagClientImpl.getCachedDAGStatus();
            Assert.assertNull(cachedDagStatus);
            Assert.assertNotNull(dagStatus);

            // Step 5: both the AM and the RM lookups fail; the RM's IOException must propagate.
            dagClientImpl.resetCounters();
            dagClientRpc.resetCounters();
            dagClientRpc.setAMProxy(this.createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
            dagClientImpl.injectFault();
            try {
                dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
                Assert.fail("The RM should throw IOException");
            } catch (IOException ioException) {
                // Expected value first, so a failure message reports expected/actual correctly.
                Assert.assertEquals("Fault Injected for RM", ioException.getMessage());
                Assert.assertEquals(1L, (long) dagClientImpl.numGetStatusViaRmInvocations);
                Assert.assertEquals(1L, (long) dagClientRpc.numGetStatusViaAmInvocations);
            }
        }
    }

    /**
     * With DAG recovery disabled and the AM lookup throwing {@link NoCurrentDAGException}, the
     * client is expected to report a FAILED status whose first diagnostic reads
     * "No running DAG at present".
     */
    @Test
    public void testDagClientReturnsFailedDAGOnNoCurrentDAGException() throws Exception {
        TezConfiguration conf = new TezConfiguration();
        conf.setBoolean("tez.dag.recovery.enabled", false);

        try (DAGClientImplForTest clientUnderTest = new DAGClientImplForTest(this.mockAppId, this.dagIdStr, conf, null)) {
            DAGClientRPCImplForTest rpcClient = new DAGClientRPCImplForTest(this.mockAppId, this.dagIdStr, conf, null);
            clientUnderTest.setRealClient(rpcClient);

            // A non-null proxy is needed so the AM path is taken; the injected fault makes it throw.
            DAGClientAMProtocolBlockingPB unstubbedProxy = Mockito.mock(DAGClientAMProtocolBlockingPB.class);
            rpcClient.setAMProxy(unstubbedProxy);
            rpcClient.injectAMFault(new NoCurrentDAGException("dag_0_0_0"));

            DAGStatus status = clientUnderTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);

            Assert.assertEquals(DAGStatus.State.FAILED, status.getState());
            Assert.assertEquals("No running DAG at present", status.getDiagnostics().get(0));
        }
    }

    /**
     * {@link DAGClientImpl} variant for tests: the RM status lookup returns a canned status (or
     * throws an injected fault) and counts its invocations, and a few internals of the client are
     * exposed to the tests.
     */
    private static class DAGClientImplForTest
    extends DAGClientImpl {
        /** Number of RM status lookups since construction or the last {@link #resetCounters()}. */
        int numGetStatusViaRmInvocations = 0;
        /** Status handed out by the RM lookup. */
        private DAGStatus rmDagStatus;
        /** When set, the RM lookup throws instead of returning {@link #rmDagStatus}. */
        private volatile boolean faultInjected;

        public DAGClientImplForTest(ApplicationId appId, String dagId, TezConfiguration conf, @Nullable FrameworkClient frameworkClient) throws IOException {
            super(appId, dagId, conf, frameworkClient, UserGroupInformation.getCurrentUser());
        }

        /** Replaces the underlying real client. */
        private void setRealClient(DAGClientRPCImplForTest dagClientRpcImplForTest) {
            this.realClient = dagClientRpcImplForTest;
        }

        /** Sets the status the RM lookup will return. */
        void setRmDagStatus(DAGStatus rmDagStatus) {
            this.rmDagStatus = rmDagStatus;
        }

        /** Makes every subsequent RM lookup throw an {@link IOException}. */
        void injectFault() {
            this.faultInjected = true;
        }

        /** Resets the RM lookup counter to zero. */
        void resetCounters() {
            this.numGetStatusViaRmInvocations = 0;
        }

        /** Counts the call, then either throws the injected fault or returns the canned status. */
        protected DAGStatus getDAGStatusViaRM() throws TezException, IOException {
            this.numGetStatusViaRmInvocations++;
            if (!this.faultInjected) {
                return this.rmDagStatus;
            }
            throw new IOException("Fault Injected for RM");
        }

        /** Exposes the client's ATS-enabled flag. */
        public boolean getIsATSEnabled() {
            return this.isATSEnabled;
        }

        /** Returns the value currently held by the cached-status reference. */
        DAGStatus getCachedDAGStatus() {
            return (DAGStatus) this.getCachedDAGStatusRef().getValue();
        }

        /** Forces the cached-status reference to expire. */
        void enforceExpirationCachedDAGStatus() {
            this.getCachedDAGStatusRef().enforceExpiration();
        }
    }

    private static class DAGClientRPCImplForTest
    extends DAGClientRPCImpl {
        private AtomicReference<TezException> faultAMInjectedRef = new AtomicReference<Object>(null);
        int numGetStatusViaAmInvocations = 0;

        public DAGClientRPCImplForTest(ApplicationId appId, String dagId, TezConfiguration conf, @Nullable FrameworkClient frameworkClient) throws IOException {
            super(appId, dagId, conf, frameworkClient, UserGroupInformation.getCurrentUser());
        }

        void setAMProxy(DAGClientAMProtocolBlockingPB proxy) {
            this.proxy = proxy;
        }

        void resetCounters() {
            this.numGetStatusViaAmInvocations = 0;
        }

        boolean createAMProxyIfNeeded() throws IOException, TezException {
            return this.proxy != null;
        }

        DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout) throws IOException, TezException {
            ++this.numGetStatusViaAmInvocations;
            if (this.faultAMInjectedRef.get() != null) {
                throw this.faultAMInjectedRef.get();
            }
            return super.getDAGStatusViaAM(statusOptions, timeout);
        }

        void injectAMFault(TezException exception) {
            this.faultAMInjectedRef.set(exception);
        }
    }

    private static class VertexCounterRequestMatcher
    implements ArgumentMatcher<DAGClientAMProtocolRPC.GetVertexStatusRequestProto> {
        private VertexCounterRequestMatcher() {
        }

        public boolean matches(DAGClientAMProtocolRPC.GetVertexStatusRequestProto requestProto) {
            return requestProto != null && requestProto.getStatusOptionsCount() != 0 && requestProto.getStatusOptionsList().get(0) == DAGProtos.StatusGetOptsProto.GET_COUNTERS;
        }
    }

    private static class DAGCounterRequestMatcher
    implements ArgumentMatcher<DAGClientAMProtocolRPC.GetDAGStatusRequestProto> {
        private DAGCounterRequestMatcher() {
        }

        public boolean matches(DAGClientAMProtocolRPC.GetDAGStatusRequestProto requestProto) {
            return requestProto != null && requestProto.getStatusOptionsCount() != 0 && requestProto.getStatusOptionsList().get(0) == DAGProtos.StatusGetOptsProto.GET_COUNTERS;
        }
    }
}

