/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineResult;
import org.apache.beam.runners.spark.structuredstreaming.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.metrics.CompositeSource;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.metrics.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PipelineRunner} that translates a Beam {@link Pipeline} with either a streaming or a
 * batch {@link PipelineTranslator} and starts it through the resulting
 * {@link TranslationContext}, handing back a {@link SparkStructuredStreamingPipelineResult}.
 *
 * <p>Instances are only obtainable through the static factories {@link #create()},
 * {@link #create(SparkStructuredStreamingPipelineOptions)} and
 * {@link #fromOptions(PipelineOptions)}; the constructor is private.
 */
public final class SparkStructuredStreamingRunner
        extends PipelineRunner<SparkStructuredStreamingPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkStructuredStreamingRunner.class);

    /** Options this runner was built with; consulted on every {@link #run(Pipeline)}. */
    private final SparkStructuredStreamingPipelineOptions options;

    /**
     * Builds a runner from freshly created options whose runner is set to this class.
     *
     * @return a new runner instance
     */
    public static SparkStructuredStreamingRunner create() {
        SparkStructuredStreamingPipelineOptions defaults =
                PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class);
        defaults.setRunner(SparkStructuredStreamingRunner.class);
        return new SparkStructuredStreamingRunner(defaults);
    }

    /**
     * Builds a runner around the given options, without validating or modifying them.
     *
     * @param options the options the runner will use
     * @return a new runner instance
     */
    public static SparkStructuredStreamingRunner create(SparkStructuredStreamingPipelineOptions options) {
        return new SparkStructuredStreamingRunner(options);
    }

    /**
     * Builds a runner from generic pipeline options after validating them as
     * {@link SparkStructuredStreamingPipelineOptions}.
     *
     * <p>When no files to stage are configured and the Spark master is not local, the files to
     * stage are filled in from the classpath resources detected for this class's class loader.
     *
     * @param options the pipeline options to validate and use
     * @return a new runner instance backed by the validated options
     */
    public static SparkStructuredStreamingRunner fromOptions(PipelineOptions options) {
        SparkStructuredStreamingPipelineOptions validated =
                PipelineOptionsValidator.validate(SparkStructuredStreamingPipelineOptions.class, options);

        // Nothing to default: files were given explicitly, or the master is local.
        if (validated.getFilesToStage() != null || PipelineTranslator.isLocalSparkMaster(validated)) {
            return new SparkStructuredStreamingRunner(validated);
        }

        validated.setFilesToStage(
                PipelineResources.detectClassPathResourcesToStage(
                        SparkStructuredStreamingRunner.class.getClassLoader(), options));
        LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.", validated.getFilesToStage().size());
        LOG.debug("Classpath elements: {}", validated.getFilesToStage());
        return new SparkStructuredStreamingRunner(validated);
    }

    private SparkStructuredStreamingRunner(SparkStructuredStreamingPipelineOptions pipelineOptions) {
        this.options = pipelineOptions;
    }

    /**
     * Translates the pipeline, submits its start to a background thread and returns a result
     * wrapping that submission.
     *
     * <p>Before translation the aggregator and metrics accumulators are cleared. After
     * submission the Beam metric source is registered with Spark when the
     * {@code enableSparkMetricSinks} option is set, and a {@link MetricsPusher} is started. In
     * test mode this method additionally waits for the result to finish and then stops it
     * before returning.
     *
     * @param pipeline the pipeline to run
     * @return the result tracking the submitted pipeline
     */
    public SparkStructuredStreamingPipelineResult run(Pipeline pipeline) {
        MetricsEnvironment.setMetricsSupported(true);
        LOG.info("*** SparkStructuredStreamingRunner is based on spark structured streaming framework and is no more \n based on RDD/DStream API. See\n https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html\n It is still experimental, its coverage of the Beam model is partial. ***");

        // Reset accumulator state before translating.
        AggregatorsAccumulator.clear();
        MetricsAccumulator.clear();

        TranslationContext context = translatePipeline(pipeline);
        Future<?> startFuture = submitStart(context);
        SparkStructuredStreamingPipelineResult pipelineResult =
                new SparkStructuredStreamingPipelineResult(startFuture, context.getSparkSession());

        if (options.getEnableSparkMetricSinks()) {
            registerMetricsSource(options.getAppName());
        }

        MetricsPusher pusher =
                new MetricsPusher(
                        MetricsAccumulator.getInstance().value(),
                        options.as(MetricsOptions.class),
                        pipelineResult);
        pusher.start();

        if (options.getTestMode()) {
            pipelineResult.waitUntilFinish();
            pipelineResult.stop();
        }
        return pipelineResult;
    }

    /**
     * Submits {@code context.startPipeline()} to a dedicated single-thread executor and shuts
     * the executor down right after submission, so it accepts no further tasks.
     */
    private static Future<?> submitStart(TranslationContext context) {
        ExecutorService starter = Executors.newSingleThreadExecutor();
        Future<?> pending = starter.submit(() -> context.startPipeline());
        starter.shutdown();
        return pending;
    }

    /**
     * Prepares the pipeline (translation mode detection, transform replacement, files-to-stage
     * preparation), picks the streaming or batch translator according to the options,
     * initializes the accumulators on the translator's Spark context and translates the
     * pipeline.
     *
     * @param pipeline the pipeline to translate
     * @return the translation context produced by the chosen translator
     */
    private TranslationContext translatePipeline(Pipeline pipeline) {
        PipelineTranslator.detectTranslationMode(pipeline, options);
        PipelineTranslator.replaceTransforms(pipeline, options);
        PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);

        PipelineTranslator translator;
        if (options.isStreaming()) {
            translator = new PipelineTranslatorStreaming(options);
        } else {
            translator = new PipelineTranslatorBatch(options);
        }

        // Accumulators are initialized before translate() is invoked.
        JavaSparkContext javaContext =
                JavaSparkContext.fromSparkContext(
                        translator.getTranslationContext().getSparkSession().sparkContext());
        initAccumulators(options, javaContext);

        translator.translate(pipeline);
        return translator.getTranslationContext();
    }

    /**
     * Registers a composite metric source named {@code appName + ".Beam"} with the metrics
     * system of the current Spark environment. The composite combines the registries of a
     * {@link SparkBeamMetricSource} and an {@link AggregatorMetricSource}.
     *
     * @param appName application name used as the prefix of the source name
     */
    private void registerMetricsSource(String appName) {
        MetricsSystem sparkMetrics = SparkEnv$.MODULE$.get().metricsSystem();
        AggregatorMetricSource aggregators =
                new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
        SparkBeamMetricSource beamMetrics = new SparkBeamMetricSource(null);
        String sourceName = appName + ".Beam";
        Source beamSource =
                new CompositeSource(sourceName, beamMetrics.metricRegistry(), aggregators.metricRegistry());

        // Remove first, then register — presumably so an earlier registration under the same
        // name does not linger; confirm against Spark's MetricsSystem.
        sparkMetrics.removeSource(beamSource);
        sparkMetrics.registerSource(beamSource);
    }

    /**
     * Initializes the metrics and aggregators accumulators on the given Spark context.
     *
     * @param opts pipeline options; not read by this method
     * @param jsc Spark context the accumulators are initialized with
     */
    public static void initAccumulators(SparkStructuredStreamingPipelineOptions opts, JavaSparkContext jsc) {
        MetricsAccumulator.init(jsc);
        AggregatorsAccumulator.init(jsc);
    }
}

