/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.math;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.lang.mutable.MutableLong;

public class AverageKeyVal<K>
extends BaseNumberKeyValueOperator<K, Number> {
    protected HashMap<K, MutableDouble> sums = new HashMap();
    protected HashMap<K, MutableLong> counts = new HashMap();
    public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>(){

        public void process(KeyValPair<K, ? extends Number> tuple) {
            Object key = tuple.getKey();
            if (!AverageKeyVal.this.doprocessKey(key)) {
                return;
            }
            MutableDouble val = AverageKeyVal.this.sums.get(key);
            if (val == null) {
                val = new MutableDouble(((Number)tuple.getValue()).doubleValue());
            } else {
                val.add(((Number)tuple.getValue()).doubleValue());
            }
            AverageKeyVal.this.sums.put(AverageKeyVal.this.cloneKey(key), val);
            MutableLong count = AverageKeyVal.this.counts.get(key);
            if (count == null) {
                count = new MutableLong(0L);
                AverageKeyVal.this.counts.put(AverageKeyVal.this.cloneKey(key), count);
            }
            count.increment();
        }
    };
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage = new DefaultOutputPort();
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage = new DefaultOutputPort();
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage = new DefaultOutputPort();

    public void endWindow() {
        for (Map.Entry<K, MutableDouble> e : this.sums.entrySet()) {
            K key = e.getKey();
            double d = e.getValue().doubleValue();
            if (this.doubleAverage.isConnected()) {
                this.doubleAverage.emit(new KeyValPair<K, Double>(key, d / this.counts.get(key).doubleValue()));
            }
            if (this.intAverage.isConnected()) {
                this.intAverage.emit(new KeyValPair<K, Integer>(key, (int)d));
            }
            if (!this.longAverage.isConnected()) continue;
            this.longAverage.emit(new KeyValPair<K, Long>(key, (long)d));
        }
        this.sums.clear();
        this.counts.clear();
    }
}

