/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.PeekingIterator;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

public class FlinkNonMergingReduceFunction<K, InputT>
implements GroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, Iterable<InputT>>>> {
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final boolean reIterableResult;

    public FlinkNonMergingReduceFunction(WindowingStrategy<?, ?> windowingStrategy, boolean reIterableResult) {
        this.windowingStrategy = windowingStrategy;
        this.reIterableResult = reIterableResult;
    }

    public void reduce(Iterable<WindowedValue<KV<K, InputT>>> input, Collector<WindowedValue<KV<K, Iterable<InputT>>>> coll) {
        Iterable<Object> values;
        PeekingIterator iterator = Iterators.peekingIterator(input.iterator());
        WindowedValue first = (WindowedValue)iterator.peek();
        BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement((Iterable)first.getWindows());
        Instant outputTimestamp = first.getTimestamp();
        Instant combinedTimestamp = this.windowingStrategy.getTimestampCombiner().assign(window, outputTimestamp);
        if (this.reIterableResult) {
            ArrayList lst = new ArrayList();
            iterator.forEachRemaining(wv -> lst.add(((KV)wv.getValue()).getValue()));
            values = lst;
        } else {
            values = new OnceIterable(Iterators.transform((Iterator)iterator, wv -> ((KV)Objects.requireNonNull(wv).getValue()).getValue()));
        }
        coll.collect((Object)WindowedValue.of((Object)KV.of((Object)((KV)first.getValue()).getKey(), values), (Instant)combinedTimestamp, (Collection)first.getWindows(), (PaneInfo)PaneInfo.ON_TIME_AND_ONLY_FIRING));
    }

    private static class OnceIterable<T>
    implements Iterable<T> {
        private final Iterator<T> iterator;
        private final AtomicBoolean used = new AtomicBoolean(false);

        OnceIterable(Iterator<T> iterator) {
            this.iterator = iterator;
        }

        @Override
        public Iterator<T> iterator() {
            if (this.used.compareAndSet(false, true)) {
                return this.iterator;
            }
            throw new IllegalStateException("GBK result is not re-iterable. You can enable re-iterations by setting '--reIterableGroupByKeyResult'.");
        }
    }
}

