/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.translation.SparkAbstractCombineFn;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.spark.repackaged.com.google.common.base.Function;
import org.apache.beam.spark.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.spark.repackaged.com.google.common.collect.Lists;
import org.joda.time.Instant;

/**
 * Window-aware wrapper around a {@link CombineWithContext.CombineFnWithContext} for a global
 * (non-keyed) combine.
 *
 * <p>The running state is an {@code Iterable<WindowedValue<AccumT>>} holding one accumulator per
 * window. {@link #zeroValue()}, {@link #seqOp} and {@link #combOp} build and merge that state, and
 * {@link #extractOutput} converts every accumulator into its output value.
 *
 * <p>When the windowing strategy's {@code WindowFn} is merging, windows are cast to
 * {@link IntervalWindow} in order to test for intersection and to merge them; a merging
 * {@code WindowFn} that produces any other window type fails with a {@link ClassCastException}.
 *
 * @param <InputT> type of the elements being combined
 * @param <AccumT> type of the intermediate accumulator
 * @param <OutputT> type of the final combined value
 */
public class SparkGlobalCombineFn<InputT, AccumT, OutputT>
extends SparkAbstractCombineFn {
    private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;

    /**
     * Creates the wrapper.
     *
     * @param combineFn the combine function to delegate to
     * @param runtimeContext forwarded unchanged to the superclass
     * @param sideInputs forwarded unchanged to the superclass
     * @param windowingStrategy forwarded unchanged to the superclass; also read here to obtain
     *     the {@code OutputTimeFn} and {@code WindowFn}
     */
    public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) {
        super(runtimeContext, sideInputs, windowingStrategy);
        this.combineFn = combineFn;
    }

    /**
     * Returns the initial, empty accumulation state.
     *
     * @return a new empty mutable list
     */
    Iterable<WindowedValue<AccumT>> zeroValue() {
        return Lists.newArrayList();
    }

    /**
     * Turns a single input into per-window accumulators.
     *
     * <p>The input is exploded into one value per window and sorted by window. Consecutive values
     * are folded into the same accumulator while they share a window or, for a merging
     * {@code WindowFn}, while their windows intersect (in which case the windows are merged).
     *
     * <p>NOTE(review): {@code iterator.next()} is called unconditionally, so an input that
     * explodes to no values would throw {@code NoSuchElementException} - confirm inputs always
     * carry at least one window.
     *
     * @param input the windowed input value
     * @return one accumulator per resulting window, in sorted-window order
     */
    private Iterable<WindowedValue<AccumT>> createAccumulator(WindowedValue<InputT> input) {
        Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
        OutputTimeFn<? super BoundedWindow> outputTimeFn = this.boundedOutputTimeFn();

        Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
        WindowedValue<InputT> currentInput = iterator.next();
        BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);

        // Seed the first accumulator with the first input.
        AccumT accumulator = this.combineFn.createAccumulator(this.contextFor(currentInput));
        accumulator = this.combineFn.addInput(accumulator, currentInput.getValue(), this.contextFor(currentInput));

        // Timestamp assigned by the OutputTimeFn for the window currently being accumulated.
        Instant windowTimestamp = outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);

        List<WindowedValue<AccumT>> output = Lists.newArrayList();
        final boolean merging = !this.windowingStrategy.getWindowFn().isNonMerging();

        while (iterator.hasNext()) {
            WindowedValue<InputT> nextValue = iterator.next();
            BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());

            // The IntervalWindow casts are only evaluated for a merging WindowFn.
            boolean mergingAndIntersecting = merging
                && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);

            if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
                if (mergingAndIntersecting) {
                    currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
                }
                // Same (possibly merged) window: keep accumulating.
                accumulator = this.combineFn.addInput(accumulator, nextValue.getValue(), this.contextFor(nextValue));
                windowTimestamp = outputTimeFn.combine(
                    windowTimestamp,
                    outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
            } else {
                // Window changed: emit what was accumulated and start over with the next value.
                output.add(WindowedValue.of(accumulator, windowTimestamp, currentWindow, PaneInfo.NO_FIRING));
                accumulator = this.combineFn.createAccumulator(this.contextFor(nextValue));
                accumulator = this.combineFn.addInput(accumulator, nextValue.getValue(), this.contextFor(nextValue));
                currentWindow = nextWindow;
                windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
            }
        }

        // Emit the accumulator of the last window.
        output.add(WindowedValue.of(accumulator, windowTimestamp, currentWindow, PaneInfo.NO_FIRING));
        return output;
    }

    /**
     * Adds one input to the accumulation state by building accumulators for the input and
     * merging them into {@code accum} with {@link #combOp}.
     *
     * @param accum the current per-window accumulators
     * @param input the input to add
     * @return the merged per-window accumulators
     */
    Iterable<WindowedValue<AccumT>> seqOp(Iterable<WindowedValue<AccumT>> accum, WindowedValue<InputT> input) {
        return this.combOp(accum, this.createAccumulator(input));
    }

    /**
     * Merges two accumulation states into one.
     *
     * <p>All accumulators are concatenated and sorted by window; runs of accumulators that share
     * a window (or whose windows intersect, for a merging {@code WindowFn}) are combined through
     * {@code mergeAccumulators} into a single accumulator for that window.
     *
     * @param a1 first set of per-window accumulators
     * @param a2 second set of per-window accumulators
     * @return a new list with one merged accumulator per resulting window; empty when both
     *     arguments are empty
     */
    Iterable<WindowedValue<AccumT>> combOp(Iterable<WindowedValue<AccumT>> a1, Iterable<WindowedValue<AccumT>> a2) {
        Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(a1, a2);
        if (!accumulators.iterator().hasNext()) {
            return Lists.newArrayList();
        }

        Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
        OutputTimeFn<? super BoundedWindow> outputTimeFn = this.boundedOutputTimeFn();

        Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
        WindowedValue<AccumT> currentValue = iterator.next();
        BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);

        // Accumulators and timestamps collected for the window currently being processed.
        List<AccumT> currentWindowAccumulators = Lists.newArrayList();
        currentWindowAccumulators.add(currentValue.getValue());
        List<Instant> windowTimestamps = Lists.newArrayList();
        windowTimestamps.add(currentValue.getTimestamp());

        List<WindowedValue<AccumT>> output = Lists.newArrayList();
        final boolean merging = !this.windowingStrategy.getWindowFn().isNonMerging();

        while (iterator.hasNext()) {
            WindowedValue<AccumT> nextValue = iterator.next();
            BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());

            // The IntervalWindow casts are only evaluated for a merging WindowFn.
            boolean mergingAndIntersecting = merging
                && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);

            if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
                if (mergingAndIntersecting) {
                    currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
                }
                // Same (possibly merged) window: collect for a later merge.
                currentWindowAccumulators.add(nextValue.getValue());
                windowTimestamps.add(nextValue.getTimestamp());
            } else {
                // Window changed: merge and emit the finished window before the buffers are reset.
                output.add(this.mergeWindowAccumulators(
                    outputTimeFn, currentWindow, currentWindowAccumulators, windowTimestamps));
                currentWindowAccumulators.clear();
                currentWindowAccumulators.add(nextValue.getValue());
                currentWindow = nextWindow;
                windowTimestamps.clear();
                windowTimestamps.add(nextValue.getTimestamp());
            }
        }

        // Merge and emit the last window.
        output.add(this.mergeWindowAccumulators(
            outputTimeFn, currentWindow, currentWindowAccumulators, windowTimestamps));
        return output;
    }

    /**
     * Converts every accumulator into its output value, keeping the enclosing windowed value.
     *
     * @param wvas the per-window accumulators
     * @return a transformed view of {@code wvas}; {@code null} elements map to {@code null}
     */
    Iterable<WindowedValue<OutputT>> extractOutput(Iterable<WindowedValue<AccumT>> wvas) {
        return Iterables.transform(wvas, new Function<WindowedValue<AccumT>, WindowedValue<OutputT>>(){

            @Override
            @Nullable
            public WindowedValue<OutputT> apply(@Nullable WindowedValue<AccumT> wva) {
                if (wva == null) {
                    return null;
                }
                OutputT extracted = SparkGlobalCombineFn.this.combineFn.extractOutput(
                    wva.getValue(), SparkGlobalCombineFn.this.contextFor(wva));
                return wva.withValue(extracted);
            }
        });
    }

    /**
     * Merges all accumulators collected for one window into a single windowed accumulator.
     *
     * <p>The merged value is computed eagerly, so the caller may clear the passed lists as soon
     * as this method returns.
     *
     * @param outputTimeFn used to merge the collected timestamps
     * @param window the window the accumulators belong to
     * @param windowAccumulators the accumulators to merge
     * @param windowTimestamps the timestamps of those accumulators
     * @return the merged accumulator carrying the merged timestamp, {@code window} and the
     *     {@code NO_FIRING} pane
     */
    private WindowedValue<AccumT> mergeWindowAccumulators(OutputTimeFn<? super BoundedWindow> outputTimeFn, BoundedWindow window, List<AccumT> windowAccumulators, List<Instant> windowTimestamps) {
        Instant mergedTimestamp = outputTimeFn.merge(window, windowTimestamps);
        Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(windowAccumulators);
        // The pre-merge value exists only to supply timestamp/window/pane to the combine context.
        WindowedValue<Iterable<AccumT>> preMergeWindowedValue =
            WindowedValue.of(accumsToMerge, mergedTimestamp, window, PaneInfo.NO_FIRING);
        AccumT accumulated = this.combineFn.mergeAccumulators(accumsToMerge, this.contextFor(preMergeWindowedValue));
        return preMergeWindowedValue.withValue(accumulated);
    }

    /** Returns the windowing strategy's {@code OutputTimeFn}, viewed as accepting any {@link BoundedWindow}. */
    @SuppressWarnings("unchecked") // The strategy is wildcard-typed, so the window type is unknown here.
    private OutputTimeFn<? super BoundedWindow> boundedOutputTimeFn() {
        return (OutputTimeFn<? super BoundedWindow>) this.windowingStrategy.getOutputTimeFn();
    }

    /** Returns the combine context the superclass provides for {@code value}. */
    private CombineWithContext.Context contextFor(WindowedValue<?> value) {
        return (CombineWithContext.Context) this.ctxtForInput(value);
    }
}

