/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.joda.time.Instant;

class GroupByKeyTranslatorBatch<K, V>
implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
    GroupByKeyTranslatorBatch() {
    }

    @Override
    public void translateTransform(PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform, TranslationContext context) {
        PCollection inputPCollection = (PCollection)context.getInput();
        Dataset input = context.getDataset((PValue)inputPCollection);
        WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy();
        KvCoder kvCoder = (KvCoder)inputPCollection.getCoder();
        Coder keyCoder = kvCoder.getKeyCoder();
        KeyValueGroupedDataset groupByKeyOnly = input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
        Coder valueCoder = kvCoder.getValueCoder();
        WindowedValue.FullWindowedValueCoder wvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)valueCoder, (Coder)inputPCollection.getWindowingStrategy().getWindowFn().windowCoder());
        IterableCoder iterableCoder = IterableCoder.of((Coder)wvCoder);
        Dataset materialized = groupByKeyOnly.mapGroups((MapGroupsFunction & Serializable)(key, iterator) -> {
            ArrayList<WindowedValue> values = new ArrayList<WindowedValue>();
            while (iterator.hasNext()) {
                WindowedValue next = (WindowedValue)iterator.next();
                values.add(WindowedValue.of((Object)((KV)next.getValue()).getValue(), (Instant)next.getTimestamp(), (Collection)next.getWindows(), (PaneInfo)next.getPane()));
            }
            KV kv = KV.of((Object)key, (Object)Iterables.unmodifiableIterable(values));
            return kv;
        }, EncoderHelpers.fromBeamCoder(KvCoder.of((Coder)keyCoder, (Coder)iterableCoder)));
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.FullWindowedValueCoder.of((Coder)KvCoder.of((Coder)keyCoder, (Coder)IterableCoder.of((Coder)valueCoder)), (Coder)windowingStrategy.getWindowFn().windowCoder());
        Dataset output = materialized.flatMap(new GroupAlsoByWindowViaOutputBufferFn(windowingStrategy, new InMemoryStateInternalsFactory(), SystemReduceFn.buffering((Coder)valueCoder), context.getSerializableOptions()), EncoderHelpers.fromBeamCoder(outputCoder));
        context.putDataset(context.getOutput(), output);
    }

    static class InMemoryStateInternalsFactory<K>
    implements StateInternalsFactory<K>,
    Serializable {
        InMemoryStateInternalsFactory() {
        }

        public StateInternals stateInternalsForKey(K key) {
            return InMemoryStateInternals.forKey(key);
        }
    }
}

