/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.BatchTimerInternals;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn;
import com.google.cloud.dataflow.sdk.util.ReduceFnRunner;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.KV;
import org.joda.time.Instant;

/**
 * A {@link GroupAlsoByWindowsDoFn} that feeds all values of one input element through a
 * {@link ReduceFnRunner}, using a locally created {@link BatchTimerInternals} to drive the
 * runner's timers instead of any timer facility from the processing context.
 *
 * <p>NOTE(review): this file is decompiler output, so the generic type arguments of the
 * project types used in {@link #processElement} were erased. They are left raw here because
 * their declarations are not visible from this file; restore them from the real signatures
 * of {@code DoFn}, {@code KV}, {@code ReduceFnRunner} and {@code WindowedValue} when possible.
 */
@SystemDoFnInternal
public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
    /** Number of values handed to the runner between two consecutive timer advances. */
    private static final int CHUNK_SIZE = 1000;

    /** Windowing strategy passed to every {@link ReduceFnRunner} created by this DoFn. */
    private final WindowingStrategy<?, W> strategy;

    // Never reassigned in this class. NOTE(review): deliberately left non-final, since changing
    // a field modifier alters the default serialVersionUID if this type is Serializable -- confirm
    // before tightening.
    private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;

    /**
     * Creates the DoFn.
     *
     * @param windowingStrategy the windowing strategy forwarded to the {@link ReduceFnRunner}
     * @param reduceFn the reduce function forwarded to the {@link ReduceFnRunner}
     */
    public GroupAlsoByWindowsViaOutputBufferDoFn(WindowingStrategy<?, W> windowingStrategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
        this.strategy = windowingStrategy;
        this.reduceFn = reduceFn;
    }

    /**
     * Processes one element, which is read as a {@link KV} whose value is an {@link Iterable}
     * of {@link WindowedValue}s.
     *
     * <p>The values are split into chunks of {@link #CHUNK_SIZE}. After each chunk is given to
     * the runner, the input watermark is advanced to the timestamp of that chunk's first value
     * and the processing time is advanced to the current instant. Once all chunks are consumed,
     * both clocks are advanced to {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and the runner's
     * state is persisted.
     *
     * @param c the context supplying the element, windowing internals and pipeline options
     * @throws Exception if the runner or the timer internals fail
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw project types are decompiler artefacts, see class note
    public void processElement(DoFn.ProcessContext c) throws Exception {
        KV element = (KV) c.element();
        Object key = element.getKey();

        // Timers are emulated locally, starting from the current wall-clock instant.
        BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
        StateInternals<?> stateInternals = c.windowingInternals().stateInternals();
        ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, this.strategy, stateInternals, timerInternals, c.windowingInternals(), this.droppedDueToClosedWindow, this.reduceFn, c.getPipelineOptions());

        // Typed with wildcards rather than left raw: iterating a raw Iterable yields Object,
        // which javac refuses to assign to an Iterable-typed loop variable.
        Iterable<? extends Iterable<?>> chunks = Iterables.partition((Iterable<?>) element.getValue(), CHUNK_SIZE);
        for (Iterable<?> chunk : chunks) {
            reduceFnRunner.processElements(chunk);

            // Presumably the values arrive ordered by timestamp, which is what would make the
            // first value of the chunk a safe watermark -- verify against the caller.
            Instant firstTimestamp = ((WindowedValue) chunk.iterator().next()).getTimestamp();
            timerInternals.advanceInputWatermark(reduceFnRunner, firstTimestamp);
            timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
        }

        // All input has been consumed: push both clocks to the maximum timestamp, then persist
        // the runner's state.
        timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
        reduceFnRunner.persist();
    }
}

