/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowMetrics;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.BiMap;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.HashBiMap;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowPipelineJob
implements PipelineResult {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
    protected String jobId;
    private final DataflowPipelineOptions dataflowOptions;
    private final DataflowClient dataflowClient;
    private final DataflowMetrics dataflowMetrics;
    @Nullable
    private PipelineResult.State terminalState = null;
    @Nullable
    private DataflowPipelineJob replacedByJob = null;
    protected BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames;
    private List<MetricUpdate> terminalMetricUpdates;
    private long lastTimestamp = Long.MIN_VALUE;
    static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds((long)2L);
    static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds((long)2L);
    static final double DEFAULT_BACKOFF_EXPONENT = 1.5;
    static final int MESSAGES_POLLING_RETRIES = 11;
    static final int STATUS_POLLING_RETRIES = 4;
    private static final FluentBackoff MESSAGES_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(MESSAGES_POLLING_INTERVAL).withMaxRetries(11).withExponent(1.5);
    protected static final FluentBackoff STATUS_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(STATUS_POLLING_INTERVAL).withMaxRetries(4).withExponent(1.5);
    private AtomicReference<FutureTask<PipelineResult.State>> cancelState = new AtomicReference();

    public DataflowPipelineJob(DataflowClient dataflowClient, String jobId, DataflowPipelineOptions dataflowOptions, Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
        this.dataflowClient = dataflowClient;
        this.jobId = jobId;
        this.dataflowOptions = dataflowOptions;
        this.transformStepNames = HashBiMap.create(MoreObjects.firstNonNull(transformStepNames, ImmutableMap.of()));
        this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getProjectId() {
        return this.dataflowOptions.getProject();
    }

    public DataflowPipelineJob getReplacedByJob() {
        if (this.terminalState == null) {
            throw new IllegalStateException("getReplacedByJob() called before job terminated");
        }
        if (this.replacedByJob == null) {
            throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
        }
        return this.replacedByJob;
    }

    @Nullable
    public PipelineResult.State waitUntilFinish() {
        return this.waitUntilFinish(Duration.millis((long)-1L));
    }

    @Nullable
    public PipelineResult.State waitUntilFinish(Duration duration) {
        try {
            return this.waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    @VisibleForTesting
    public PipelineResult.State waitUntilFinish(Duration duration, MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException {
        Thread shutdownHook = new Thread(){

            @Override
            public void run() {
                LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\nTo cancel the job in the cloud, run:\n> {}", (Object)MonitoringUtil.getGcloudCancelCommand(DataflowPipelineJob.this.dataflowOptions, DataflowPipelineJob.this.getJobId()));
            }
        };
        try {
            Runtime.getRuntime().addShutdownHook(shutdownHook);
            PipelineResult.State state = this.waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM, new MonitoringUtil(this.dataflowClient));
            return state;
        }
        finally {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        }
    }

    @Nullable
    @VisibleForTesting
    PipelineResult.State waitUntilFinish(Duration duration, @Nullable MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, NanoClock nanoClock) throws IOException, InterruptedException {
        return this.waitUntilFinish(duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(this.dataflowClient));
    }

    @Nullable
    @VisibleForTesting
    PipelineResult.State waitUntilFinish(Duration duration, @Nullable MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, NanoClock nanoClock, MonitoringUtil monitor) throws IOException, InterruptedException {
        PipelineResult.State state;
        BackOff backoff = !duration.isLongerThan((ReadableDuration)Duration.ZERO) ? BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)MESSAGES_BACKOFF_FACTORY.backoff()) : BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
        long startNanos = nanoClock.nanoTime();
        do {
            boolean hasError;
            boolean bl = hasError = (state = this.getStateWithRetries(BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()), sleeper)) == PipelineResult.State.UNKNOWN;
            if (messageHandler != null && !hasError) {
                try {
                    List<JobMessage> allMessages = monitor.getJobMessages(this.jobId, this.lastTimestamp);
                    if (!allMessages.isEmpty()) {
                        this.lastTimestamp = TimeUtil.fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
                        messageHandler.process(allMessages);
                    }
                }
                catch (GoogleJsonResponseException | SocketTimeoutException e) {
                    hasError = true;
                    LOG.warn("There were problems getting current job messages: {}.", (Object)e.getMessage());
                    LOG.debug("Exception information:", e);
                }
            }
            if (hasError) continue;
            if (state.isTerminal()) {
                switch (state) {
                    case DONE: 
                    case CANCELLED: {
                        LOG.info("Job {} finished with status {}.", (Object)this.getJobId(), (Object)state);
                        break;
                    }
                    case UPDATED: {
                        LOG.info("Job {} has been updated and is running as the new job with id {}. To access the updated job on the Dataflow monitoring console, please navigate to {}", new Object[]{this.getJobId(), this.getReplacedByJob().getJobId(), MonitoringUtil.getJobMonitoringPageURL(this.getReplacedByJob().getProjectId(), this.getReplacedByJob().getJobId())});
                        break;
                    }
                    default: {
                        LOG.info("Job {} failed with status {}.", (Object)this.getJobId(), (Object)state);
                    }
                }
                return state;
            }
            backoff.reset();
            if (!duration.isLongerThan((ReadableDuration)Duration.ZERO)) continue;
            long nanosConsumed = nanoClock.nanoTime() - startNanos;
            Duration consumed = Duration.millis((long)((nanosConsumed + 999999L) / 1000000L));
            Duration remaining = duration.minus((ReadableDuration)consumed);
            backoff = remaining.isLongerThan((ReadableDuration)Duration.ZERO) ? BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff()) : BackOff.STOP_BACKOFF;
        } while (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff));
        LOG.warn("No terminal state was returned. State value {}", (Object)state);
        return null;
    }

    public PipelineResult.State cancel() throws IOException {
        FutureTask<PipelineResult.State> tentativeCancelTask = new FutureTask<PipelineResult.State>(new Callable<PipelineResult.State>(){

            @Override
            public PipelineResult.State call() throws Exception {
                Job content = new Job();
                content.setProjectId(DataflowPipelineJob.this.getProjectId());
                content.setId(DataflowPipelineJob.this.jobId);
                content.setRequestedState("JOB_STATE_CANCELLED");
                try {
                    Job job = DataflowPipelineJob.this.dataflowClient.updateJob(DataflowPipelineJob.this.jobId, content);
                    return MonitoringUtil.toState(job.getCurrentState());
                }
                catch (IOException e) {
                    PipelineResult.State state = DataflowPipelineJob.this.getState();
                    if (state.isTerminal()) {
                        LOG.warn("Cancel failed because job is already terminated. State is {}", (Object)state);
                        return state;
                    }
                    if (e.getMessage().contains("has terminated")) {
                        LOG.warn("Cancel failed because job is already terminated.", (Throwable)e);
                        return state;
                    }
                    String errorMsg = String.format("Failed to cancel job in state %s, please go to the Developers Console to cancel it manually: %s", state, MonitoringUtil.getJobMonitoringPageURL(DataflowPipelineJob.this.getProjectId(), DataflowPipelineJob.this.getJobId()));
                    LOG.warn(errorMsg);
                    throw new IOException(errorMsg, e);
                }
            }
        });
        if (this.cancelState.compareAndSet(null, tentativeCancelTask)) {
            this.cancelState.get().run();
        }
        try {
            return this.cancelState.get().get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IOException(e);
        }
    }

    public PipelineResult.State getState() {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        return this.getStateWithRetries(BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)STATUS_BACKOFF_FACTORY.backoff()), Sleeper.DEFAULT);
    }

    @VisibleForTesting
    PipelineResult.State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        try {
            Job job = this.getJobWithRetries(attempts, sleeper);
            return MonitoringUtil.toState(job.getCurrentState());
        }
        catch (IOException exn) {
            return PipelineResult.State.UNKNOWN;
        }
    }

    private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOException {
        while (true) {
            try {
                Job job = this.dataflowClient.getJob(this.jobId);
                PipelineResult.State currentState = MonitoringUtil.toState(job.getCurrentState());
                if (currentState.isTerminal()) {
                    this.terminalState = currentState;
                    this.replacedByJob = new DataflowPipelineJob(this.dataflowClient, job.getReplacedByJobId(), this.dataflowOptions, this.transformStepNames);
                }
                return job;
            }
            catch (IOException exn) {
                LOG.warn("There were problems getting current job status: {}.", (Object)exn.getMessage());
                LOG.debug("Exception information:", (Throwable)exn);
                if (this.nextBackOff(sleeper, backoff)) continue;
                throw exn;
            }
            break;
        }
    }

    private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
        try {
            return BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff);
        }
        catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    public MetricResults metrics() {
        return this.dataflowMetrics;
    }
}

