/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.math;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;

public class MarginKeyVal<K, V extends Number>
extends BaseNumberKeyValueOperator<K, V> {
    public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>(){

        public void process(KeyValPair<K, V> tuple) {
            MarginKeyVal.this.addTuple(tuple, MarginKeyVal.this.numerators);
        }

        public StreamCodec<KeyValPair<K, V>> getStreamCodec() {
            return MarginKeyVal.this.getKeyValPairStreamCodec();
        }
    };
    public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>(){

        public void process(KeyValPair<K, V> tuple) {
            MarginKeyVal.this.addTuple(tuple, MarginKeyVal.this.denominators);
        }

        public StreamCodec<KeyValPair<K, V>> getStreamCodec() {
            return MarginKeyVal.this.getKeyValPairStreamCodec();
        }
    };
    public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort();
    protected HashMap<K, MutableDouble> numerators = new HashMap();
    protected HashMap<K, MutableDouble> denominators = new HashMap();
    protected boolean percent = false;

    public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map) {
        Object key = tuple.getKey();
        if (!this.doprocessKey(key) || tuple.getValue() == null) {
            return;
        }
        MutableDouble val = map.get(key);
        if (val == null) {
            val = new MutableDouble(0.0);
            map.put(this.cloneKey(key), val);
        }
        val.add(((Number)tuple.getValue()).doubleValue());
    }

    public boolean getPercent() {
        return this.percent;
    }

    public void setPercent(boolean val) {
        this.percent = val;
    }

    public void endWindow() {
        for (Map.Entry<K, MutableDouble> e : this.denominators.entrySet()) {
            K key = e.getKey();
            MutableDouble nval = this.numerators.get(key);
            if (nval == null) {
                nval = new MutableDouble(0.0);
            } else {
                this.numerators.remove(key);
            }
            Double val = this.percent ? Double.valueOf((1.0 - nval.doubleValue() / e.getValue().doubleValue()) * 100.0) : Double.valueOf(1.0 - nval.doubleValue() / e.getValue().doubleValue());
            this.margin.emit(new KeyValPair(key, this.getValue((double)val)));
        }
        this.numerators.clear();
        this.denominators.clear();
    }
}

