/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.math;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;

/**
 * Operator that keeps a running sum of the numeric values received for each key
 * on {@link #data} and emits one key/total pair per updated key from
 * {@link #endWindow()}.
 *
 * <p>The total is accumulated as a {@code double} and emitted on several output
 * ports, each carrying the same total converted to a different numeric type.
 * Every output port is annotated as optional.
 *
 * <p>When {@link #isCumulative() cumulative} is {@code false} (the default) the
 * totals are discarded after each {@code endWindow()}; when it is {@code true}
 * they are retained and only keys that received a tuple since the previous
 * {@code endWindow()} are emitted again.
 *
 * @param <K> key type
 * @param <V> value type of the incoming tuples and of the {@link #sum} port
 */
public class SumKeyVal<K, V extends Number>
extends BaseNumberKeyValueOperator<K, V> {
    /** Running total per key, together with a flag telling whether it changed since the last emit. */
    protected HashMap<K, SumEntry> sums = new HashMap<K, SumEntry>();

    /** When {@code true}, totals are retained after {@link #endWindow()} instead of being cleared. */
    protected boolean cumulative = false;

    /** Input port: adds each tuple's value to the total kept for the tuple's key. */
    public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>(){

        /**
         * Adds the tuple's value to the total for its key. Tuples whose key is
         * rejected by the inherited {@code doprocessKey} check are ignored.
         *
         * @param tuple key/value pair to accumulate
         */
        public void process(KeyValPair<K, V> tuple) {
            K key = tuple.getKey();
            if (!SumKeyVal.this.doprocessKey(key)) {
                return;
            }
            double value = tuple.getValue().doubleValue();
            SumEntry entry = SumKeyVal.this.sums.get(key);
            if (entry == null) {
                // First tuple for this key: store a copy of the key (via the inherited
                // cloneKey) alongside a fresh total.
                SumKeyVal.this.sums.put(SumKeyVal.this.cloneKey(key), new SumEntry(new MutableDouble(value), true));
            } else {
                // The entry is mutated in place, so the map needs no second put and
                // the key needs no second clone.
                entry.sum.add(value);
                entry.changed = true;
            }
        }

        /**
         * Returns the codec supplied by the inherited {@code getKeyValPairStreamCodec()}.
         *
         * @return stream codec for this port
         */
        public StreamCodec<KeyValPair<K, V>> getStreamCodec() {
            return SumKeyVal.this.getKeyValPairStreamCodec();
        }
    };

    /** Output port: total converted to {@code V} by the inherited {@code getValue}. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, V>> sum = new DefaultOutputPort<KeyValPair<K, V>>();

    /** Output port: total as a {@code Double}. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Double>> sumDouble = new DefaultOutputPort<KeyValPair<K, Double>>();

    /** Output port: total narrowed to an {@code Integer}. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Integer>> sumInteger = new DefaultOutputPort<KeyValPair<K, Integer>>();

    /** Output port: total narrowed to a {@code Long}. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Long>> sumLong = new DefaultOutputPort<KeyValPair<K, Long>>();

    /** Output port: total narrowed to a {@code Short}. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Short>> sumShort = new DefaultOutputPort<KeyValPair<K, Short>>();

    /** Output port: total narrowed to a {@code Float}. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<KeyValPair<K, Float>> sumFloat = new DefaultOutputPort<KeyValPair<K, Float>>();

    /**
     * @return {@code true} if totals are retained after {@link #endWindow()}
     */
    public boolean isCumulative() {
        return this.cumulative;
    }

    /**
     * @param cumulative {@code true} to retain totals after {@link #endWindow()},
     *                   {@code false} to clear them
     */
    public void setCumulative(boolean cumulative) {
        this.cumulative = cumulative;
    }

    /**
     * Emits the current total of every key flagged as changed on each output
     * port, then calls {@link #clearCache()}.
     */
    public void endWindow() {
        for (Map.Entry<K, SumEntry> e : this.sums.entrySet()) {
            SumEntry entry = e.getValue();
            if (!entry.changed) {
                continue;
            }
            K key = e.getKey();
            MutableDouble total = entry.sum;
            this.sum.emit(new KeyValPair<K, V>(key, this.getValue(total.doubleValue())));
            this.sumDouble.emit(new KeyValPair<K, Double>(key, total.doubleValue()));
            this.sumInteger.emit(new KeyValPair<K, Integer>(key, total.intValue()));
            this.sumFloat.emit(new KeyValPair<K, Float>(key, Float.valueOf(total.floatValue())));
            this.sumShort.emit(new KeyValPair<K, Short>(key, total.shortValue()));
            this.sumLong.emit(new KeyValPair<K, Long>(key, total.longValue()));
        }
        this.clearCache();
    }

    /**
     * Prepares the per-key state for the next round of tuples: in cumulative
     * mode the totals are kept and only their changed flags are reset;
     * otherwise all totals are dropped.
     */
    public void clearCache() {
        if (this.cumulative) {
            for (SumEntry entry : this.sums.values()) {
                entry.changed = false;
            }
        } else {
            this.sums.clear();
        }
    }

    /** Mutable running total for one key plus a flag recording whether it changed since the last emit. */
    protected static class SumEntry {
        /** Running total; left {@code null} by the no-argument constructor. */
        public MutableDouble sum;

        /** {@code true} when the total was updated since the flag was last reset. */
        public boolean changed = true;

        /** Creates an entry with no total set and {@code changed == true}. */
        SumEntry() {
        }

        /**
         * @param sum     initial running total
         * @param changed initial value of the changed flag
         */
        SumEntry(MutableDouble sum, boolean changed) {
            this.sum = sum;
            this.changed = changed;
        }
    }
}

