/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobInvocation {
    private static final Logger LOG = LoggerFactory.getLogger(JobInvocation.class);
    private final RunnerApi.Pipeline pipeline;
    private final PortablePipelineRunner pipelineRunner;
    private final JobInfo jobInfo;
    private final ListeningExecutorService executorService;
    private List<Consumer<JobApi.JobState.Enum>> stateObservers;
    private List<Consumer<JobApi.JobMessage>> messageObservers;
    private JobApi.JobState.Enum jobState;
    @Nullable
    private ListenableFuture<PipelineResult> invocationFuture;

    public JobInvocation(JobInfo jobInfo, ListeningExecutorService executorService, RunnerApi.Pipeline pipeline, PortablePipelineRunner pipelineRunner) {
        this.jobInfo = jobInfo;
        this.executorService = executorService;
        this.pipeline = pipeline;
        this.pipelineRunner = pipelineRunner;
        this.stateObservers = new ArrayList<Consumer<JobApi.JobState.Enum>>();
        this.messageObservers = new ArrayList<Consumer<JobApi.JobMessage>>();
        this.invocationFuture = null;
        this.jobState = JobApi.JobState.Enum.STOPPED;
    }

    private PipelineResult runPipeline() throws Exception {
        return this.pipelineRunner.run(this.pipeline, this.jobInfo);
    }

    public synchronized void start() {
        LOG.info("Starting job invocation {}", (Object)this.getId());
        if (this.getState() != JobApi.JobState.Enum.STOPPED) {
            throw new IllegalStateException(String.format("Job %s already running.", this.getId()));
        }
        this.setState(JobApi.JobState.Enum.STARTING);
        this.invocationFuture = this.executorService.submit(this::runPipeline);
        this.setState(JobApi.JobState.Enum.RUNNING);
        Futures.addCallback(this.invocationFuture, (FutureCallback)new FutureCallback<PipelineResult>(){

            public void onSuccess(@Nullable PipelineResult pipelineResult) {
                if (pipelineResult != null) {
                    Preconditions.checkArgument((pipelineResult.getState() == PipelineResult.State.DONE ? 1 : 0) != 0, (Object)("Success on non-Done state: " + pipelineResult.getState()));
                    JobInvocation.this.setState(JobApi.JobState.Enum.DONE);
                } else {
                    JobInvocation.this.setState(JobApi.JobState.Enum.UNSPECIFIED);
                }
            }

            public void onFailure(Throwable throwable) {
                String message = String.format("Error during job invocation %s.", JobInvocation.this.getId());
                LOG.error(message, throwable);
                JobInvocation.this.sendMessage(JobApi.JobMessage.newBuilder().setMessageText(Throwables.getStackTraceAsString((Throwable)throwable)).setImportance(JobApi.JobMessage.MessageImportance.JOB_MESSAGE_DEBUG).build());
                JobInvocation.this.sendMessage(JobApi.JobMessage.newBuilder().setMessageText(Throwables.getRootCause((Throwable)throwable).toString()).setImportance(JobApi.JobMessage.MessageImportance.JOB_MESSAGE_ERROR).build());
                JobInvocation.this.setState(JobApi.JobState.Enum.FAILED);
            }
        }, (Executor)this.executorService);
    }

    public String getId() {
        return this.jobInfo.jobId();
    }

    public synchronized void cancel() {
        LOG.info("Canceling job invocation {}", (Object)this.getId());
        if (this.invocationFuture != null) {
            this.invocationFuture.cancel(true);
            Futures.addCallback(this.invocationFuture, (FutureCallback)new FutureCallback<PipelineResult>(){

                public void onSuccess(@Nullable PipelineResult pipelineResult) {
                    if (pipelineResult != null) {
                        try {
                            pipelineResult.cancel();
                        }
                        catch (IOException exn) {
                            throw new RuntimeException(exn);
                        }
                    }
                }

                public void onFailure(Throwable throwable) {
                }
            }, (Executor)this.executorService);
        }
    }

    public JobApi.JobState.Enum getState() {
        return this.jobState;
    }

    public synchronized void addStateListener(Consumer<JobApi.JobState.Enum> stateStreamObserver) {
        stateStreamObserver.accept(this.getState());
        this.stateObservers.add(stateStreamObserver);
    }

    public synchronized void addMessageListener(Consumer<JobApi.JobMessage> messageStreamObserver) {
        this.messageObservers.add(messageStreamObserver);
    }

    private synchronized void setState(JobApi.JobState.Enum state) {
        this.jobState = state;
        for (Consumer<JobApi.JobState.Enum> observer : this.stateObservers) {
            observer.accept(state);
        }
    }

    private synchronized void sendMessage(JobApi.JobMessage message) {
        for (Consumer<JobApi.JobMessage> observer : this.messageObservers) {
            observer.accept(message);
        }
    }

    static Boolean isTerminated(JobApi.JobState.Enum state) {
        switch (state) {
            case DONE: 
            case FAILED: 
            case CANCELLED: 
            case DRAINED: {
                return true;
            }
        }
        return false;
    }
}

