/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

/**
 * A {@link PTransformOverrideFactory} that replaces a {@link Reshuffle} applied to a keyed
 * {@link PCollection} with {@link ReshuffleWithOnlyTrigger}.
 *
 * @param <K> the key type of the reshuffled elements
 * @param <V> the value type of the reshuffled elements
 */
class ReshuffleOverrideFactory<K, V>
        extends SingleInputOutputOverrideFactory<
                PCollection<KV<K, V>>, PCollection<KV<K, V>>, Reshuffle<K, V>> {
    ReshuffleOverrideFactory() {
    }

    /**
     * Builds the replacement for the given applied {@link Reshuffle}.
     *
     * @param transform the applied {@link Reshuffle} being overridden
     * @return a replacement pairing the singleton main input of {@code transform} with a new
     *     {@link ReshuffleWithOnlyTrigger}
     */
    @Override
    public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, V>>>
            getReplacementTransform(
                    AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>, Reshuffle<K, V>>
                            transform) {
        return PTransformOverrideFactory.PTransformReplacement.of(
                PTransformReplacements.getSingletonMainInput(transform),
                new ReshuffleWithOnlyTrigger<K, V>());
    }

    /**
     * Replacement transform: rewindows the input with a {@link ReshuffleTrigger}, groups by key,
     * restores the input's original windowing strategy and re-emits every grouped value as its own
     * {@link KV}.
     *
     * @param <K> the key type
     * @param <V> the value type
     */
    private static class ReshuffleWithOnlyTrigger<K, V>
            extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

        /**
         * Expands this transform into rewindow, {@link GroupByKey} and "ExpandIterable" steps.
         *
         * @param input the keyed collection to reshuffle
         * @return a collection holding one {@code KV<K, V>} per grouped value, carrying the
         *     windowing strategy that {@code input} had
         */
        @Override
        public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
            // Remember the incoming strategy; it is put back after the GroupByKey below.
            WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();

            // Temporary windowing used only for the grouping step: identity window function
            // built from the original window coder, ReshuffleTrigger, discarding fired panes,
            // and allowed lateness set to the millis of BoundedWindow.TIMESTAMP_MAX_VALUE.
            Window<KV<K, V>> rewindow =
                    Window.<KV<K, V>>into(
                                    new IdentityWindowFn<>(
                                            originalStrategy.getWindowFn().windowCoder()))
                            .triggering(new ReshuffleTrigger<>())
                            .discardingFiredPanes()
                            .withAllowedLateness(
                                    Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

            return input
                    .apply(rewindow)
                    .apply(GroupByKey.<K, V>create())
                    // Set the strategy directly on the grouped collection instead of applying
                    // another Window transform.
                    .setWindowingStrategyInternal(originalStrategy)
                    .apply(
                            "ExpandIterable",
                            ParDo.of(
                                    new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {

                                        /** Emits one {@code KV} per value grouped under the key. */
                                        @DoFn.ProcessElement
                                        public void processElement(ProcessContext c) {
                                            K key = c.element().getKey();
                                            for (V value : c.element().getValue()) {
                                                c.output(KV.of(key, value));
                                            }
                                        }
                                    }));
        }
    }
}

