/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
import com.google.auth.Credentials;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class DataflowPipelineJobTest {
    private static final String PROJECT_ID = "some-project";
    private static final String REGION_ID = "some-region-2b";
    private static final String JOB_ID = "1234";
    private static final String REPLACEMENT_JOB_ID = "4321";
    @Mock
    private DataflowClient mockDataflowClient;
    @Mock
    private Dataflow mockWorkflowClient;
    @Mock
    private Dataflow.Projects mockProjects;
    @Mock
    private Dataflow.Projects.Locations mockLocations;
    @Mock
    private Dataflow.Projects.Locations.Jobs mockJobs;
    @Mock
    private MonitoringUtil.JobMessagesHandler mockHandler;
    @Rule
    public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    @Rule
    public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class);
    private TestDataflowPipelineOptions options;

    @Before
    public void setup() {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.mockWorkflowClient.projects()).thenReturn((Object)this.mockProjects);
        Mockito.when((Object)this.mockProjects.locations()).thenReturn((Object)this.mockLocations);
        Mockito.when((Object)this.mockLocations.jobs()).thenReturn((Object)this.mockJobs);
        this.options = (TestDataflowPipelineOptions)PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
        this.options.setDataflowClient(this.mockWorkflowClient);
        this.options.setProject(PROJECT_ID);
        this.options.setRegion(REGION_ID);
        this.options.setRunner(DataflowRunner.class);
        this.options.setTempLocation("gs://fakebucket/temp");
        this.options.setPathValidatorClass(NoopPathValidator.class);
        this.options.setGcpCredential((Credentials)new TestCredential());
    }

    private void checkValidInterval(Duration pollingInterval, int retries, long timeSleptMillis) {
        long highSum = 0L;
        long lowSum = 0L;
        for (int i = 0; i < retries; ++i) {
            double currentInterval = (double)pollingInterval.getMillis() * Math.pow(1.5, i);
            double randomOffset = 0.5 * currentInterval;
            highSum += Math.round(currentInterval + randomOffset);
            lowSum += Math.round(currentInterval - randomOffset);
        }
        MatcherAssert.assertThat((Object)timeSleptMillis, (Matcher)org.hamcrest.Matchers.allOf((Matcher)org.hamcrest.Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(lowSum)), (Matcher)org.hamcrest.Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(highSum))));
    }

    @Test
    public void testWaitToFinishMessagesFail() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_" + PipelineResult.State.DONE.name());
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        MonitoringUtil.JobMessagesHandler jobHandler = (MonitoringUtil.JobMessagesHandler)Mockito.mock(MonitoringUtil.JobMessagesHandler.class);
        Dataflow.Projects.Locations.Jobs.Messages mockMessages = (Dataflow.Projects.Locations.Jobs.Messages)Mockito.mock(Dataflow.Projects.Locations.Jobs.Messages.class);
        Dataflow.Projects.Locations.Jobs.Messages.List listRequest = (Dataflow.Projects.Locations.Jobs.Messages.List)Mockito.mock(Dataflow.Projects.Locations.Jobs.Messages.List.class);
        Mockito.when((Object)this.mockJobs.messages()).thenReturn((Object)mockMessages);
        Mockito.when((Object)mockMessages.list((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)listRequest);
        Mockito.when((Object)listRequest.setPageToken((String)Matchers.eq((Object)null))).thenReturn((Object)listRequest);
        Mockito.when((Object)((ListJobMessagesResponse)listRequest.execute())).thenThrow(SocketTimeoutException.class);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        PipelineResult.State state = job.waitUntilFinish(Duration.standardMinutes((long)5L), jobHandler, arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0), () -> ((FastNanoClockAndSleeper)this.fastClock).nanoTime());
        Assert.assertEquals(null, (Object)state);
    }

    public PipelineResult.State mockWaitToFinishInState(PipelineResult.State state) throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_" + state.name());
        if (state == PipelineResult.State.UPDATED) {
            statusResponse.setReplacedByJobId(REPLACEMENT_JOB_ID);
        }
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        return job.waitUntilFinish(Duration.standardMinutes((long)1L), null, arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0), () -> ((FastNanoClockAndSleeper)this.fastClock).nanoTime());
    }

    @Test
    public void testWaitToFinishDone() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)this.mockWaitToFinishInState(PipelineResult.State.DONE));
        this.expectedLogs.verifyInfo(String.format("Job %s finished with status DONE.", JOB_ID));
    }

    @Test
    public void testWaitToFinishFailed() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.FAILED, (Object)this.mockWaitToFinishInState(PipelineResult.State.FAILED));
        this.expectedLogs.verifyInfo(String.format("Job %s failed with status FAILED.", JOB_ID));
    }

    @Test
    public void testWaitToFinishCancelled() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.CANCELLED, (Object)this.mockWaitToFinishInState(PipelineResult.State.CANCELLED));
        this.expectedLogs.verifyInfo(String.format("Job %s finished with status CANCELLED", JOB_ID));
    }

    @Test
    public void testWaitToFinishUpdated() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.UPDATED, (Object)this.mockWaitToFinishInState(PipelineResult.State.UPDATED));
        this.expectedLogs.verifyInfo(String.format("Job %s has been updated and is running as the new job with id %s.", JOB_ID, REPLACEMENT_JOB_ID));
    }

    @Test
    public void testWaitToFinishLogsError() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.UPDATED, (Object)this.mockWaitToFinishInState(PipelineResult.State.UPDATED));
        this.expectedLogs.verifyInfo(String.format("Job %s has been updated and is running as the new job with id %s.", JOB_ID, REPLACEMENT_JOB_ID));
    }

    @Test
    public void testWaitToFinishUnknown() throws Exception {
        Assert.assertEquals(null, (Object)this.mockWaitToFinishInState(PipelineResult.State.UNKNOWN));
        this.expectedLogs.verifyWarn("No terminal state was returned within allotted timeout. State value UNKNOWN");
    }

    @Test
    public void testWaitToFinishFail() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenThrow(IOException.class);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        long startTime = this.fastClock.nanoTime();
        PipelineResult.State state = job.waitUntilFinish(Duration.standardMinutes((long)5L), null, arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0), () -> ((FastNanoClockAndSleeper)this.fastClock).nanoTime());
        Assert.assertEquals(null, (Object)state);
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(this.fastClock.nanoTime() - startTime);
        this.checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL, 11, timeDiff);
    }

    @Test
    public void testWaitToFinishTimeFail() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenThrow(IOException.class);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        long startTime = this.fastClock.nanoTime();
        PipelineResult.State state = job.waitUntilFinish(Duration.millis((long)4L), null, arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0), () -> ((FastNanoClockAndSleeper)this.fastClock).nanoTime());
        Assert.assertEquals(null, (Object)state);
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(this.fastClock.nanoTime() - startTime);
        Assert.assertEquals((long)4L, (long)timeDiff);
    }

    @Test
    public void testCumulativeTimeOverflow() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_RUNNING");
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        long startTime = clock.nanoTime();
        PipelineResult.State state = job.waitUntilFinish(Duration.millis((long)4L), null, (Sleeper)clock, (NanoClock)clock);
        Assert.assertEquals(null, (Object)state);
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - startTime);
        MatcherAssert.assertThat((Object)timeDiff, (Matcher)org.hamcrest.Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(4L)));
    }

    @Test
    public void testGetStateReturnsServiceState() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_" + PipelineResult.State.RUNNING.name());
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        Assert.assertEquals((Object)PipelineResult.State.RUNNING, (Object)job.getStateWithRetriesOrUnknownOnException(BackOffAdapter.toGcpBackOff((BackOff)DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()), arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0)));
    }

    @Test
    public void testGetStateWithRetriesPassesExceptionThrough() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenThrow(IOException.class);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        this.thrown.expect(IOException.class);
        job.getStateWithRetries(BackOffAdapter.toGcpBackOff((BackOff)DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()), arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0));
    }

    @Test
    public void testGetStateNoThrowWithExceptionReturnsUnknown() throws Exception {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenThrow(IOException.class);
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, (Map)ImmutableMap.of());
        long startTime = this.fastClock.nanoTime();
        Assert.assertEquals((Object)PipelineResult.State.UNKNOWN, (Object)job.getStateWithRetriesOrUnknownOnException(BackOffAdapter.toGcpBackOff((BackOff)DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()), arg_0 -> ((FastNanoClockAndSleeper)this.fastClock).sleep(arg_0)));
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(this.fastClock.nanoTime() - startTime);
        this.checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL, 4, timeDiff);
    }

    @Test
    public void testCancelUnterminatedJobThatSucceeds() throws IOException {
        Dataflow.Projects.Locations.Jobs.Update update = (Dataflow.Projects.Locations.Jobs.Update)Mockito.mock(Dataflow.Projects.Locations.Jobs.Update.class);
        Mockito.when((Object)this.mockJobs.update((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID), (Job)Mockito.any(Job.class))).thenReturn((Object)update);
        Mockito.when((Object)((Job)update.execute())).thenReturn((Object)new Job().setCurrentState("JOB_STATE_CANCELLED"));
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, null);
        Assert.assertEquals((Object)PipelineResult.State.CANCELLED, (Object)job.cancel());
        Job content = new Job();
        content.setProjectId(PROJECT_ID);
        content.setId(JOB_ID);
        content.setRequestedState("JOB_STATE_CANCELLED");
        ((Dataflow.Projects.Locations.Jobs)Mockito.verify((Object)this.mockJobs)).update((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID), (Job)Matchers.eq((Object)content));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockJobs});
    }

    @Test
    public void testCancelUnterminatedJobThatFails() throws IOException {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_RUNNING");
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        Dataflow.Projects.Locations.Jobs.Update update = (Dataflow.Projects.Locations.Jobs.Update)Mockito.mock(Dataflow.Projects.Locations.Jobs.Update.class);
        Mockito.when((Object)this.mockJobs.update((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID), (Job)Mockito.any(Job.class))).thenReturn((Object)update);
        Mockito.when((Object)((Job)update.execute())).thenThrow(new Throwable[]{new IOException("Some random IOException")});
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, null);
        this.thrown.expect(IOException.class);
        this.thrown.expectMessage("Failed to cancel job in state RUNNING, please go to the Developers Console to cancel it manually:");
        job.cancel();
    }

    @Test
    public void testCancelTerminatedJobWithStaleState() throws IOException {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_RUNNING");
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        Dataflow.Projects.Locations.Jobs.Update update = (Dataflow.Projects.Locations.Jobs.Update)Mockito.mock(Dataflow.Projects.Locations.Jobs.Update.class);
        Mockito.when((Object)this.mockJobs.update((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID), (Job)Mockito.any(Job.class))).thenReturn((Object)update);
        Mockito.when((Object)((Job)update.execute())).thenThrow(new Throwable[]{new IOException("Job has terminated in state SUCCESS")});
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, null);
        PipelineResult.State returned = job.cancel();
        MatcherAssert.assertThat((Object)returned, (Matcher)org.hamcrest.Matchers.equalTo((Object)PipelineResult.State.RUNNING));
        this.expectedLogs.verifyWarn("Cancel failed because job is already terminated.");
    }

    @Test
    public void testCancelTerminatedJob() throws IOException {
        Dataflow.Projects.Locations.Jobs.Get statusRequest = (Dataflow.Projects.Locations.Jobs.Get)Mockito.mock(Dataflow.Projects.Locations.Jobs.Get.class);
        Job statusResponse = new Job();
        statusResponse.setCurrentState("JOB_STATE_FAILED");
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn((Object)statusRequest);
        Mockito.when((Object)((Job)statusRequest.execute())).thenReturn((Object)statusResponse);
        Dataflow.Projects.Locations.Jobs.Update update = (Dataflow.Projects.Locations.Jobs.Update)Mockito.mock(Dataflow.Projects.Locations.Jobs.Update.class);
        Mockito.when((Object)this.mockJobs.update((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID), (Job)Mockito.any(Job.class))).thenReturn((Object)update);
        Mockito.when((Object)((Job)update.execute())).thenThrow(new Throwable[]{new IOException()});
        DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create((DataflowPipelineOptions)this.options), JOB_ID, (DataflowPipelineOptions)this.options, null);
        Assert.assertEquals((Object)PipelineResult.State.FAILED, (Object)job.cancel());
        Job content = new Job();
        content.setProjectId(PROJECT_ID);
        content.setId(JOB_ID);
        content.setRequestedState("JOB_STATE_CANCELLED");
        ((Dataflow.Projects.Locations.Jobs)Mockito.verify((Object)this.mockJobs)).update((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)REGION_ID), (String)Matchers.eq((Object)JOB_ID), (Job)Matchers.eq((Object)content));
        ((Dataflow.Projects.Locations.Jobs)Mockito.verify((Object)this.mockJobs)).get(PROJECT_ID, REGION_ID, JOB_ID);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockJobs});
    }

    @Test
    public void testWaitUntilFinishNoRepeatedLogs() throws Exception {
        DataflowPipelineJob job = new DataflowPipelineJob(this.mockDataflowClient, JOB_ID, (DataflowPipelineOptions)this.options, null);
        ZeroSleeper sleeper = new ZeroSleeper();
        NanoClock nanoClock = (NanoClock)Mockito.mock(NanoClock.class);
        Instant separatingTimestamp = new Instant(42L);
        JobMessage theMessage = DataflowPipelineJobTest.infoMessage(separatingTimestamp, "nothing");
        MonitoringUtil mockMonitor = (MonitoringUtil)Mockito.mock(MonitoringUtil.class);
        Mockito.when((Object)mockMonitor.getJobMessages(ArgumentMatchers.anyString(), Matchers.anyLong())).thenReturn((Object)ImmutableList.of((Object)theMessage));
        Job fakeJob = new Job();
        fakeJob.setCurrentState("JOB_STATE_RUNNING");
        Mockito.when((Object)this.mockDataflowClient.getJob(ArgumentMatchers.anyString())).thenReturn((Object)fakeJob);
        Mockito.when((Object)nanoClock.nanoTime()).thenReturn((Object)0L).thenReturn((Object)2000000000L);
        job.waitUntilFinish(Duration.standardSeconds((long)1L), this.mockHandler, (Sleeper)sleeper, nanoClock, mockMonitor);
        ((MonitoringUtil.JobMessagesHandler)Mockito.verify((Object)this.mockHandler)).process((List)ImmutableList.of((Object)theMessage));
        Mockito.when((Object)nanoClock.nanoTime()).thenReturn((Object)3000000000L).thenReturn((Object)6000000000L);
        job.waitUntilFinish(Duration.standardSeconds((long)1L), this.mockHandler, (Sleeper)sleeper, nanoClock, mockMonitor);
        ((MonitoringUtil)Mockito.verify((Object)mockMonitor)).getJobMessages(ArgumentMatchers.anyString(), Matchers.eq((long)separatingTimestamp.getMillis()));
    }

    private static JobMessage infoMessage(Instant timestamp, String text) {
        JobMessage message = new JobMessage();
        message.setTime(TimeUtil.toCloudTime((ReadableInstant)timestamp));
        message.setMessageText(text);
        return message;
    }

    private static class ZeroSleeper
    implements Sleeper {
        private ZeroSleeper() {
        }

        public void sleep(long l) throws InterruptedException {
        }
    }

    private static class FastNanoClockAndFuzzySleeper
    implements NanoClock,
    Sleeper {
        private long fastNanoTime = NanoClock.SYSTEM.nanoTime();

        public long nanoTime() {
            return this.fastNanoTime;
        }

        public void sleep(long millis) throws InterruptedException {
            this.fastNanoTime += millis * 1000000L + (long)ThreadLocalRandom.current().nextInt(500000);
        }
    }
}

