/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nonnull;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkPCollectionView;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.spark.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.spark.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.spark.repackaged.com.google.common.collect.Maps;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Instant;

final class StreamingTransformTranslator {
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();

    private StreamingTransformTranslator() {
    }

    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>(){

            @Override
            public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
                JavaDStream dstream = ((UnboundedDataset)context.borrowDataset(transform)).getDStream();
                dstream.map(WindowingHelpers.unwindowFunction()).print(transform.getNum());
            }
        };
    }

    private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
        return new TransformEvaluator<Read.Unbounded<T>>(){

            @Override
            public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)SparkUnboundedSource.read(context.getStreamingContext(), context.getRuntimeContext(), transform.getSource()));
            }
        };
    }

    private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream<T>>(){

            @Override
            public void evaluate(CreateStream<T> transform, EvaluationContext context) {
                Coder coder = ((PCollection)context.getOutput(transform)).getCoder();
                JavaStreamingContext jssc = context.getStreamingContext();
                Queue values = transform.getBatches();
                WindowedValue.FullWindowedValueCoder windowCoder = WindowedValue.FullWindowedValueCoder.of((Coder)coder, (Coder)GlobalWindow.Coder.INSTANCE);
                LinkedBlockingQueue<JavaRDD> rddQueue = new LinkedBlockingQueue<JavaRDD>();
                for (Iterable iterable : values) {
                    Iterable windowedValues = Iterables.transform(iterable, new org.apache.beam.spark.repackaged.com.google.common.base.Function<TimestampedValue<T>, WindowedValue<T>>(){

                        @Override
                        public WindowedValue<T> apply(@Nonnull TimestampedValue<T> timestampedValue) {
                            return WindowedValue.of((Object)timestampedValue.getValue(), (Instant)timestampedValue.getTimestamp(), (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
                        }
                    });
                    JavaRDD rdd = jssc.sparkContext().parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(CoderHelpers.fromByteFunction(windowCoder));
                    rddQueue.offer(rdd);
                }
                JavaInputDStream inputDStream = jssc.queueStream(rddQueue, true);
                UnboundedDataset unboundedDataset = new UnboundedDataset(inputDStream, Collections.singletonList(inputDStream.inputDStream().id()));
                Queue<GlobalWatermarkHolder.SparkWatermarks> times = transform.getTimes();
                GlobalWatermarkHolder.addAll(ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times));
                context.putDataset(transform, unboundedDataset);
            }
        };
    }

    private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.PCollections<T>>(){

            @Override
            public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
                List<TaggedPValue> pcs = context.getInputs((PTransform<?, ?>)transform);
                ArrayList<Object> dStreams = new ArrayList<Object>();
                ArrayList<Integer> streamingSources = new ArrayList<Integer>();
                for (TaggedPValue pv : pcs) {
                    Preconditions.checkArgument(pv.getValue() instanceof PCollection, "Flatten had non-PCollection value in input: %s of type %s", (Object)pv.getValue(), (Object)pv.getValue().getClass().getSimpleName());
                    PCollection pcol = (PCollection)pv.getValue();
                    Dataset dataset = context.borrowDataset((PValue)pcol);
                    if (dataset instanceof UnboundedDataset) {
                        UnboundedDataset unboundedDataset = (UnboundedDataset)dataset;
                        streamingSources.addAll(unboundedDataset.getStreamSources());
                        dStreams.add(unboundedDataset.getDStream());
                        continue;
                    }
                    LinkedBlockingQueue q = new LinkedBlockingQueue();
                    q.offer(((BoundedDataset)dataset).getRDD());
                    JavaDStream dStream = context.getStreamingContext().queueStream(q);
                    dStreams.add(dStream);
                }
                JavaDStream unifiedStreams = context.getStreamingContext().union((JavaDStream)dStreams.remove(0), dStreams);
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(unifiedStreams, streamingSources));
            }
        };
    }

    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
        return new TransformEvaluator<Window.Assign<T>>(){

            @Override
            public void evaluate(final Window.Assign<T> transform, EvaluationContext context) {
                UnboundedDataset unboundedDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                JavaDStream dStream = unboundedDataset.getDStream();
                JavaDStream outputStream = TranslationUtils.skipAssignWindows(transform, context) ? dStream : dStream.transform(new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>(){

                    public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
                        return rdd.map(new SparkAssignWindowFn(transform.getWindowFn()));
                    }
                });
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(outputStream, unboundedDataset.getStreamSources()));
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>(){

            @Override
            public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
                UnboundedDataset inputDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                List<Integer> streamSources = inputDataset.getStreamSources();
                JavaDStream dStream = inputDataset.getDStream();
                final KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                WindowingStrategy windowingStrategy = ((PCollection)context.getInput(transform)).getWindowingStrategy();
                WindowFn windowFn = windowingStrategy.getWindowFn();
                WindowedValue.FullWindowedValueCoder wvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)coder.getValueCoder(), (Coder)windowFn.windowCoder());
                JavaDStream groupedByKeyStream = dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>>((WindowedValue.WindowedValueCoder)wvCoder){
                    final /* synthetic */ WindowedValue.WindowedValueCoder val$wvCoder;
                    {
                        this.val$wvCoder = windowedValueCoder;
                    }

                    public JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> call(JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception {
                        return GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), this.val$wvCoder);
                    }
                });
                JavaDStream outStream = SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow(groupedByKeyStream, coder.getKeyCoder(), wvCoder, windowingStrategy, runtimeContext, streamSources);
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(outStream, streamSources));
            }
        };
    }

    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>(){

            @Override
            public void evaluate(final Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection input = (PCollection)context.getInput(transform);
                final WindowingStrategy windowingStrategy = input.getWindowingStrategy();
                final CombineWithContext.KeyedCombineFnWithContext fn = CombineFnUtil.toFnWithContext((CombineFnBase.PerKeyCombineFn)transform.getFn());
                UnboundedDataset unboundedDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                JavaDStream dStream = unboundedDataset.getDStream();
                final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                final SparkPCollectionView pviews = context.getPViews();
                JavaDStream outStream = dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>>(){

                    public JavaRDD<WindowedValue<KV<K, OutputT>>> call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> rdd) throws Exception {
                        SparkKeyedCombineFn combineFnWithContext = new SparkKeyedCombineFn(fn, runtimeContext, TranslationUtils.getSideInputs(transform.getSideInputs(), new JavaSparkContext(rdd.context()), pviews), windowingStrategy);
                        return rdd.map(new TranslationUtils.CombineGroupedValues(combineFnWithContext));
                    }
                });
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(outStream, unboundedDataset.getStreamSources()));
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>(){

            @Override
            public void evaluate(final ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
                final DoFn doFn = transform.getFn();
                TranslationUtils.rejectSplittable(doFn);
                TranslationUtils.rejectStateAndTimers(doFn);
                final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                final WindowingStrategy windowingStrategy = ((PCollection)context.getInput(transform)).getWindowingStrategy();
                final SparkPCollectionView pviews = context.getPViews();
                UnboundedDataset unboundedDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                JavaDStream dStream = unboundedDataset.getDStream();
                final String stepName = context.getCurrentTransform().getFullName();
                JavaDStream outStream = dStream.transform(new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>(){

                    public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
                        JavaSparkContext jsc = new JavaSparkContext(rdd.context());
                        Accumulator<NamedAggregators> aggAccum = SparkAggregators.getNamedAggregators(jsc);
                        Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance();
                        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), jsc, pviews);
                        return rdd.mapPartitions(new DoFnFunction(aggAccum, metricsAccum, stepName, doFn, runtimeContext, sideInputs, windowingStrategy));
                    }
                });
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(outStream, unboundedDataset.getStreamSources()));
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
        return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>(){

            @Override
            public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform, final EvaluationContext context) {
                final DoFn doFn = transform.getFn();
                TranslationUtils.rejectSplittable(doFn);
                TranslationUtils.rejectStateAndTimers(doFn);
                final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                final SparkPCollectionView pviews = context.getPViews();
                final WindowingStrategy windowingStrategy = ((PCollection)context.getInput(transform)).getWindowingStrategy();
                UnboundedDataset unboundedDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                JavaDStream dStream = unboundedDataset.getDStream();
                JavaPairDStream all = dStream.transformToPair(new Function<JavaRDD<WindowedValue<InputT>>, JavaPairRDD<TupleTag<?>, WindowedValue<?>>>(){

                    public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
                        String stepName = context.getCurrentTransform().getFullName();
                        JavaSparkContext jsc = new JavaSparkContext(rdd.context());
                        Accumulator<NamedAggregators> aggAccum = SparkAggregators.getNamedAggregators(jsc);
                        Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance();
                        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), JavaSparkContext.fromSparkContext((SparkContext)rdd.context()), pviews);
                        return rdd.mapPartitionsToPair(new MultiDoFnFunction(aggAccum, metricsAccum, stepName, doFn, runtimeContext, transform.getMainOutputTag(), sideInputs, windowingStrategy));
                    }
                }).cache();
                List<TaggedPValue> pct = context.getOutputs((PTransform<?, ?>)transform);
                for (TaggedPValue e : pct) {
                    JavaPairDStream filtered = all.filter(new TranslationUtils.TupleTagFilter(e.getTag()));
                    JavaDStream values = TranslationUtils.dStreamValues(filtered);
                    context.putDataset(e.getValue(), new UnboundedDataset(values, unboundedDataset.getStreamSources()));
                }
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
        return new TransformEvaluator<Reshuffle<K, V>>(){

            @Override
            public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) {
                UnboundedDataset inputDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                List<Integer> streamSources = inputDataset.getStreamSources();
                JavaDStream dStream = inputDataset.getDStream();
                final KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                WindowingStrategy windowingStrategy = ((PCollection)context.getInput(transform)).getWindowingStrategy();
                WindowFn windowFn = windowingStrategy.getWindowFn();
                WindowedValue.FullWindowedValueCoder wvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)coder.getValueCoder(), (Coder)windowFn.windowCoder());
                JavaDStream reshuffledStream = dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, V>>>>((WindowedValue.WindowedValueCoder)wvCoder){
                    final /* synthetic */ WindowedValue.WindowedValueCoder val$wvCoder;
                    {
                        this.val$wvCoder = windowedValueCoder;
                    }

                    public JavaRDD<WindowedValue<KV<K, V>>> call(JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception {
                        return GroupCombineFunctions.reshuffle(rdd, coder.getKeyCoder(), this.val$wvCoder);
                    }
                });
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(reshuffledStream, streamSources));
            }
        };
    }

    static {
        EVALUATORS.put(Read.Unbounded.class, StreamingTransformTranslator.readUnbounded());
        EVALUATORS.put(GroupByKey.class, StreamingTransformTranslator.groupByKey());
        EVALUATORS.put(Combine.GroupedValues.class, StreamingTransformTranslator.combineGrouped());
        EVALUATORS.put(ParDo.Bound.class, StreamingTransformTranslator.parDo());
        EVALUATORS.put(ParDo.BoundMulti.class, StreamingTransformTranslator.multiDo());
        EVALUATORS.put(ConsoleIO.Write.Unbound.class, StreamingTransformTranslator.print());
        EVALUATORS.put(CreateStream.class, StreamingTransformTranslator.createFromQueue());
        EVALUATORS.put(Window.Assign.class, StreamingTransformTranslator.window());
        EVALUATORS.put(Flatten.PCollections.class, StreamingTransformTranslator.flattenPColl());
        EVALUATORS.put(Reshuffle.class, StreamingTransformTranslator.reshuffle());
    }

    public static class Translator
    implements SparkPipelineTranslator {
        private final SparkPipelineTranslator batchTranslator;

        Translator(SparkPipelineTranslator batchTranslator) {
            this.batchTranslator = batchTranslator;
        }

        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
            return EVALUATORS.containsKey(clazz);
        }

        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(Class<TransformT> clazz) {
            TransformEvaluator<TransformT> transformEvaluator = this.batchTranslator.translateBounded(clazz);
            Preconditions.checkState(transformEvaluator != null, "No TransformEvaluator registered for BOUNDED transform %s", clazz);
            return transformEvaluator;
        }

        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(Class<TransformT> clazz) {
            TransformEvaluator transformEvaluator = (TransformEvaluator)EVALUATORS.get(clazz);
            Preconditions.checkState(transformEvaluator != null, "No TransformEvaluator registered for UNBOUNDED transform %s", clazz);
            return transformEvaluator;
        }
    }
}

