/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkPCollectionView;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Instant;

public final class StreamingTransformTranslator {
    /**
     * Registry of streaming evaluators keyed by transform URN. It is filled once by the static
     * initializer of this class and read by {@code getTranslator} and
     * {@code Translator#hasTranslation}.
     */
    private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();

    /** Not instantiable: this class only exposes static members and nested types. */
    private StreamingTransformTranslator() {
    }

    /**
     * Builds the evaluator for {@link ConsoleIO.Write.Unbound}.
     *
     * <p>The evaluator borrows the transform's input dataset as an {@link UnboundedDataset}, maps
     * every {@link WindowedValue} of its stream to the wrapped value and calls {@code print} on the
     * mapped stream with {@code transform.getNum()}.
     *
     * @param <T> element type of the printed collection
     * @return the evaluator registered for the console-write URN
     */
    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {

            @Override
            public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
                // The stream must carry its element type: on a raw JavaDStream the
                // WindowedValue::getValue method reference has no receiver type to bind to.
                @SuppressWarnings("unchecked")
                JavaDStream<WindowedValue<T>> dstream =
                        ((UnboundedDataset<T>) context.borrowDataset(transform)).getDStream();
                dstream.map(WindowedValue::getValue).print(transform.getNum());
            }

            @Override
            public String toNativeString() {
                return ".print(...)";
            }
        };
    }

    /**
     * Builds the evaluator for {@link Read.Unbounded}.
     *
     * <p>The evaluator asks {@link SparkUnboundedSource#read} for a dataset over the transform's
     * source, labelled with the full name of the transform currently being translated, and stores
     * that dataset in the context as the transform's output.
     *
     * @param <T> element type produced by the source
     */
    private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
        return new TransformEvaluator<Read.Unbounded<T>>() {

            @Override
            public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
                String fullTransformName = context.getCurrentTransform().getFullName();
                Dataset sourceDataset =
                        (Dataset) SparkUnboundedSource.read(
                                context.getStreamingContext(),
                                context.getSerializableOptions(),
                                transform.getSource(),
                                fullTransformName);
                context.putDataset((PTransform<?, ? extends PValue>) transform, sourceDataset);
            }

            @Override
            public String toNativeString() {
                return "streamingContext.<readFrom(<source>)>()";
            }
        };
    }

    /**
     * Builds the evaluator for {@link CreateStream}.
     *
     * <p>Every batch returned by {@code transform.getBatches()} is turned into one RDD; the RDDs
     * are queued and exposed as an input DStream. The id of that input DStream becomes the single
     * stream source of the resulting {@link UnboundedDataset}, and the queue returned by
     * {@code transform.getTimes()} is registered for that id with {@link GlobalWatermarkHolder}.
     *
     * @param <T> element type of the created stream
     */
    private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream<T>>() {

            @Override
            public void evaluate(CreateStream<T> transform, EvaluationContext context) {
                Queue<JavaRDD<WindowedValue<T>>> rddQueue =
                        buildRdds(
                                transform.getBatches(),
                                context.getStreamingContext(),
                                context.getOutput(transform).getCoder());
                JavaInputDStream<WindowedValue<T>> javaInputDStream =
                        buildInputStream(rddQueue, transform, context);
                UnboundedDataset<T> unboundedDataset =
                        new UnboundedDataset<>(
                                javaInputDStream,
                                Collections.singletonList(javaInputDStream.inputDStream().id()));
                // Register the transform's watermark queue under the id of the only stream source.
                GlobalWatermarkHolder.addAll(
                        ImmutableMap.of(
                                unboundedDataset.getStreamSources().get(0), transform.getTimes()));
                context.putDataset(transform, unboundedDataset);
            }

            /**
             * Converts each batch into an RDD of values placed in the {@link GlobalWindow} with
             * {@link PaneInfo#NO_FIRING}, preserving batch order in the returned queue.
             */
            private Queue<JavaRDD<WindowedValue<T>>> buildRdds(
                    Queue<Iterable<TimestampedValue<T>>> batches,
                    JavaStreamingContext jssc,
                    Coder<T> coder) {
                WindowedValue.FullWindowedValueCoder<T> windowCoder =
                        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
                Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
                for (Iterable<TimestampedValue<T>> timestampedValues : batches) {
                    Iterable<WindowedValue<T>> windowedValues =
                            StreamSupport.stream(timestampedValues.spliterator(), false)
                                    .map(
                                            timestampedValue ->
                                                    WindowedValue.of(
                                                            timestampedValue.getValue(),
                                                            timestampedValue.getTimestamp(),
                                                            GlobalWindow.INSTANCE,
                                                            PaneInfo.NO_FIRING))
                                    .collect(Collectors.toList());
                    // Elements are parallelized as encoded bytes and decoded again on the RDD.
                    JavaRDD<WindowedValue<T>> rdd =
                            jssc.sparkContext()
                                    .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
                                    .map(CoderHelpers.fromByteFunction(windowCoder));
                    rddQueue.offer(rdd);
                }
                return rddQueue;
            }

            /**
             * Wraps the queue in a {@link WatermarkSyncedDStream} when the transform forces
             * watermark synchronisation, otherwise in a plain {@code queueStream} created with its
             * boolean argument set to {@code true}.
             */
            private JavaInputDStream<WindowedValue<T>> buildInputStream(
                    Queue<JavaRDD<WindowedValue<T>>> rddQueue,
                    CreateStream<T> transform,
                    EvaluationContext context) {
                return transform.isForceWatermarkSync()
                        ? new JavaInputDStream<>(
                                new WatermarkSyncedDStream<>(
                                        rddQueue,
                                        transform.getBatchDuration(),
                                        context.getStreamingContext().ssc()),
                                JavaSparkContext$.MODULE$.fakeClassTag())
                        : context.getStreamingContext().queueStream(rddQueue, true);
            }

            @Override
            public String toNativeString() {
                return "streamingContext.queueStream(...)";
            }
        };
    }

    /**
     * Builds the evaluator for {@link Flatten.PCollections}.
     *
     * <p>Each input is borrowed from the context. Inputs backed by an {@link UnboundedDataset}
     * contribute their DStream and their stream source ids; any other input is treated as a
     * {@link BoundedDataset} whose RDD is wrapped in a one-element queue stream. All streams are
     * then unioned into a single DStream that becomes the transform's output dataset.
     *
     * @param <T> element type of the flattened collections
     */
    private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.PCollections<T>>() {

            @Override
            @SuppressWarnings("unchecked")
            public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
                Map<TupleTag<?>, PValue> pcs = context.getInputs(transform);
                List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
                List<Integer> streamingSources = new ArrayList<>();
                for (PValue pv : pcs.values()) {
                    Preconditions.checkArgument(
                            pv instanceof PCollection,
                            "Flatten had non-PCollection value in input: %s of type %s",
                            pv,
                            pv.getClass().getSimpleName());
                    PCollection<T> pcol = (PCollection<T>) pv;
                    Dataset dataset = context.borrowDataset(pcol);
                    if (dataset instanceof UnboundedDataset) {
                        UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
                        streamingSources.addAll(unboundedDataset.getStreamSources());
                        dStreams.add(unboundedDataset.getDStream());
                    } else {
                        // Bounded input: expose its single RDD through a queue-backed stream.
                        Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
                        q.offer(((BoundedDataset<T>) dataset).getRDD());
                        JavaDStream<WindowedValue<T>> dStream =
                                context.getStreamingContext().queueStream(q);
                        dStreams.add(dStream);
                    }
                }
                // union(first, rest) needs a head element; fail with a clear message rather than
                // an IndexOutOfBoundsException from remove(0) when there is nothing to flatten.
                Preconditions.checkArgument(
                        !dStreams.isEmpty(), "Flatten had no input PCollections to union");
                JavaDStream<WindowedValue<T>> unifiedStreams =
                        context.getStreamingContext().union(dStreams.remove(0), dStreams);
                context.putDataset(
                        transform, new UnboundedDataset<>(unifiedStreams, streamingSources));
            }

            @Override
            public String toNativeString() {
                return "streamingContext.union(...)";
            }
        };
    }

    /**
     * Builds the evaluator for {@link Window.Assign}.
     *
     * <p>When {@link TranslationUtils#skipAssignWindows} returns {@code true} the input stream is
     * reused unchanged; otherwise every RDD of the stream is mapped through a
     * {@link SparkAssignWindowFn} built from the transform's window function. The output dataset
     * keeps the input's stream source ids in both cases.
     *
     * @param <T> element type of the windowed collection
     * @param <W> window type (unused by the body, kept for signature compatibility)
     */
    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
        return new TransformEvaluator<Window.Assign<T>>() {

            @Override
            public void evaluate(Window.Assign<T> transform, EvaluationContext context) {
                @SuppressWarnings("unchecked")
                UnboundedDataset<T> unboundedDataset =
                        (UnboundedDataset<T>) context.borrowDataset(transform);
                JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
                JavaDStream<WindowedValue<T>> outputStream;
                if (TranslationUtils.skipAssignWindows(transform, context)) {
                    outputStream = dStream;
                } else {
                    outputStream =
                            dStream.transform(
                                    rdd -> rdd.map(new SparkAssignWindowFn<>(transform.getWindowFn())));
                }
                context.putDataset(
                        transform,
                        new UnboundedDataset<>(outputStream, unboundedDataset.getStreamSources()));
            }

            @Override
            public String toNativeString() {
                return "map(new <windowFn>())";
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>(){

            @Override
            public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
                UnboundedDataset inputDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                List<Integer> streamSources = inputDataset.getStreamSources();
                JavaDStream dStream = inputDataset.getDStream();
                KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                WindowingStrategy windowingStrategy = ((PCollection)context.getInput(transform)).getWindowingStrategy();
                WindowFn windowFn = windowingStrategy.getWindowFn();
                WindowedValue.FullWindowedValueCoder wvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)coder.getValueCoder(), (Coder)windowFn.windowCoder());
                JavaDStream groupedByKeyStream = dStream.transform(arg_0 -> 6.lambda$evaluate$a006f305$1(coder, (WindowedValue.WindowedValueCoder)wvCoder, arg_0));
                JavaDStream outStream = SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow(groupedByKeyStream, coder.getKeyCoder(), wvCoder, windowingStrategy, context.getSerializableOptions(), streamSources, context.getCurrentTransform().getFullName());
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(outStream, streamSources));
            }

            @Override
            public String toNativeString() {
                return "groupByKey()";
            }

            private static /* synthetic */ JavaRDD lambda$evaluate$a006f305$1(KvCoder coder, WindowedValue.WindowedValueCoder wvCoder, JavaRDD rdd) throws Exception {
                return GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, (Partitioner)new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism()));
            }
        };
    }

    /**
     * Builds the evaluator for {@link Combine.GroupedValues}.
     *
     * <p>The transform's combine function is converted with {@link CombineFnUtil#toFnWithContext}.
     * For every RDD of the input stream a {@link SparkKeyedCombineFn} is created from that
     * function, the pipeline options, the side inputs resolved for the RDD's Spark context and the
     * input's windowing strategy; the RDD is then mapped through
     * {@link TranslationUtils.CombineGroupedValues}. The output dataset keeps the input's stream
     * source ids.
     *
     * @param <K> key type
     * @param <InputT> type of the grouped input values
     * @param <OutputT> type of the combined result
     */
    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {

            @Override
            @SuppressWarnings("unchecked")
            public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
                        context.getInput(transform);
                WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
                CombineWithContext.CombineFnWithContext<InputT, ?, OutputT> fn =
                        (CombineWithContext.CombineFnWithContext<InputT, ?, OutputT>)
                                CombineFnUtil.toFnWithContext(transform.getFn());

                UnboundedDataset<KV<K, Iterable<InputT>>> unboundedDataset =
                        (UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform);
                JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
                        unboundedDataset.getDStream();

                // Read from the context up front so the lambda below does not reference it.
                SerializablePipelineOptions options = context.getSerializableOptions();
                SparkPCollectionView pviews = context.getPViews();

                JavaDStream<WindowedValue<KV<K, OutputT>>> outStream =
                        dStream.transform(
                                rdd -> {
                                    SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
                                            new SparkKeyedCombineFn<>(
                                                    fn,
                                                    options,
                                                    TranslationUtils.getSideInputs(
                                                            transform.getSideInputs(),
                                                            new JavaSparkContext(rdd.context()),
                                                            pviews),
                                                    windowingStrategy);
                                    return rdd.map(
                                            new TranslationUtils.CombineGroupedValues<>(
                                                    combineFnWithContext));
                                });
                context.putDataset(
                        transform,
                        new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
            }

            @Override
            public String toNativeString() {
                return "map(new <fn>())";
            }
        };
    }

    /**
     * Builds the evaluator for {@link ParDo.MultiOutput}.
     *
     * <p>Splittable DoFns are rejected with an {@link IllegalArgumentException}, and
     * {@link TranslationUtils#rejectStateAndTimers} is applied to the DoFn before translation.
     * Every RDD of the input stream is processed partition-wise by a {@link MultiDoFnFunction},
     * producing a pair stream keyed by output {@link TupleTag}. One {@link UnboundedDataset} per
     * declared output is then registered by filtering that pair stream on the output's tag. Each
     * output dataset keeps the input's stream source ids.
     *
     * @param <InputT> input element type of the DoFn
     * @param <OutputT> main output element type of the DoFn
     */
    private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() {

            @Override
            @SuppressWarnings("unchecked")
            public void evaluate(ParDo.MultiOutput<InputT, OutputT> transform, EvaluationContext context) {
                DoFn<InputT, OutputT> doFn = transform.getFn();
                Preconditions.checkArgument(
                        !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
                        "Splittable DoFn not yet supported in streaming mode: %s",
                        doFn);
                TranslationUtils.rejectStateAndTimers(doFn);

                // Read from the context up front so the lambda below does not reference it.
                SerializablePipelineOptions options = context.getSerializableOptions();
                SparkPCollectionView pviews = context.getPViews();
                WindowingStrategy<?, ?> windowingStrategy =
                        context.getInput(transform).getWindowingStrategy();
                Coder<InputT> inputCoder = (Coder<InputT>) context.getInput(transform).getCoder();
                Map<TupleTag<?>, Coder<?>> outputCoders = context.getOutputCoders();

                UnboundedDataset<InputT> unboundedDataset =
                        (UnboundedDataset<InputT>) context.borrowDataset(transform);
                JavaDStream<WindowedValue<InputT>> dStream = unboundedDataset.getDStream();
                String stepName = context.getCurrentTransform().getFullName();

                JavaPairDStream<TupleTag<?>, WindowedValue<?>> all =
                        dStream.transformToPair(
                                rdd -> {
                                    Accumulator<MetricsContainerStepMap> metricsAccum =
                                            MetricsAccumulator.getInstance();
                                    Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                                            TranslationUtils.getSideInputs(
                                                    transform.getSideInputs(),
                                                    JavaSparkContext.fromSparkContext(rdd.context()),
                                                    pviews);
                                    // NOTE(review): the trailing 'false' is passed as in the
                                    // original; its meaning is defined by MultiDoFnFunction
                                    // (presumably a "stateful" flag - confirm).
                                    return rdd.mapPartitionsToPair(
                                            new MultiDoFnFunction<>(
                                                    metricsAccum,
                                                    stepName,
                                                    doFn,
                                                    options,
                                                    transform.getMainOutputTag(),
                                                    transform.getAdditionalOutputTags().getAll(),
                                                    inputCoder,
                                                    outputCoders,
                                                    sideInputs,
                                                    windowingStrategy,
                                                    false));
                                });

                Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
                if (outputs.size() > 1) {
                    // With several outputs the pair stream is cached because it is filtered once
                    // per tag below; values are encoded before cache() and decoded afterwards.
                    Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap =
                            TranslationUtils.getTupleTagCoders(outputs);
                    all =
                            all.mapToPair(TranslationUtils.getTupleTagEncodeFunction(coderMap))
                                    .cache()
                                    .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));
                }
                for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
                    JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered =
                            all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
                    // Outputs may have different element types, so Object is the common type here.
                    JavaDStream<WindowedValue<Object>> values =
                            (JavaDStream<WindowedValue<Object>>)
                                    (JavaDStream<?>) TranslationUtils.dStreamValues(filtered);
                    context.putDataset(
                            output.getValue(),
                            new UnboundedDataset<>(values, unboundedDataset.getStreamSources()));
                }
            }

            @Override
            public String toNativeString() {
                return "mapPartitions(new <fn>())";
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
        return new TransformEvaluator<Reshuffle<K, V>>(){

            @Override
            public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) {
                UnboundedDataset inputDataset = (UnboundedDataset)context.borrowDataset((PTransform<? extends PValue, ?>)transform);
                List<Integer> streamSources = inputDataset.getStreamSources();
                JavaDStream dStream = inputDataset.getDStream();
                KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                WindowingStrategy windowingStrategy = ((PCollection)context.getInput(transform)).getWindowingStrategy();
                WindowFn windowFn = windowingStrategy.getWindowFn();
                WindowedValue.FullWindowedValueCoder wvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)coder.getValueCoder(), (Coder)windowFn.windowCoder());
                JavaDStream reshuffledStream = dStream.transform(arg_0 -> 9.lambda$evaluate$e69f4c4$1(coder, (WindowedValue.WindowedValueCoder)wvCoder, arg_0));
                context.putDataset((PTransform<?, ? extends PValue>)transform, (Dataset)new UnboundedDataset(reshuffledStream, streamSources));
            }

            @Override
            public String toNativeString() {
                return "repartition(...)";
            }

            private static /* synthetic */ JavaRDD lambda$evaluate$e69f4c4$1(KvCoder coder, WindowedValue.WindowedValueCoder wvCoder, JavaRDD rdd) throws Exception {
                return GroupCombineFunctions.reshuffle(rdd, coder.getKeyCoder(), wvCoder);
            }
        };
    }

    /**
     * Looks up the streaming evaluator registered for a transform.
     *
     * @param transform the transform to translate
     * @return the evaluator stored in {@code EVALUATORS} under the transform's URN, or
     *     {@code null} when the transform has no URN or no evaluator is stored for it
     */
    @Nullable
    private static TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
        String transformUrn = PTransformTranslation.urnForTransformOrNull(transform);
        if (transformUrn == null) {
            return null;
        }
        return EVALUATORS.get(transformUrn);
    }

    // Populates EVALUATORS: one streaming evaluator per supported transform URN.
    static {
        EVALUATORS.put(PTransformTranslation.READ_TRANSFORM_URN, readUnbounded());
        EVALUATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, groupByKey());
        EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
        EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
        // The next two URNs are the ones returned by the payload translators declared below.
        EVALUATORS.put("beam:transform:spark:consoleio_write_unbound:v1", print());
        EVALUATORS.put("beam:transform:spark:createstream:v1", createFromQueue());
        EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
        EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
        EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
    }

    private static class SparkCreateStreamPayloadTranslator
    extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<CreateStream<?>> {
        private SparkCreateStreamPayloadTranslator() {
        }

        public String getUrn(CreateStream<?> transform) {
            return "beam:transform:spark:createstream:v1";
        }
    }

    private static class SparkConsoleIOWriteUnboundedPayloadTranslator
    extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<ConsoleIO.Write.Unbound<?>> {
        private SparkConsoleIOWriteUnboundedPayloadTranslator() {
        }

        public String getUrn(ConsoleIO.Write.Unbound<?> transform) {
            return "beam:transform:spark:consoleio_write_unbound:v1";
        }
    }

    @AutoService(value=TransformPayloadTranslatorRegistrar.class)
    public static class SparkTransformsRegistrar
    implements TransformPayloadTranslatorRegistrar {
        public Map<? extends Class<? extends PTransform>, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
            return ImmutableMap.of(ConsoleIO.Write.Unbound.class, (Object)((Object)new SparkConsoleIOWriteUnboundedPayloadTranslator()), CreateStream.class, (Object)((Object)new SparkCreateStreamPayloadTranslator()));
        }
    }

    /**
     * {@link SparkPipelineTranslator} that resolves unbounded transforms against the streaming
     * evaluators of the enclosing class and delegates bounded transforms to a wrapped translator.
     */
    public static class Translator
    implements SparkPipelineTranslator {
        /** Translator consulted for bounded transforms. */
        private final SparkPipelineTranslator batchTranslator;

        /**
         * @param batchTranslator translator used by {@link #translateBounded}
         */
        public Translator(SparkPipelineTranslator batchTranslator) {
            this.batchTranslator = batchTranslator;
        }

        /**
         * @return {@code true} when a streaming evaluator is registered under the transform's URN
         */
        @Override
        public boolean hasTranslation(PTransform<?, ?> transform) {
            String transformUrn = PTransformTranslation.urnForTransformOrNull(transform);
            return EVALUATORS.containsKey(transformUrn);
        }

        /**
         * Delegates to the wrapped batch translator.
         *
         * @throws IllegalStateException when the batch translator returns {@code null}
         */
        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(PTransform<?, ?> transform) {
            TransformEvaluator<TransformT> boundedEvaluator = batchTranslator.translateBounded(transform);
            boolean found = boundedEvaluator != null;
            Preconditions.checkState(found, "No TransformEvaluator registered for BOUNDED transform %s", transform);
            return boundedEvaluator;
        }

        /**
         * Resolves the evaluator from the streaming registry.
         *
         * @throws IllegalStateException when no streaming evaluator is registered for the transform
         */
        @Override
        @SuppressWarnings("unchecked")
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(PTransform<?, ?> transform) {
            TransformEvaluator<TransformT> streamingEvaluator =
                    (TransformEvaluator<TransformT>) getTranslator(transform);
            boolean found = streamingEvaluator != null;
            Preconditions.checkState(found, "No TransformEvaluator registered for UNBOUNDED transform %s", transform);
            return streamingEvaluator;
        }
    }
}

