/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.window;

import com.antgroup.geaflow.api.function.base.KeySelector;
import com.antgroup.geaflow.api.function.base.ReduceFunction;
import com.antgroup.geaflow.operator.base.window.AbstractOneInputOperator;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;

public class WindowReduceOperator<KEY, T>
extends AbstractOneInputOperator<T, ReduceFunction<T>> {
    private final Map<KEY, T> valueState;
    private final KeySelector<T, KEY> keySelector;

    public WindowReduceOperator(ReduceFunction<T> function, KeySelector<T, KEY> keySelector) {
        super(function);
        this.keySelector = keySelector;
        this.valueState = new HashMap<KEY, T>();
    }

    @Override
    protected void process(T value) throws Exception {
        Object key = this.keySelector.getKey(value);
        T oldValue = this.valueState.get(key);
        Object newValue = oldValue == null ? value : ((ReduceFunction)this.function).reduce(oldValue, value);
        this.valueState.put(key, newValue);
    }

    @Override
    public void finish() {
        for (T value : this.valueState.values()) {
            if (value == null) continue;
            this.collectValue(value);
        }
        super.finish();
        this.valueState.clear();
    }

    @VisibleForTesting
    public void processValue(T value) throws Exception {
        this.process(value);
    }
}

