/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.auth.Credentials;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.TestDataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link TestDataflowRunner}. Most tests drive the runner with Mockito mocks of
 * {@link DataflowClient}, {@link DataflowRunner} and {@link DataflowPipelineJob}.
 */
@RunWith(value=JUnit4.class)
public class TestDataflowRunnerTest {
    // JUnit rule used by several tests to declare the exception type they expect to be thrown.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    // Mocked Dataflow client; initialised through MockitoAnnotations in setUp().
    @Mock
    private DataflowClient mockClient;
    // Pipeline options shared by the tests; rebuilt in setUp() before each test.
    private TestDataflowPipelineOptions options;

    /**
     * Initialises the {@code @Mock} fields and builds a fresh set of
     * {@link TestDataflowPipelineOptions} pointing at fake project, region and GCS locations.
     */
    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);

        TestDataflowPipelineOptions testOptions =
            PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
        testOptions.setAppName("TestAppName");
        testOptions.setProject("test-project");
        testOptions.setRegion("some-region1");
        testOptions.setTempLocation("gs://test/temp/location");
        testOptions.setTempRoot("gs://test");
        testOptions.setGcpCredential(new TestCredential());
        testOptions.setRunner(TestDataflowRunner.class);
        testOptions.setPathValidatorClass(NoopPathValidator.class);
        options = testOptions;
    }

    /** Checks that the runner's string form is {@code TestDataflowRunner#<appName>}. */
    @Test
    public void testToString() {
        TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
        String description = runner.toString();
        Assert.assertEquals("TestDataflowRunner#TestAppName", description);
    }

    /**
     * Runs a batch pipeline whose mocked job reports {@code DONE} and whose mocked metrics contain
     * a tentative {@code PAssertSuccess} entry, and checks that {@code run} returns that job.
     */
    @Test
    public void testRunBatchJobThatSucceeds() throws Exception {
        Pipeline pipeline = Pipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));

        DataflowPipelineJob result = runner.run(pipeline, delegate);
        Assert.assertEquals(job, result);
    }

    /**
     * Runs a batch pipeline whose mocked {@code waitUntilFinish} delivers a single
     * {@code JOB_MESSAGE_ERROR} message ("TransientError") to the handler yet returns {@code DONE},
     * and checks that {@code run} still returns the job.
     */
    @Test
    public void testRunBatchJobThatSucceedsDespiteTransientErrors() throws Exception {
        Pipeline pipeline = Pipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenAnswer(invocation -> {
                // The second argument is the handler that receives job messages.
                MonitoringUtil.JobMessagesHandler handler =
                    (MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1];
                JobMessage error = new JobMessage();
                error.setMessageText("TransientError");
                error.setTime(TimeUtil.toCloudTime(Instant.now()));
                error.setMessageImportance("JOB_MESSAGE_ERROR");
                handler.process(Arrays.asList(error));
                return PipelineResult.State.DONE;
            });

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));

        DataflowPipelineJob result = runner.run(pipeline, delegate);
        Assert.assertEquals(job, result);
    }

    /**
     * Runs a batch pipeline whose mocked job reports {@code FAILED} and expects {@code run} to
     * throw a {@link RuntimeException}.
     *
     * <p>The mocked metrics contain a non-tentative {@code PAssertSuccess} entry.
     */
    @Test
    public void testRunBatchJobThatFails() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.FAILED);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, false));

        expectedException.expect(RuntimeException.class);
        runner.run(pipeline, delegate);
        // Only reached when run() returned normally. The message names the exception type the
        // rule above is waiting for, so a failure report is not misleading.
        Assert.fail("RuntimeException expected");
    }

    /**
     * Runs a batch pipeline whose mocked {@code waitUntilFinish} reports a
     * {@code JOB_MESSAGE_ERROR} message ("FooException") and returns {@code CANCELLED}. Expects an
     * {@link AssertionError} whose message contains "FooException", and checks that the job was
     * never cancelled by the runner.
     */
    @Test
    public void testBatchPipelineFailsIfException() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.RUNNING);
        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenAnswer(invocation -> {
                MonitoringUtil.JobMessagesHandler handler =
                    (MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1];
                JobMessage error = new JobMessage();
                error.setMessageText("FooException");
                error.setTime(TimeUtil.toCloudTime(Instant.now()));
                error.setMessageImportance("JOB_MESSAGE_ERROR");
                handler.process(Arrays.asList(error));
                return PipelineResult.State.CANCELLED;
            });

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(false, true));
        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

        AssertionError thrown = null;
        try {
            runner.run(pipeline, delegate);
        } catch (AssertionError expected) {
            thrown = expected;
        }
        // fail() itself throws an AssertionError, so it must stay outside the try block.
        if (thrown == null) {
            Assert.fail("AssertionError expected");
        }
        MatcherAssert.assertThat(
            thrown.getMessage(), org.hamcrest.Matchers.containsString("FooException"));
        Mockito.verify(job, Mockito.never()).cancel();
    }

    /**
     * Runs a streaming pipeline containing a {@link PAssert}; the mocked job finishes in
     * {@code DONE} and the mocked metrics contain a tentative {@code PAssertSuccess} entry. The
     * test passes when {@code run} completes without throwing.
     */
    @Test
    public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        runner.run(pipeline, delegate);
    }

    /**
     * Runs a streaming pipeline with no {@link PAssert}; the mocked job finishes in {@code DONE}
     * and the mocked metrics response is empty. The test passes when {@code run} completes without
     * throwing.
     */
    @Test
    public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        pipeline.apply(Create.of(1, 2, 3));

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        runner.run(pipeline, delegate);
    }

    /**
     * Exercises the failing streaming scenario by delegating to
     * {@link #testStreamingPipelineFailsIfException()}.
     */
    @Test
    public void testRunStreamingJobThatFails() throws Exception {
        testStreamingPipelineFailsIfException();
    }

    /**
     * Builds a {@link JobMetrics} response holding the single PAssert metric produced by
     * {@link #generateMockMetrics(boolean, boolean)}.
     *
     * @param success whether the metric is named {@code PAssertSuccess} or {@code PAssertFailure}
     * @param tentative whether the metric context carries the {@code tentative} key
     */
    private JobMetrics generateMockMetricResponse(boolean success, boolean tentative) throws Exception {
        return buildJobMetrics(generateMockMetrics(success, tentative));
    }

    /**
     * Creates a one-element list containing a PAssert metric with scalar value one.
     *
     * @param success {@code true} names the metric {@code PAssertSuccess}, {@code false} names it
     *     {@code PAssertFailure}
     * @param tentative {@code true} gives the metric a context of {@code {"tentative": ""}},
     *     {@code false} an empty context
     * @return a mutable list holding the single metric update
     */
    private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
        Map<String, String> context;
        if (tentative) {
            context = ImmutableMap.of("tentative", "");
        } else {
            context = ImmutableMap.of();
        }

        MetricStructuredName structuredName = new MetricStructuredName();
        structuredName.setName(success ? "PAssertSuccess" : "PAssertFailure");
        structuredName.setContext(context);

        MetricUpdate update = new MetricUpdate();
        update.setName(structuredName);
        update.setScalar(BigDecimal.ONE);
        return Lists.newArrayList(update);
    }

    /**
     * Builds a {@link JobMetrics} response with one metric update per entry of {@code metricMap}.
     *
     * @param metricMap metric name to scalar value
     */
    private JobMetrics generateMockStreamingMetricResponse(Map<String, BigDecimal> metricMap) throws IOException {
        List<MetricUpdate> updates = generateMockStreamingMetrics(metricMap);
        return buildJobMetrics(updates);
    }

    /**
     * Converts each entry of {@code metricMap} into a {@link MetricUpdate} whose structured name is
     * the entry key and whose scalar is the entry value.
     *
     * @param metricMap metric name to scalar value
     * @return the metric updates, in the iteration order of {@code metricMap}
     */
    private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
        List<MetricUpdate> updates = Lists.newArrayList();
        metricMap.forEach((metricName, value) -> {
            MetricStructuredName structuredName = new MetricStructuredName();
            structuredName.setName(metricName);

            MetricUpdate update = new MetricUpdate();
            update.setName(structuredName);
            update.setScalar(value);
            updates.add(update);
        });
        return updates;
    }

    /**
     * Wraps the given metric updates in a {@link JobMetrics} object and attaches the JSON factory
     * obtained from {@link Transport#getJsonFactory()}.
     *
     * @param metricList the metric updates to expose through {@code getMetrics()}
     */
    private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
        JobMetrics response = new JobMetrics();
        response.setFactory(Transport.getJsonFactory());
        response.setMetrics(metricList);
        return response;
    }

    /**
     * With a tentative {@code PAssertSuccess} metric and a job state of {@code DONE},
     * {@code checkForPAssertSuccess} is expected to return {@code Optional.of(true)}.
     */
    @Test
    public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
        DataflowPipelineJob job =
            Mockito.spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        JobMetrics metricsResponse = buildJobMetrics(generateMockMetrics(true, true));
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(metricsResponse);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        // doReturn(..).when(spy) stubs getState() without invoking the real method.
        Mockito.doReturn(PipelineResult.State.DONE).when(job).getState();

        MatcherAssert.assertThat(
            runner.checkForPAssertSuccess(job),
            org.hamcrest.Matchers.equalTo(Optional.of(true)));
    }

    /**
     * With a tentative {@code PAssertFailure} metric and a job state of {@code DONE},
     * {@code checkForPAssertSuccess} is expected to return {@code Optional.of(false)}.
     */
    @Test
    public void testCheckingForSuccessWhenPAssertFails() throws Exception {
        DataflowPipelineJob job =
            Mockito.spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        JobMetrics metricsResponse = buildJobMetrics(generateMockMetrics(false, true));
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(metricsResponse);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        // doReturn(..).when(spy) stubs getState() without invoking the real method.
        Mockito.doReturn(PipelineResult.State.DONE).when(job).getState();

        MatcherAssert.assertThat(
            runner.checkForPAssertSuccess(job),
            org.hamcrest.Matchers.equalTo(Optional.of(false)));
    }

    /**
     * With only a non-tentative {@code PAssertSuccess} metric available and a job state of
     * {@code RUNNING}, {@code checkForPAssertSuccess} is expected to return
     * {@code Optional.absent()}.
     */
    @Test
    public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
        DataflowPipelineJob job =
            Mockito.spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        JobMetrics metricsResponse = buildJobMetrics(generateMockMetrics(true, false));
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(metricsResponse);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        runner.updatePAssertCount(pipeline);
        // doReturn(..).when(spy) stubs getState() without invoking the real method.
        Mockito.doReturn(PipelineResult.State.RUNNING).when(job).getState();

        MatcherAssert.assertThat(
            runner.checkForPAssertSuccess(job),
            org.hamcrest.Matchers.equalTo(Optional.<Boolean>absent()));
    }

    /**
     * Runs a streaming pipeline whose mocked {@code waitUntilFinish} reports a
     * {@code JOB_MESSAGE_ERROR} message ("FooException") and returns {@code CANCELLED}, and
     * expects {@code run} to throw a {@link RuntimeException}.
     */
    @Test
    public void testStreamingPipelineFailsIfException() throws Exception {
        options.setStreaming(true);
        Pipeline streamingPipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = streamingPipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.RUNNING);
        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenAnswer(invocation -> {
                MonitoringUtil.JobMessagesHandler handler =
                    (MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1];
                JobMessage error = new JobMessage();
                error.setMessageText("FooException");
                error.setTime(TimeUtil.toCloudTime(Instant.now()));
                error.setMessageImportance("JOB_MESSAGE_ERROR");
                handler.process(Arrays.asList(error));
                return PipelineResult.State.CANCELLED;
            });

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(false, true));
        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

        expectedException.expect(RuntimeException.class);
        runner.run(streamingPipeline, delegate);
    }

    /**
     * Checks that {@code getJobMetrics} returns the metrics supplied by the mocked client: exactly
     * one entry, equal to the tentative {@code PAssertSuccess} metric.
     */
    @Test
    public void testGetJobMetricsThatSucceeds() throws Exception {
        DataflowPipelineJob job =
            Mockito.spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
        Pipeline pipeline = TestPipeline.create(options);
        pipeline.apply(Create.of(1, 2, 3));

        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));
        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

        JobMetrics actual = runner.getJobMetrics(job);

        Assert.assertEquals(1, actual.getMetrics().size());
        Assert.assertEquals(generateMockMetrics(true, true), actual.getMetrics());
    }

    /**
     * Checks that {@code getJobMetrics} returns {@code null} when the mocked client throws an
     * {@link IOException}.
     */
    @Test
    public void testGetJobMetricsThatFailsForException() throws Exception {
        DataflowPipelineJob job =
            Mockito.spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
        Pipeline pipeline = TestPipeline.create(options);
        pipeline.apply(Create.of(1, 2, 3));

        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenThrow(new IOException());
        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

        JobMetrics actual = runner.getJobMetrics(job);
        Assert.assertNull(actual);
    }

    /**
     * Runs a succeeding batch pipeline with an on-create matcher installed. The
     * {@link TestSuccessMatcher} verifies that {@code waitUntilFinish} has been called zero times
     * on the job at the moment the matcher is evaluated.
     */
    @Test
    public void testBatchOnCreateMatcher() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
        testOptions.setOnCreateMatcher(new TestSuccessMatcher(job, 0));

        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Streaming variant of the on-create matcher test. The {@link TestSuccessMatcher} verifies that
     * {@code waitUntilFinish} has been called zero times on the job at the moment the matcher is
     * evaluated.
     */
    @Test
    public void testStreamingOnCreateMatcher() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
        testOptions.setOnCreateMatcher(new TestSuccessMatcher(job, 0));

        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.DONE);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Runs a succeeding batch pipeline with an on-success matcher installed. The
     * {@link TestSuccessMatcher} verifies that {@code waitUntilFinish} has been called exactly once
     * on the job at the moment the matcher is evaluated.
     */
    @Test
    public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
        testOptions.setOnSuccessMatcher(new TestSuccessMatcher(job, 1));

        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Streaming variant of the on-success matcher test. The {@link TestSuccessMatcher} verifies
     * that {@code waitUntilFinish} has been called exactly once on the job at the moment the
     * matcher is evaluated.
     */
    @Test
    public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
        testOptions.setOnSuccessMatcher(new TestSuccessMatcher(job, 1));

        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.DONE);
        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Runs a failing batch pipeline (job state {@code FAILED}, tentative {@code PAssertFailure}
     * metric) with a {@link TestFailureMatcher} installed as on-success matcher. Expects an
     * {@link AssertionError} from {@code run} and then verifies that {@code waitUntilFinish} was
     * called exactly once.
     */
    @Test
    public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.FAILED);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
        testOptions.setOnSuccessMatcher(new TestFailureMatcher());

        Mockito.when(mockClient.getJobMetrics(ArgumentMatchers.anyString()))
            .thenReturn(generateMockMetricResponse(false, true));

        boolean failedAsExpected = false;
        try {
            runner.run(pipeline, delegate);
        } catch (AssertionError expected) {
            failedAsExpected = true;
        }
        // fail() itself throws an AssertionError, so it must stay outside the try block.
        if (!failedAsExpected) {
            Assert.fail("Expected an exception on pipeline failure.");
        }
        Mockito.verify(job, Mockito.times(1))
            .waitUntilFinish(
                ArgumentMatchers.any(Duration.class),
                ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class));
    }

    /**
     * Runs a failing streaming pipeline (job state and {@code waitUntilFinish} result both
     * {@code FAILED}) with a {@link TestFailureMatcher} installed as on-success matcher, and
     * expects {@code run} to throw a {@link RuntimeException}.
     */
    @Test
    public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.FAILED);

        DataflowRunner delegate = Mockito.mock(DataflowRunner.class);
        Mockito.when(delegate.run(ArgumentMatchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
        TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
        testOptions.setOnSuccessMatcher(new TestFailureMatcher());

        Mockito.when(
                job.waitUntilFinish(
                    ArgumentMatchers.any(Duration.class),
                    ArgumentMatchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.FAILED);

        expectedException.expect(RuntimeException.class);
        runner.run(pipeline, delegate);
    }

    static class TestFailureMatcher
    extends BaseMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
        TestFailureMatcher() {
        }

        public boolean matches(Object o) {
            Assert.fail((String)"OnSuccessMatcher should not be called on pipeline failure.");
            return false;
        }

        public void describeTo(Description description) {
        }
    }

    static class TestSuccessMatcher
    extends BaseMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
        private final transient DataflowPipelineJob mockJob;
        private final int called;

        public TestSuccessMatcher(DataflowPipelineJob job, int times) {
            this.mockJob = job;
            this.called = times;
        }

        public boolean matches(Object o) {
            if (!(o instanceof PipelineResult)) {
                Assert.fail((String)String.format("Expected PipelineResult but received %s", o));
            }
            try {
                ((DataflowPipelineJob)Mockito.verify((Object)this.mockJob, (VerificationMode)Mockito.times((int)this.called))).waitUntilFinish((Duration)Matchers.any(Duration.class), (MonitoringUtil.JobMessagesHandler)Matchers.any(MonitoringUtil.JobMessagesHandler.class));
            }
            catch (IOException | InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            Assert.assertSame((Object)this.mockJob, (Object)o);
            return true;
        }

        public void describeTo(Description description) {
        }
    }
}

