/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.testbench;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that turns per-tuple counts into throughput figures.
 *
 * <p>Every map received on {@link #data} adds the sum of its values to a running tuple count.
 * At the end of a window in which that count is non-zero and a positive amount of wall-clock
 * time has elapsed since the window start, one map is emitted on {@link #count} holding the
 * rolling average rate, the raw count, the elapsed milliseconds, the rate for this window and
 * an emission sequence number (see the {@code OPORT_COUNT_TUPLE_*} keys).
 *
 * <p>When the rolling window count is greater than 1, the average is computed as
 * {@code sum(counts) * 1000 / sum(elapsed millis)} over the most recent emitting windows, up to
 * the configured rolling window count. A rolling window count of 1 (or any invalid value below
 * 1) makes the average equal to the current window's rate.
 *
 * @param <K> key type of the incoming maps; keys are ignored
 * @param <V> numeric value type of the incoming maps
 */
public class ThroughputCounter<K, V extends Number>
extends BaseOperator {
    private static final Logger log = LoggerFactory.getLogger(ThroughputCounter.class);

    /** Input port: the {@code longValue()} of every map value is added to the tuple count. */
    public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>(){

        public void process(HashMap<K, V> tuple) {
            for (Map.Entry<K, V> entry : tuple.entrySet()) {
                ThroughputCounter.this.tuple_count += entry.getValue().longValue();
            }
        }
    };

    /** Output port: one statistics map per emitting window, keyed by the constants below. */
    public final transient DefaultOutputPort<HashMap<String, Number>> count = new DefaultOutputPort<HashMap<String, Number>>();

    /** Key of the rolling average rate (tuples per second). */
    public static final String OPORT_COUNT_TUPLE_AVERAGE = "avg";
    /** Key of the tuple count accumulated for the emitted window. */
    public static final String OPORT_COUNT_TUPLE_COUNT = "count";
    /** Key of the elapsed wall-clock time of the emitted window, in milliseconds. */
    public static final String OPORT_COUNT_TUPLE_TIME = "window_time";
    /** Key of the rate (tuples per second) of the emitted window alone. */
    public static final String OPORT_COUNT_TUPLE_TUPLES_PERSEC = "tuples_per_sec";
    /**
     * Key of the emission sequence number. This is a counter that starts at 0 and increases by
     * one per emitted map; it is not the window id passed to {@link #beginWindow(long)}.
     */
    public static final String OPORT_COUNT_TUPLE_WINDOWID = "window_id";

    /** Wall-clock time (ms) at which the current measurement interval started. */
    private long windowStartTime = 0L;
    @Min(value=1L)
    private int rolling_window_count = 1;
    /** Ring buffer of per-window tuple counts; only used when rolling_window_count > 1. */
    long[] tuple_numbers = null;
    /** Ring buffer of per-window elapsed milliseconds, parallel to {@link #tuple_numbers}. */
    long[] time_numbers = null;
    /** Next ring-buffer slot to write. */
    int tuple_index = 0;
    /** Number of ring-buffer slots that will hold data after the next write (1..rolling_window_count). */
    int count_denominator = 1;
    /** Sequence number of the next emission. */
    long count_windowid = 0L;
    /**
     * Running tuple count. Starts at 1 so that the very first {@link #beginWindow(long)} takes
     * the non-zero branch; note that this initial 1 is therefore part of the first emitted count.
     */
    long tuple_count = 1L;
    /** Whether the previous {@link #endWindow()} emitted, i.e. whether the count may be reset. */
    boolean didemit = false;

    /**
     * @return the number of emitting windows the rolling average spans
     */
    @Min(value=1L)
    public int getRollingWindowCount() {
        return this.rolling_window_count;
    }

    /**
     * Sets the number of emitting windows the rolling average spans. If this is changed after
     * {@link #setup(Context.OperatorContext)}, the rolling buffers are re-created (and the
     * history dropped) at the next emitting window.
     *
     * @param i the new rolling window count; values below 1 disable the rolling average
     */
    public void setRollingWindowCount(int i) {
        this.rolling_window_count = i;
    }

    /**
     * Starts the first measurement interval and allocates the rolling buffers when a rolling
     * average over more than one window is configured.
     *
     * @param context the operator context (unused)
     */
    public void setup(Context.OperatorContext context) {
        this.windowStartTime = System.currentTimeMillis();
        log.debug("\nTupleCounter: set window to {}", this.rolling_window_count);
        if (this.rolling_window_count > 1) {
            this.resetRollingBuffers();
        }
    }

    /**
     * Restarts the measurement interval unless the previous window saw no tuples, in which case
     * the interval keeps running so that idle time is charged to the next emission.
     *
     * @param windowId the platform window id (unused)
     */
    public void beginWindow(long windowId) {
        if (this.tuple_count == 0L) {
            return;
        }
        this.windowStartTime = System.currentTimeMillis();
        if (this.didemit) {
            // The count was reported by the last endWindow(); start over.
            this.tuple_count = 0L;
        }
    }

    /**
     * Emits the statistics map for the window, provided tuples were counted and a positive
     * amount of time elapsed. Otherwise nothing is emitted; when the elapsed time is not
     * positive the count is kept so that it carries over into the next window.
     */
    public void endWindow() {
        if (this.tuple_count == 0L) {
            return;
        }
        long elapsedTime = System.currentTimeMillis() - this.windowStartTime;
        if (elapsedTime <= 0L) {
            // Too short to measure (or the wall clock stepped backwards): a rate cannot be
            // computed, and a non-positive sample could drive the rolling time sum to zero.
            this.didemit = false;
            return;
        }
        // * 1000 because elapsedTime is in milliseconds.
        long tuples_per_sec = this.tuple_count * 1000L / elapsedTime;
        long average = this.rolling_window_count > 1
                ? this.recordAndAverage(elapsedTime)
                : tuples_per_sec;

        HashMap<String, Number> tuples = new HashMap<String, Number>();
        tuples.put(OPORT_COUNT_TUPLE_AVERAGE, Long.valueOf(average));
        tuples.put(OPORT_COUNT_TUPLE_COUNT, Long.valueOf(this.tuple_count));
        tuples.put(OPORT_COUNT_TUPLE_TIME, Long.valueOf(elapsedTime));
        tuples.put(OPORT_COUNT_TUPLE_TUPLES_PERSEC, Long.valueOf(tuples_per_sec));
        tuples.put(OPORT_COUNT_TUPLE_WINDOWID, Long.valueOf(this.count_windowid++));
        this.count.emit(tuples);
        this.didemit = true;
    }

    /** Allocates zero-filled rolling buffers of the configured size and clears the ring state. */
    private void resetRollingBuffers() {
        this.tuple_numbers = new long[this.rolling_window_count];
        this.time_numbers = new long[this.rolling_window_count];
        this.tuple_index = 0;
        this.count_denominator = 1;
    }

    /**
     * Stores the current window's sample in the ring buffer and returns the rate over all
     * stored samples. Must only be called with rolling_window_count > 1 and elapsedTime > 0.
     */
    private long recordAndAverage(long elapsedTime) {
        boolean buffersStale = this.tuple_numbers == null
                || this.time_numbers == null
                || this.tuple_numbers.length != this.rolling_window_count
                || this.time_numbers.length != this.rolling_window_count;
        if (buffersStale) {
            // The rolling window count was changed after setup(); start a fresh history.
            this.resetRollingBuffers();
        }

        this.tuple_numbers[this.tuple_index] = this.tuple_count;
        this.time_numbers[this.tuple_index] = elapsedTime;
        this.tuple_index = (this.tuple_index + 1) % this.rolling_window_count;

        // While the buffer is filling, slots are written in order 0, 1, 2, ... so the first
        // count_denominator slots are exactly the ones that hold data.
        int slots = this.count_denominator;
        if (this.count_denominator < this.rolling_window_count) {
            ++this.count_denominator;
        }

        long numtuples = 0L;
        long time_slot = 0L;
        for (int i = 0; i < slots; ++i) {
            numtuples += this.tuple_numbers[i];
            time_slot += this.time_numbers[i];
        }
        // time_slot > 0: every stored sample has a positive elapsed time.
        return numtuples * 1000L / time_slot;
    }
}

