/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Joiner;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Strings;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDataflowRunner
extends PipelineRunner<DataflowPipelineJob> {
    /** Key that must be present in a metric name's context map for the metric to be counted by {@link #checkForPAssertSuccess}. */
    private static final String TENTATIVE_COUNTER = "tentative";
    private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
    /** Options this runner was created with; read for streaming mode, test timeout, executor service, experiments and app name. */
    private final TestDataflowPipelineOptions options;
    /** Client used by {@link #getJobMetrics} to fetch the metrics of a job. */
    private final DataflowClient dataflowClient;
    /** Runner built from {@link #options} in the constructor; {@link #run(Pipeline)} delegates job submission to it. */
    private final DataflowRunner runner;
    /** Number of successful assertions a run must report; recomputed by {@link #updatePAssertCount} at the start of each run. */
    private int expectedNumberOfAssertions = 0;

    /**
     * Creates a test runner that submits jobs through a {@link DataflowRunner} built from
     * {@code options} and reads job metrics through {@code client}.
     *
     * @param options test options, stored and also handed to {@link DataflowRunner#fromOptions}
     * @param client Dataflow client, stored for later metric lookups
     */
    TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
        DataflowRunner delegate = DataflowRunner.fromOptions(options);
        this.runner = delegate;
        this.dataflowClient = client;
        this.options = options;
    }

    /**
     * Builds a {@code TestDataflowRunner} from generic pipeline options.
     *
     * <p>Before the runner is created, the temp location on the options is overwritten with
     * {@code <tempRoot>/<jobName>/output/results}.
     *
     * @param options pipeline options, viewed as {@link TestDataflowPipelineOptions} for the runner
     *     and as {@link DataflowPipelineOptions} for the Dataflow client
     * @return a runner backed by a freshly created {@link DataflowClient}
     */
    public static TestDataflowRunner fromOptions(PipelineOptions options) {
        TestDataflowPipelineOptions testOptions = (TestDataflowPipelineOptions)options.as(TestDataflowPipelineOptions.class);
        Joiner slashJoiner = Joiner.on("/");
        testOptions.setTempLocation(slashJoiner.join(testOptions.getTempRoot(), testOptions.getJobName(), "output", "results"));
        DataflowPipelineOptions clientOptions = (DataflowPipelineOptions)options.as(DataflowPipelineOptions.class);
        DataflowClient client = DataflowClient.create(clientOptions);
        return new TestDataflowRunner(testOptions, client);
    }

    /**
     * Creates a runner with an explicitly supplied client instead of one created from the options.
     * Unlike {@link #fromOptions}, this does not modify the temp location of {@code options}.
     *
     * @param options test options for the runner
     * @param client Dataflow client the runner will use to fetch job metrics
     * @return the new runner
     */
    @VisibleForTesting
    static TestDataflowRunner fromOptionsAndClient(TestDataflowPipelineOptions options, DataflowClient client) {
        TestDataflowRunner testRunner = new TestDataflowRunner(options, client);
        return testRunner;
    }

    /**
     * Runs {@code pipeline} using the {@link DataflowRunner} created in the constructor.
     *
     * @param pipeline the pipeline to execute
     * @return the finished job, as returned by {@link #run(Pipeline, DataflowRunner)}
     */
    public DataflowPipelineJob run(Pipeline pipeline) {
        DataflowRunner delegate = this.runner;
        return run(pipeline, delegate);
    }

    /**
     * Submits {@code pipeline} through {@code runner}, waits for the job to terminate and verifies
     * the outcome.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>recompute the expected assertion count for this pipeline;</li>
     *   <li>submit the job and check it against the "on create" matcher;</li>
     *   <li>wait for termination (streaming or batch path, chosen by {@code options.isStreaming()});</li>
     *   <li>for batch jobs only, inspect the job metrics for assertion results;</li>
     *   <li>fail on a reported assertion failure, then on an unsuccessful job, then check the
     *       "on success" matcher.</li>
     * </ol>
     *
     * @param pipeline the pipeline to execute
     * @param runner the runner used to submit the job
     * @return the job, once it has finished successfully and passed all checks
     * @throws AssertionError if the metrics report at least one failed assertion
     * @throws RuntimeException if the job did not finish successfully
     */
    DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
        this.updatePAssertCount(pipeline);
        TestPipelineOptions testPipelineOptions = (TestPipelineOptions)this.options.as(TestPipelineOptions.class);
        DataflowPipelineJob job = runner.run(pipeline);
        LOG.info("Running Dataflow job {} with {} expected assertions.", (Object)job.getJobId(), (Object)this.expectedNumberOfAssertions);
        MatcherAssert.assertThat((Object)job, (Matcher)testPipelineOptions.getOnCreateMatcher());
        ErrorMonitorMessagesHandler messageHandler = new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());

        final boolean jobSuccess;
        // Typed as Optional<Boolean> to match checkForPAssertSuccess; absent means "no verdict".
        final Optional<Boolean> allAssertionsPassed;
        if (this.options.isStreaming()) {
            jobSuccess = this.waitForStreamingJobTermination(job, messageHandler);
            // Metrics are not consulted on the streaming path, so there is never a verdict here.
            allAssertionsPassed = Optional.absent();
        } else {
            jobSuccess = this.waitForBatchJobTermination(job, messageHandler);
            allAssertionsPassed = this.checkForPAssertSuccess(job);
        }

        if (!allAssertionsPassed.isPresent()) {
            LOG.warn("Dataflow job {} did not output a success or failure metric.", (Object)job.getJobId());
        } else if (!allAssertionsPassed.get()) {
            // A reported assertion failure takes precedence over the generic job-failure error below.
            throw new AssertionError((Object)TestDataflowRunner.errorMessage(job, messageHandler));
        }
        if (!jobSuccess) {
            throw new RuntimeException(TestDataflowRunner.errorMessage(job, messageHandler));
        }
        MatcherAssert.assertThat((Object)job, (Matcher)testPipelineOptions.getOnSuccessMatcher());
        return job;
    }

    /**
     * Waits for a streaming job to terminate, for at most the configured test timeout.
     *
     * <p>A {@link CancelOnError} task is submitted to the options' executor service before waiting
     * so the job is cancelled once the message handler has seen an error. If the wait returns
     * without a terminal state, the job is cancelled here.
     *
     * @param job the job to wait for
     * @param messageHandler handler that receives job messages and records error messages
     * @return {@code true} only if the job ended in {@code DONE} and no error message was seen;
     *     {@code false} on timeout, interruption, or any other terminal state
     * @throws RuntimeException wrapping an {@link IOException} from waiting for or cancelling the job
     */
    private boolean waitForStreamingJobTermination(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
        this.options.getExecutorService().submit(new CancelOnError(job, messageHandler));
        PipelineResult.State finalState;
        try {
            finalState = job.waitUntilFinish(Duration.standardSeconds((long)this.options.getTestTimeoutSeconds()), messageHandler);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it;
            // Thread.interrupted() would clear the flag instead.
            Thread.currentThread().interrupt();
            return false;
        }
        if (finalState == null || !finalState.isTerminal()) {
            LOG.info("Dataflow job {} took longer than {} seconds to complete, cancelling.", (Object)job.getJobId(), (Object)this.options.getTestTimeoutSeconds());
            try {
                job.cancel();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return false;
        }
        return finalState == PipelineResult.State.DONE && !messageHandler.hasSeenError();
    }

    /**
     * Waits for a batch job to terminate and reports whether it finished successfully.
     *
     * @param job the job to wait for
     * @param messageHandler handler that receives job messages while waiting
     * @return {@code true} if the job's state after waiting is {@code DONE}; {@code false} if it is
     *     any other state or the wait was interrupted
     * @throws RuntimeException wrapping an {@link IOException} raised while waiting
     */
    private boolean waitForBatchJobTermination(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
        try {
            // NOTE(review): a -1 second duration presumably means "no timeout" — confirm against
            // DataflowPipelineJob.waitUntilFinish.
            job.waitUntilFinish(Duration.standardSeconds(-1L), messageHandler);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it;
            // Thread.interrupted() would clear the flag instead.
            Thread.currentThread().interrupt();
            return false;
        }
        return job.getState() == PipelineResult.State.DONE;
    }

    /**
     * Picks the message used when a run is reported as failed.
     *
     * @param job the job that failed; used only for the fallback message
     * @param messageHandler handler holding the error text collected from job messages
     * @return the collected error text, or a generic message naming the job id and state when no
     *     error text was collected
     */
    private static String errorMessage(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
        String collected = messageHandler.getErrorMessage();
        if (!Strings.isNullOrEmpty(collected)) {
            return collected;
        }
        return String.format("Dataflow job %s terminated in state %s but did not return a failure reason.", job.getJobId(), job.getState());
    }

    /**
     * Recomputes how many successful assertions a run of {@code pipeline} must report.
     *
     * <p>The count is forced to zero when the {@code beam_fn_api} experiment is enabled on the
     * options; otherwise it is the number of asserts counted in the pipeline.
     *
     * @param pipeline the pipeline whose asserts are counted
     */
    @VisibleForTesting
    void updatePAssertCount(Pipeline pipeline) {
        boolean usesFnApi = ExperimentalOptions.hasExperiment((PipelineOptions)this.options, (String)"beam_fn_api");
        if (usesFnApi) {
            this.expectedNumberOfAssertions = 0;
            return;
        }
        this.expectedNumberOfAssertions = PAssert.countAsserts((Pipeline)pipeline);
    }

    /**
     * Inspects the job's metrics to decide whether the pipeline's assertions passed.
     *
     * <p>Only metrics whose name context contains the {@code "tentative"} key are considered. The
     * scalars of metrics named {@code PAssertSuccess} and {@code PAssertFailure} are summed
     * separately.
     *
     * @param job the job whose metrics are inspected
     * @return {@code Optional.of(false)} if any failure was counted; {@code Optional.of(true)} if
     *     there were no failures and at least the expected number of successes;
     *     {@code Optional.absent()} if metrics are unavailable or the counts are inconclusive
     */
    @VisibleForTesting
    Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) {
        JobMetrics metrics = this.getJobMetrics(job);
        if (metrics == null || metrics.getMetrics() == null) {
            LOG.warn("Metrics not present for Dataflow job {}.", (Object)job.getJobId());
            return Optional.absent();
        }
        int successes = 0;
        int failures = 0;
        for (MetricUpdate metric : metrics.getMetrics()) {
            if (metric.getName() == null || metric.getName().getContext() == null || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
                continue;
            }
            String metricName = metric.getName().getName();
            if ("PAssertSuccess".equals(metricName)) {
                successes += TestDataflowRunner.scalarAsInt(metric.getScalar());
            } else if ("PAssertFailure".equals(metricName)) {
                failures += TestDataflowRunner.scalarAsInt(metric.getScalar());
            }
        }
        if (failures > 0) {
            LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{job.getJobId(), successes, failures, this.expectedNumberOfAssertions});
            return Optional.of(false);
        }
        if (successes >= this.expectedNumberOfAssertions) {
            LOG.info("Success result for Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{job.getJobId(), successes, failures, this.expectedNumberOfAssertions});
            return Optional.of(true);
        }
        // Too few successes and no failures: no verdict. The job state only affects which
        // message is logged.
        PipelineResult.State state = job.getState();
        if (state == PipelineResult.State.FAILED || state == PipelineResult.State.CANCELLED) {
            LOG.info("Dataflow job {} terminated in failure state {} without reporting a failed assertion", (Object)job.getJobId(), (Object)state);
            return Optional.absent();
        }
        LOG.info("Inconclusive results for Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{job.getJobId(), successes, failures, this.expectedNumberOfAssertions});
        return Optional.absent();
    }

    /**
     * Converts a metric scalar to an {@code int} without assuming its concrete numeric type.
     *
     * @param scalar the raw scalar of a metric update; may be {@code null}
     * @return {@code intValue()} of the scalar when it is a {@link Number}; {@code 0} when it is
     *     {@code null} or not numeric, so a malformed metric cannot abort the check
     */
    private static int scalarAsInt(Object scalar) {
        return scalar instanceof Number ? ((Number)scalar).intValue() : 0;
    }

    /**
     * Fetches the metrics of {@code job} through the Dataflow client.
     *
     * @param job the job whose metrics are requested
     * @return the job's metrics, or {@code null} if the request failed with an {@link IOException}
     *     (the failure is logged as a warning)
     */
    @Nullable
    @VisibleForTesting
    JobMetrics getJobMetrics(DataflowPipelineJob job) {
        try {
            return this.dataflowClient.getJobMetrics(job.getJobId());
        }
        catch (IOException fetchFailure) {
            LOG.warn("Failed to get job metrics: ", (Throwable)fetchFailure);
            return null;
        }
    }

    /** Returns {@code "TestDataflowRunner#"} followed by the app name from the options. */
    @Override
    public String toString() {
        String appName = this.options.getAppName();
        return "TestDataflowRunner#" + appName;
    }

    /**
     * Background task that polls a job and cancels it once the message handler has seen an error.
     *
     * <p>The task ends when the job is observed in a terminal state or after it has issued a
     * cancel, whichever comes first.
     */
    private static class CancelOnError
    implements Callable<Void> {
        /** Pause between two polls of the job state, in milliseconds. */
        private static final long POLL_INTERVAL_MILLIS = 3000L;

        private final DataflowPipelineJob job;
        private final ErrorMonitorMessagesHandler messageHandler;

        /**
         * @param job the job to watch and, if necessary, cancel
         * @param messageHandler handler whose error flag triggers the cancellation
         */
        public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
            this.job = job;
            this.messageHandler = messageHandler;
        }

        /**
         * Polls until the job is terminal or an error has been seen.
         *
         * @return always {@code null}
         * @throws Exception if querying or cancelling the job fails, or the sleep is interrupted
         */
        @Override
        public Void call() throws Exception {
            while (true) {
                // Fetch the state once per iteration so both checks use the same snapshot
                // and no extra poll is issued.
                PipelineResult.State jobState = this.job.getState();
                if (jobState.isTerminal()) {
                    return null;
                }
                if (this.messageHandler.hasSeenError()) {
                    // Log first so the attempt is recorded even if cancel() throws.
                    LOG.info("Cancelling Dataflow job {}", (Object)this.job.getJobId());
                    this.job.cancel();
                    return null;
                }
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        }
    }

    /**
     * Job message handler that forwards every batch of messages to a wrapped handler and
     * additionally records the text of messages whose importance is {@code JOB_MESSAGE_ERROR}.
     */
    private static class ErrorMonitorMessagesHandler
    implements MonitoringUtil.JobMessagesHandler {
        private final DataflowPipelineJob job;
        private final MonitoringUtil.JobMessagesHandler messageHandler;
        /** Text of all error messages seen so far, concatenated without a separator. */
        private final StringBuffer errorMessage = new StringBuffer();
        /** Set once the first error message has been processed; volatile because it is also read by the CancelOnError task. */
        private volatile boolean hasSeenError = false;

        /**
         * @param job the job the messages belong to; used for logging only
         * @param messageHandler handler that receives every message batch before it is inspected here
         */
        private ErrorMonitorMessagesHandler(DataflowPipelineJob job, MonitoringUtil.JobMessagesHandler messageHandler) {
            this.job = job;
            this.messageHandler = messageHandler;
        }

        /**
         * Forwards {@code messages} to the wrapped handler, then records every error message.
         *
         * @param messages the batch of job messages to process
         */
        @Override
        public void process(List<JobMessage> messages) {
            this.messageHandler.process(messages);
            for (JobMessage jobMessage : messages) {
                boolean isError = "JOB_MESSAGE_ERROR".equals(jobMessage.getMessageImportance());
                if (isError) {
                    String text = jobMessage.getMessageText();
                    LOG.info("Dataflow job {} threw exception. Failure message was: {}", (Object)this.job.getJobId(), (Object)text);
                    this.errorMessage.append(text);
                    this.hasSeenError = true;
                }
            }
        }

        /** Returns whether at least one error message has been processed. */
        boolean hasSeenError() {
            return this.hasSeenError;
        }

        /** Returns the concatenated text of all error messages processed so far; empty if none. */
        String getErrorMessage() {
            return this.errorMessage.toString();
        }
    }
}

