/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
import java.util.Map;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.Window;
import org.apache.heron.streamlet.impl.operators.StreamletWindowOperator;

public class GeneralReduceByKeyAndWindowOperator<R, K, T>
extends StreamletWindowOperator<R, KeyValue<KeyedWindow<K>, T>> {
    private SerializableFunction<R, K> keyExtractor;
    private T identity;
    private SerializableBiFunction<T, R, ? extends T> reduceFn;

    public GeneralReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor, T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
        this.keyExtractor = keyExtractor;
        this.identity = identity;
        this.reduceFn = reduceFn;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        HashMap reduceMap = new HashMap();
        HashMap windowCountMap = new HashMap();
        for (Tuple tuple : inputWindow.get()) {
            Object tup = tuple.getValue(0);
            this.addMap(reduceMap, windowCountMap, tup);
        }
        long startWindow = inputWindow.getStartTimestamp() == null ? 0L : inputWindow.getStartTimestamp();
        long endWindow = inputWindow.getEndTimestamp() == null ? 0L : inputWindow.getEndTimestamp();
        for (Object key : reduceMap.keySet()) {
            Window window = new Window(startWindow, endWindow, ((Integer)windowCountMap.get(key)).intValue());
            KeyedWindow keyedWindow = new KeyedWindow(key, window);
            this.collector.emit(new Values(new KeyValue(keyedWindow, reduceMap.get(key))));
        }
    }

    private void addMap(Map<K, T> reduceMap, Map<K, Integer> windowCountMap, R tup) {
        Object key = this.keyExtractor.apply(tup);
        if (!reduceMap.containsKey(key)) {
            reduceMap.put(key, this.identity);
            windowCountMap.put(key, 0);
        }
        reduceMap.put(key, this.reduceFn.apply(reduceMap.get(key), tup));
        windowCountMap.put(key, windowCountMap.get(key) + 1);
    }
}

