/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.window;

import com.antgroup.geaflow.api.function.base.AggregateFunction;
import com.antgroup.geaflow.api.function.base.KeySelector;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.window.AbstractOneInputOperator;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;

/**
 * One-input operator that aggregates incoming values per key.
 *
 * <p>Each value passed to {@link #process(Object)} is mapped to a key by the
 * configured {@link KeySelector} and folded into that key's accumulator via the
 * operator's {@link AggregateFunction}. When {@link #finish()} is called, one
 * result per key is emitted (null results are skipped) and the per-key state is
 * cleared.
 *
 * <p>NOTE(review): the class name suggests {@code finish()} marks the end of a
 * window; that lifecycle is defined by the framework, not by this class — confirm
 * against the caller.
 *
 * @param <KEY> type of the grouping key
 * @param <IN>  type of the input values
 * @param <ACC> type of the per-key accumulator
 * @param <OUT> type of the emitted result
 */
public class WindowAggregateOperator<KEY, IN, ACC, OUT>
        extends AbstractOneInputOperator<IN, AggregateFunction<IN, ACC, OUT>> {

    /** Accumulator per key; transient, so it is (re)created in {@link #open}. */
    private transient Map<KEY, ACC> aggregatingState;

    /** Extracts the grouping key from each input value. */
    private final KeySelector<IN, KEY> keySelector;

    /**
     * Creates the operator.
     *
     * @param aggregateFunction function that creates, updates and finalizes accumulators
     * @param keySelector       function that extracts the grouping key from an input value
     */
    public WindowAggregateOperator(AggregateFunction<IN, ACC, OUT> aggregateFunction,
                                   KeySelector<IN, KEY> keySelector) {
        super(aggregateFunction);
        this.keySelector = keySelector;
    }

    /**
     * Opens the operator and initializes an empty per-key accumulator map.
     *
     * @param opContext operator context, forwarded to the superclass
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        this.aggregatingState = new HashMap<>();
    }

    /**
     * Folds a single input value into the accumulator of its key, creating the
     * accumulator first if the key has none yet.
     *
     * @param value the input value to aggregate
     * @throws Exception any exception raised by the key selector or the aggregate
     *                   function is propagated unchanged
     */
    @Override
    protected void process(IN value) throws Exception {
        KEY key = this.keySelector.getKey(value);
        ACC acc = this.aggregatingState.get(key);
        if (acc == null) {
            acc = this.function.createAccumulator();
        }
        this.function.add(value, acc);
        // Stored after add(): a newly created accumulator is only kept once the
        // first value has been applied to it successfully.
        this.aggregatingState.put(key, acc);
    }

    /**
     * Emits the result of every accumulator, clears the per-key state and then
     * delegates to the superclass. Accumulators whose result is {@code null}
     * emit nothing. Emission order follows the backing {@link HashMap} and is
     * therefore unspecified.
     */
    @Override
    public void finish() {
        for (ACC acc : this.aggregatingState.values()) {
            OUT result = this.function.getResult(acc);
            if (result != null) {
                this.collectValue(result);
            }
        }
        this.aggregatingState.clear();
        super.finish();
    }

    /**
     * Test hook that exposes {@link #process(Object)}.
     *
     * @param value the input value to aggregate
     * @throws Exception whatever {@link #process(Object)} throws
     */
    @VisibleForTesting
    public void processValue(IN value) throws Exception {
        this.process(value);
    }
}

