/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.metrics.SparkMetricResults;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;

public abstract class SparkPipelineResult
implements PipelineResult {
    protected final Future pipelineExecution;
    protected JavaSparkContext javaSparkContext;
    protected PipelineResult.State state;
    private final SparkMetricResults metricResults = new SparkMetricResults();

    SparkPipelineResult(Future<?> pipelineExecution, JavaSparkContext javaSparkContext) {
        this.pipelineExecution = pipelineExecution;
        this.javaSparkContext = javaSparkContext;
        this.state = PipelineResult.State.RUNNING;
    }

    private RuntimeException runtimeExceptionFrom(Throwable e) {
        return e instanceof RuntimeException ? (RuntimeException)e : new RuntimeException(e);
    }

    private RuntimeException beamExceptionFrom(Throwable e) {
        if (e instanceof SparkException) {
            if (e.getCause() != null && e.getCause() instanceof UserCodeException) {
                UserCodeException userException = (UserCodeException)e.getCause();
                return new Pipeline.PipelineExecutionException(userException.getCause());
            }
            if (e.getCause() != null) {
                return new Pipeline.PipelineExecutionException(e.getCause());
            }
        }
        return this.runtimeExceptionFrom(e);
    }

    protected abstract void stop();

    protected abstract PipelineResult.State awaitTermination(Duration var1) throws TimeoutException, ExecutionException, InterruptedException;

    public <T> T getAggregatorValue(String name, Class<T> resultType) {
        return SparkAggregators.valueOf(name, resultType, this.javaSparkContext);
    }

    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
        return SparkAggregators.valueOf(aggregator, this.javaSparkContext);
    }

    public PipelineResult.State getState() {
        return this.state;
    }

    public PipelineResult.State waitUntilFinish() {
        return this.waitUntilFinish(Duration.millis((long)Long.MAX_VALUE));
    }

    public PipelineResult.State waitUntilFinish(Duration duration) {
        try {
            this.state = this.awaitTermination(duration);
        }
        catch (TimeoutException e) {
            this.state = null;
        }
        catch (ExecutionException e) {
            this.state = PipelineResult.State.FAILED;
            throw this.beamExceptionFrom(e.getCause());
        }
        catch (Exception e) {
            this.state = PipelineResult.State.FAILED;
            throw this.beamExceptionFrom(e);
        }
        finally {
            this.stop();
        }
        return this.state;
    }

    public MetricResults metrics() {
        return this.metricResults;
    }

    public PipelineResult.State cancel() throws IOException {
        if (this.state != null && !this.state.isTerminal()) {
            this.stop();
            this.state = PipelineResult.State.CANCELLED;
        }
        return this.state;
    }

    static class StreamingMode
    extends SparkPipelineResult {
        private final JavaStreamingContext javaStreamingContext;

        StreamingMode(Future<?> pipelineExecution, JavaStreamingContext javaStreamingContext) {
            super(pipelineExecution, javaStreamingContext.sparkContext());
            this.javaStreamingContext = javaStreamingContext;
        }

        @Override
        protected void stop() {
            this.javaStreamingContext.stop(false, true);
            this.javaStreamingContext.awaitTermination(0L);
            SparkContextFactory.stopSparkContext(this.javaSparkContext);
        }

        @Override
        protected PipelineResult.State awaitTermination(Duration duration) throws TimeoutException, ExecutionException, InterruptedException {
            this.pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
            if (this.javaStreamingContext.awaitTerminationOrTimeout(duration.getMillis())) {
                return PipelineResult.State.DONE;
            }
            return null;
        }
    }

    static class BatchMode
    extends SparkPipelineResult {
        BatchMode(Future<?> pipelineExecution, JavaSparkContext javaSparkContext) {
            super(pipelineExecution, javaSparkContext);
        }

        @Override
        protected void stop() {
            SparkContextFactory.stopSparkContext(this.javaSparkContext);
        }

        @Override
        protected PipelineResult.State awaitTermination(Duration duration) throws TimeoutException, ExecutionException, InterruptedException {
            this.pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
            return PipelineResult.State.DONE;
        }
    }
}

