/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joda.time.ReadableInstant;

public class GroupByKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
    private final boolean fewKeys;

    private GroupByKey(boolean fewKeys) {
        this.fewKeys = fewKeys;
    }

    public static <K, V> GroupByKey<K, V> create() {
        return new GroupByKey<K, V>(false);
    }

    static <K, V> GroupByKey<K, V> create(boolean fewKeys) {
        return new GroupByKey<K, V>(fewKeys);
    }

    public boolean fewKeys() {
        return this.fewKeys;
    }

    public static void applicableTo(PCollection<?> input) {
        WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
        if (windowingStrategy.getWindowFn() instanceof GlobalWindows && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger && input.isBounded() != PCollection.IsBounded.BOUNDED) {
            throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
        }
        if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
            String cause = ((InvalidWindows)windowingStrategy.getWindowFn()).getCause();
            throw new IllegalStateException("GroupByKey must have a valid Window merge function.  Invalid because: " + cause);
        }
    }

    @Override
    public void validate(PCollection<KV<K, V>> input) {
        GroupByKey.applicableTo(input);
        Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
        try {
            keyCoder.verifyDeterministic();
        }
        catch (Coder.NonDeterministicException e) {
            throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e);
        }
    }

    public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
        WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
        if (!inputWindowFn.isNonMerging()) {
            inputWindowFn = new InvalidWindows("WindowFn has already been consumed by previous GroupByKey", inputWindowFn);
        }
        return inputStrategy.withWindowFn(inputWindowFn).withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
    }

    @Override
    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
        WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
        return ((PCollection)((PCollection)((PCollection)((PCollection)input.apply(new ReifyTimestampsAndWindows())).apply(new GroupByKeyOnly())).apply(new SortValuesByTimestamp())).apply(new GroupAlsoByWindow(windowingStrategy))).setWindowingStrategyInternal(this.updateWindowingStrategy(windowingStrategy));
    }

    @Override
    protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
        return GroupByKey.getOutputKvCoder(input.getCoder());
    }

    static <K, V> KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
        if (!(inputCoder instanceof KvCoder)) {
            throw new IllegalStateException("GroupByKey requires its input to use KvCoder");
        }
        return (KvCoder)inputCoder;
    }

    static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
        return GroupByKey.getInputKvCoder(inputCoder).getKeyCoder();
    }

    public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
        return GroupByKey.getInputKvCoder(inputCoder).getValueCoder();
    }

    static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder) {
        return IterableCoder.of(GroupByKey.getInputValueCoder(inputCoder));
    }

    static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
        return KvCoder.of(GroupByKey.getKeyCoder(inputCoder), GroupByKey.getOutputValueCoder(inputCoder));
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        if (this.fewKeys) {
            builder.add(DisplayData.item("fewKeys", true).withLabel("Has Few Keys"));
        }
    }

    private static <K, V> void registerWithDirectPipelineRunner() {
        DirectPipelineRunner.registerDefaultTransformEvaluator(GroupByKeyOnly.class, new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>(){

            @Override
            public void evaluate(GroupByKeyOnly transform, DirectPipelineRunner.EvaluationContext context) {
                GroupByKey.evaluateHelper(transform, context);
            }
        });
    }

    private static <K, V> void evaluateHelper(GroupByKeyOnly<K, V> transform, DirectPipelineRunner.EvaluationContext context) {
        PCollection input = (PCollection)context.getInput(transform);
        List inputElems = context.getPCollectionValuesWithMetadata(input);
        Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
        HashMap groupingMap = new HashMap();
        for (DirectPipelineRunner.ValueWithMetadata elem : inputElems) {
            byte[] encodedKey;
            Object key = ((KV)elem.getValue()).getKey();
            Object value = ((KV)elem.getValue()).getValue();
            try {
                encodedKey = CoderUtils.encodeToByteArray(keyCoder, key);
            }
            catch (CoderException exn) {
                throw new IllegalArgumentException("unable to encode key " + key + " of input to " + transform + " using " + keyCoder, exn);
            }
            GroupingKey groupingKey = new GroupingKey(key, encodedKey);
            ArrayList values = (ArrayList)groupingMap.get(groupingKey);
            if (values == null) {
                values = new ArrayList();
                groupingMap.put(groupingKey, values);
            }
            values.add(value);
        }
        ArrayList outputElems = new ArrayList();
        for (Map.Entry entry : groupingMap.entrySet()) {
            GroupingKey groupingKey = (GroupingKey)entry.getKey();
            Object key = groupingKey.getKey();
            List values = (List)entry.getValue();
            values = context.randomizeIfUnordered(values, true);
            outputElems.add(DirectPipelineRunner.ValueWithMetadata.of(WindowedValue.valueInEmptyWindows(KV.of(key, values))).withKey(key));
        }
        context.setPCollectionValuesWithMetadata((PCollection)context.getOutput(transform), outputElems);
    }

    static {
        GroupByKey.registerWithDirectPipelineRunner();
    }

    private static class GroupingKey<K> {
        private K key;
        private byte[] encodedKey;

        public GroupingKey(K key, byte[] encodedKey) {
            this.key = key;
            this.encodedKey = encodedKey;
        }

        public K getKey() {
            return this.key;
        }

        public boolean equals(Object o) {
            if (o instanceof GroupingKey) {
                GroupingKey that = (GroupingKey)o;
                return Arrays.equals(this.encodedKey, that.encodedKey);
            }
            return false;
        }

        public int hashCode() {
            return Arrays.hashCode(this.encodedKey);
        }
    }

    public static class GroupByKeyOnly<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
        @Override
        public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
            return PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
        }

        KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
            if (!(inputCoder instanceof KvCoder)) {
                throw new IllegalStateException("GroupByKey requires its input to use KvCoder");
            }
            return (KvCoder)inputCoder;
        }

        @Override
        protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
            return GroupByKey.getOutputKvCoder(input.getCoder());
        }
    }

    public static class GroupAlsoByWindow<K, V>
    extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
        private final WindowingStrategy<?, ?> windowingStrategy;

        public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
            this.windowingStrategy = windowingStrategy;
        }

        @Override
        public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
            KvCoder inputKvCoder = (KvCoder)input.getCoder();
            Coder keyCoder = inputKvCoder.getKeyCoder();
            Coder inputValueCoder = inputKvCoder.getValueCoder();
            IterableCoder inputIterableValueCoder = (IterableCoder)inputValueCoder;
            Coder inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
            WindowedValue.WindowedValueCoder inputIterableWindowedValueCoder = (WindowedValue.WindowedValueCoder)inputIterableElementCoder;
            Coder inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
            IterableCoder outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
            KvCoder outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
            return ((PCollection)input.apply(ParDo.of(this.groupAlsoByWindowsFn(this.windowingStrategy, inputIterableElementValueCoder)))).setCoder((Coder)outputKvCoder);
        }

        private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
            return new GroupAlsoByWindowsViaOutputBufferDoFn(strategy, SystemReduceFn.buffering(inputIterableElementValueCoder));
        }
    }

    public static class SortValuesByTimestamp<K, V>
    extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
        @Override
        public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
            return ((PCollection)input.apply(ParDo.of(new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>(){

                @Override
                public void processElement(DoFn.ProcessContext c) {
                    KV kvs = (KV)c.element();
                    Object key = kvs.getKey();
                    Iterable unsortedValues = (Iterable)kvs.getValue();
                    ArrayList<WindowedValue> sortedValues = new ArrayList<WindowedValue>();
                    for (WindowedValue value : unsortedValues) {
                        sortedValues.add(value);
                    }
                    Collections.sort(sortedValues, new Comparator<WindowedValue<V>>(){

                        @Override
                        public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
                            return e1.getTimestamp().compareTo((ReadableInstant)e2.getTimestamp());
                        }
                    });
                    c.output(KV.of(key, sortedValues));
                }
            }))).setCoder((Coder)input.getCoder());
        }
    }

    public static class ReifyTimestampsAndWindows<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
        @Override
        public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
            KvCoder inputKvCoder = (KvCoder)input.getCoder();
            Coder keyCoder = inputKvCoder.getKeyCoder();
            Coder inputValueCoder = inputKvCoder.getValueCoder();
            WindowedValue.FullWindowedValueCoder outputValueCoder = WindowedValue.FullWindowedValueCoder.of(inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
            KvCoder outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
            return ((PCollection)input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn()))).setCoder((Coder)outputKvCoder);
        }
    }
}

