/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
import org.apache.beam.runners.spark.metrics.CompositeSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.metrics.source.Source;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SparkRunner
extends PipelineRunner<SparkPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
    private final SparkPipelineOptions mOptions;

    /**
     * Creates a runner backed by freshly constructed default options.
     *
     * <p>The options are obtained from {@code PipelineOptionsFactory.as(SparkPipelineOptions.class)}
     * and have {@code SparkRunner} set as their runner before being handed to the constructor.
     *
     * @return a new {@code SparkRunner} using default {@code SparkPipelineOptions}
     */
    public static SparkRunner create() {
        final SparkPipelineOptions defaults = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        defaults.setRunner(SparkRunner.class);
        return new SparkRunner(defaults);
    }

    /**
     * Creates a runner that uses the supplied options as-is.
     *
     * <p>Unlike {@code fromOptions}, this factory performs no validation and does not
     * populate the files-to-stage list.
     *
     * @param options the Spark pipeline options the runner will execute with
     * @return a new {@code SparkRunner} wrapping {@code options}
     */
    public static SparkRunner create(SparkPipelineOptions options) {
        return new SparkRunner(options);
    }

    /**
     * Creates a runner from generic pipeline options.
     *
     * <p>The options are validated as {@code SparkPipelineOptions}. When no files to stage
     * have been configured, the list is filled in from the resources detected on this
     * class's class loader, and the number of detected files is logged.
     *
     * @param options the pipeline options to validate and run with
     * @return a new {@code SparkRunner} using the validated options
     */
    public static SparkRunner fromOptions(PipelineOptions options) {
        final SparkPipelineOptions validated = PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
        final boolean stagingUnspecified = validated.getFilesToStage() == null;
        if (stagingUnspecified) {
            final ClassLoader runnerLoader = SparkRunner.class.getClassLoader();
            validated.setFilesToStage(PipelineResources.detectClassPathResourcesToStage(runnerLoader));
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.", validated.getFilesToStage().size());
            LOG.debug("Classpath elements: {}", validated.getFilesToStage());
        }
        return new SparkRunner(validated);
    }

    /**
     * Stores the given options without validating or copying them.
     *
     * <p>Private: instances are obtained through the static {@code create} and
     * {@code fromOptions} factories.
     *
     * @param options the options this runner executes pipelines with
     */
    private SparkRunner(SparkPipelineOptions options) {
        this.mOptions = options;
    }

    /**
     * Starts executing the given pipeline on Spark and returns a handle to that execution.
     *
     * <p>The pipeline is first scanned by {@code detectTranslationMode}, which may switch the
     * options to streaming. Depending on the resulting mode:
     * <ul>
     *   <li><b>Streaming</b>: a {@code JavaStreamingContext} is obtained (or created) for the
     *       checkpoint directory, the accumulator-checkpointing, user-supplied and
     *       watermark-advancing listeners are registered, and {@code jssc.start()} is submitted
     *       to a single-thread executor.</li>
     *   <li><b>Batch</b>: a {@code JavaSparkContext} is obtained, cache candidates are computed,
     *       and the pipeline traversal plus output computation is submitted to the executor.</li>
     * </ul>
     * In both modes the work is submitted asynchronously; the returned result wraps the
     * submitted task's {@code Future} and the Spark context. If Spark metric sinks are enabled
     * in the options, a Beam metrics source is registered before returning.
     *
     * @param pipeline the pipeline to execute
     * @return a {@code SparkPipelineResult.StreamingMode} or {@code SparkPipelineResult.BatchMode}
     */
    @Override
    public SparkPipelineResult run(Pipeline pipeline) {
        LOG.info("Executing pipeline using the SparkRunner.");
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        MetricsEnvironment.setMetricsSupported(true);
        // May flip mOptions to streaming, so it has to run before the isStreaming() check below.
        this.detectTranslationMode(pipeline);

        SparkPipelineResult result;
        if (this.mOptions.isStreaming()) {
            Checkpoint.CheckpointDir checkpointDir = new Checkpoint.CheckpointDir(this.mOptions.getCheckpointDir());
            SparkRunnerStreamingContextFactory streamingContextFactory = new SparkRunnerStreamingContextFactory(pipeline, this.mOptions, checkpointDir);
            JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);

            // Listeners that checkpoint the aggregator and metrics accumulators.
            jssc.addStreamingListener(new JavaStreamingListenerWrapper(new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
            jssc.addStreamingListener(new JavaStreamingListenerWrapper(new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));

            // Listeners supplied by the user through SparkContextOptions.
            for (JavaStreamingListener listener : this.mOptions.as(SparkContextOptions.class).getListeners()) {
                // Pass the name as an SLF4J argument; concatenating it left the "{}" placeholder
                // unfilled and produced "Registered listener {}.<name>".
                LOG.info("Registered listener {}.", listener.getClass().getSimpleName());
                jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
            }

            // Registered after the user listeners, preserving the original registration order.
            jssc.addStreamingListener(new JavaStreamingListenerWrapper(new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));

            SparkRunner.initAccumulators(this.mOptions, jssc.sparkContext());

            Future<?> startPipeline = executorService.submit(() -> {
                LOG.info("Starting streaming pipeline execution.");
                jssc.start();
            });
            // No further tasks will be submitted; the already-submitted task still runs.
            executorService.shutdown();
            result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
        } else {
            JavaSparkContext jsc = SparkContextFactory.getSparkContext(this.mOptions);
            EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, this.mOptions);
            TransformTranslator.Translator translator = new TransformTranslator.Translator();

            // Count PCollection usages up front so the evaluation context knows its cache candidates.
            SparkRunner.updateCacheCandidates(pipeline, translator, evaluationContext);
            SparkRunner.initAccumulators(this.mOptions, jsc);

            Future<?> startPipeline = executorService.submit(() -> {
                pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
                evaluationContext.computeOutputs();
                LOG.info("Batch pipeline execution complete.");
            });
            // No further tasks will be submitted; the already-submitted task still runs.
            executorService.shutdown();
            result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
        }

        if (this.mOptions.getEnableSparkMetricSinks().booleanValue()) {
            this.registerMetricsSource(this.mOptions.getAppName());
        }
        return result;
    }

    /**
     * Registers a composite Beam metrics source with Spark's metrics system.
     *
     * <p>The composite is named {@code appName + ".Beam"} and combines the registries of a
     * {@code SparkBeamMetricSource} and an {@code AggregatorMetricSource} built from the
     * current aggregators accumulator value. The source is removed before it is registered.
     *
     * @param appName the application name used as the prefix of the source name
     */
    private void registerMetricsSource(String appName) {
        final MetricsSystem sparkMetrics = SparkEnv$.MODULE$.get().metricsSystem();

        final NamedAggregators currentAggregators = (NamedAggregators) AggregatorsAccumulator.getInstance().value();
        final AggregatorMetricSource aggregatorSource = new AggregatorMetricSource(null, currentAggregators);
        final SparkBeamMetricSource beamSource = new SparkBeamMetricSource(null);

        final String sourceName = appName + ".Beam";
        final Source combined = new CompositeSource(sourceName, beamSource.metricRegistry(), aggregatorSource.metricRegistry());

        // Remove first, then register, exactly in that order.
        sparkMetrics.removeSource(combined);
        sparkMetrics.registerSource(combined);
    }

    /**
     * Initializes the metrics accumulator and then the aggregators accumulator.
     *
     * @param opts the Spark pipeline options passed to both {@code init} calls
     * @param jsc the Spark context passed to both {@code init} calls
     */
    public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) {
        MetricsAccumulator.init(opts, jsc);
        AggregatorsAccumulator.init(opts, jsc);
    }

    /**
     * Walks the pipeline with a {@code TranslationModeDetector} and enables streaming on the
     * options when the detector ends up in {@code STREAMING} mode.
     *
     * <p>The streaming flag is only ever switched on here; it is never reset to {@code false}.
     *
     * @param pipeline the pipeline to inspect
     */
    private void detectTranslationMode(Pipeline pipeline) {
        final TranslationModeDetector modeDetector = new TranslationModeDetector();
        pipeline.traverseTopologically(modeDetector);

        final boolean streamingDetected = modeDetector.getTranslationMode() == TranslationMode.STREAMING;
        if (streamingDetected) {
            this.mOptions.setStreaming(true);
        }
    }

    /**
     * Traverses the pipeline with a {@code CacheVisitor} so that the evaluation context's
     * cache-candidate counts are populated.
     *
     * @param pipeline the pipeline to traverse
     * @param translator the translator handed to the visitor
     * @param evaluationContext the context whose cache candidates are updated
     */
    public static void updateCacheCandidates(Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
        pipeline.traverseTopologically(new CacheVisitor(translator, evaluationContext));
    }

    public static class Evaluator
    extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
        protected final EvaluationContext ctxt;
        protected final SparkPipelineTranslator translator;

        public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
            this.translator = translator;
            this.ctxt = ctxt;
        }

        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            Class<?> transformClass;
            if (node.getTransform() != null && this.translator.hasTranslation(transformClass = node.getTransform().getClass()) && !this.shouldDefer(node)) {
                LOG.info("Entering directly-translatable composite transform: '{}'", (Object)node.getFullName());
                LOG.debug("Composite transform class: '{}'", transformClass);
                this.doVisitTransform(node);
                return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        protected boolean shouldDefer(TransformHierarchy.Node node) {
            Collection nonAdditionalInputs = TransformInputs.nonAdditionalInputs((AppliedPTransform)node.toAppliedPTransform(this.getPipeline()));
            if (nonAdditionalInputs.size() != 1) {
                return false;
            }
            PValue input = (PValue)Iterables.getOnlyElement(nonAdditionalInputs);
            if (!(input instanceof PCollection) || ((PCollection)input).getWindowingStrategy().getWindowFn().isNonMerging()) {
                return false;
            }
            PTransform transform = node.getTransform();
            boolean hasSideInput = false;
            if (transform instanceof Combine.PerKey) {
                List sideInputs = ((Combine.PerKey)transform).getSideInputs();
                hasSideInput = sideInputs != null && !sideInputs.isEmpty();
            } else if (transform instanceof Combine.Globally) {
                List sideInputs = ((Combine.Globally)transform).getSideInputs();
                boolean bl = hasSideInput = sideInputs != null && !sideInputs.isEmpty();
            }
            if (hasSideInput) {
                LOG.info("Deferring combine transformation {} for job {}", (Object)transform, (Object)this.ctxt.getOptions().getJobName());
                return true;
            }
            return false;
        }

        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            this.doVisitTransform(node);
        }

        <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformHierarchy.Node node) {
            PTransform transform = node.getTransform();
            Class<?> transformClass = transform.getClass();
            TransformEvaluator<PTransform> evaluator = this.translate(node, transform, transformClass);
            LOG.info("Evaluating {}", (Object)transform);
            AppliedPTransform appliedTransform = node.toAppliedPTransform(this.getPipeline());
            this.ctxt.setCurrentTransform(appliedTransform);
            evaluator.evaluate(transform, this.ctxt);
            this.ctxt.setCurrentTransform(null);
        }

        protected <TransformT extends PTransform<? super PInput, POutput>> TransformEvaluator<TransformT> translate(TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
            Map pValues = node.getInputs().isEmpty() ? node.getOutputs() : node.getInputs();
            PCollection.IsBounded isNodeBounded = this.isBoundedCollection(pValues.values());
            LOG.debug("Translating {} as {}", transform, (Object)isNodeBounded);
            return isNodeBounded.equals((Object)PCollection.IsBounded.BOUNDED) ? this.translator.translateBounded(transformClass) : this.translator.translateUnbounded(transformClass);
        }

        protected PCollection.IsBounded isBoundedCollection(Collection<PValue> pValues) {
            PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
            for (PValue pValue : pValues) {
                if (pValue instanceof PCollection) {
                    isBounded = isBounded.and(((PCollection)pValue).isBounded());
                    continue;
                }
                isBounded = isBounded.and(PCollection.IsBounded.BOUNDED);
            }
            return isBounded;
        }
    }

    /**
     * Evaluator variant that evaluates nothing: for every visited transform it only counts how
     * often each input {@code PCollection} is consumed, recording the tally in the evaluation
     * context's cache-candidate map.
     */
    static class CacheVisitor
    extends Evaluator {
        protected CacheVisitor(SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
            super(translator, evaluationContext);
        }

        /**
         * Increments the usage count of each {@code PCollection} input of the node; an input
         * seen for the first time gets a count of one. Inputs of other kinds are ignored.
         *
         * @param node the node whose inputs are counted
         */
        @Override
        public void doVisitTransform(TransformHierarchy.Node node) {
            for (PValue input : node.getInputs().values()) {
                if (input instanceof PCollection) {
                    final Long seenSoFar = this.ctxt.getCacheCandidates().get(input);
                    final long updated = seenSoFar == null ? 1L : seenSoFar + 1L;
                    this.ctxt.getCacheCandidates().put((PCollection) input, updated);
                }
            }
        }
    }

    /**
     * Pipeline visitor that determines the translation mode by inspecting primitive
     * transforms: the mode switches from {@code BATCH} to {@code STREAMING} as soon as a
     * transform whose class is {@code Read.Unbounded} or {@code CreateStream} is visited.
     */
    private static class TranslationModeDetector
    extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
        /** Transform classes whose presence triggers the switch to streaming. */
        private static final Collection<Class<? extends PTransform>> UNBOUNDED_INPUTS = Arrays.asList(Read.Unbounded.class, CreateStream.class);
        private TranslationMode translationMode;

        /** Starts in {@code BATCH} mode. */
        TranslationModeDetector() {
            this(TranslationMode.BATCH);
        }

        /**
         * @param defaultMode the mode reported unless a visited transform changes it
         */
        TranslationModeDetector(TranslationMode defaultMode) {
            this.translationMode = defaultMode;
        }

        /**
         * @return the mode as of the transforms visited so far
         */
        TranslationMode getTranslationMode() {
            return this.translationMode;
        }

        /**
         * Switches to {@code STREAMING} when currently in {@code BATCH} and the node's
         * transform class is one of the unbounded input classes; otherwise does nothing.
         *
         * @param node the primitive node being visited
         */
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            if (!this.translationMode.equals(TranslationMode.BATCH)) {
                // Already switched; the transform is not inspected at all.
                return;
            }
            final Class<?> primitiveClass = node.getTransform().getClass();
            if (UNBOUNDED_INPUTS.contains(primitiveClass)) {
                LOG.info("Found {}. Switching to streaming execution.", primitiveClass);
                this.translationMode = TranslationMode.STREAMING;
            }
        }
    }

    /**
     * The two ways a pipeline can be translated, as selected by
     * {@code TranslationModeDetector}.
     */
    static enum TranslationMode {
        // Default mode of TranslationModeDetector's no-arg constructor.
        BATCH,
        // Selected once a primitive transform of an unbounded-input class is visited.
        STREAMING;

    }
}

