/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.testbench;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import java.util.HashMap;
import java.util.Map;

/**
 * Operator that filters the entries of incoming {@code Map<k, Integer>} tuples by
 * comparing each value against a configured threshold.
 *
 * <p>Entries whose value satisfies the configured {@link Compare} relation with
 * respect to {@link #setValue(int) value} are accumulated in a per-window result
 * map. In {@link #endWindow()} the accumulated map is emitted on {@link #outport}
 * and a flattened, index-keyed representation is emitted on {@link #redisport}.
 *
 * @param <k> type of the keys carried by the incoming tuples
 */
public class CompareFilterTuples<k> extends BaseOperator {
    /** Relation an entry value must have to {@link #value} to be kept; defaults to {@code Equal}. */
    private Compare compareType = Compare.Equal;

    /** Threshold every entry value is compared against. */
    private int value;

    /**
     * Entries accepted so far. Replaced by a fresh map in {@link #beginWindow(long)};
     * initialised eagerly so a tuple processed before the first window start does not
     * dereference {@code null}.
     */
    private Map<k, Integer> result = new HashMap<k, Integer>();

    /** Input port: every entry of a received map is tested and, if it matches, stored in the result. */
    public final transient DefaultInputPort<Map<k, Integer>> inport = new DefaultInputPort<Map<k, Integer>>() {

        public void process(Map<k, Integer> map) {
            for (Map.Entry<k, Integer> entry : map.entrySet()) {
                Integer candidate = entry.getValue();
                // A null value cannot be compared numerically; skip it instead of
                // failing with a NullPointerException on unboxing.
                if (candidate != null && CompareFilterTuples.this.matches(candidate)) {
                    CompareFilterTuples.this.result.put(entry.getKey(), candidate);
                }
            }
        }
    };

    /** Output port receiving the accumulated result map at the end of each window. */
    public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>();

    /** Output port receiving the index-keyed single-entry maps described in {@link #endWindow()}. */
    public final transient DefaultOutputPort<Map<Integer, String>> redisport = new DefaultOutputPort<Map<Integer, String>>();

    /**
     * Sets the relation used to filter entries.
     *
     * @param type comparison to apply; if {@code null}, no entry matches
     */
    public void setCompareType(Compare type) {
        this.compareType = type;
    }

    /**
     * Sets the threshold entry values are compared against.
     *
     * @param value the threshold
     */
    public void setValue(int value) {
        this.value = value;
    }

    /**
     * Starts a new window by discarding the previous result and installing an empty map.
     *
     * @param windowId identifier of the window being started (unused)
     */
    public void beginWindow(long windowId) {
        this.result = new HashMap<k, Integer>();
    }

    /**
     * Emits the result accumulated since the last {@link #beginWindow(long)}.
     *
     * <p>The result map is emitted as-is on {@link #outport}. On {@link #redisport}
     * the following single-entry maps are emitted, in this order:
     * <ol>
     *   <li>for each result entry, {@code {i -> key.toString()}} with {@code i}
     *       counting up from 1;</li>
     *   <li>{@code {i -> sum}} where {@code sum} is the decimal string of the sum of
     *       all result values and {@code i} is the next index;</li>
     *   <li>{@code {0 -> next}} where {@code next} is the decimal string of the index
     *       following the one used for the sum (number of result entries plus two).</li>
     * </ol>
     */
    public void endWindow() {
        this.outport.emit(this.result);

        int numOuts = 1;
        int total = 0;
        for (Map.Entry<k, Integer> entry : this.result.entrySet()) {
            this.redisport.emit(singleEntry(numOuts++, entry.getKey().toString()));
            total += entry.getValue();
        }
        this.redisport.emit(singleEntry(numOuts++, Integer.toString(total)));
        // Key 0 carries the counter value after all indexed tuples have been emitted.
        this.redisport.emit(singleEntry(0, Integer.toString(numOuts)));
    }

    /** Returns whether {@code candidate} satisfies the configured comparison against the threshold. */
    private boolean matches(int candidate) {
        if (this.compareType == null) {
            return false;
        }
        switch (this.compareType) {
            case Equal:
                return candidate == this.value;
            case Greater:
                return candidate > this.value;
            case Smaller:
                return candidate < this.value;
            default:
                return false;
        }
    }

    /** Builds a new mutable map holding exactly the given index/text pair. */
    private static Map<Integer, String> singleEntry(int index, String text) {
        HashMap<Integer, String> tuple = new HashMap<Integer, String>();
        tuple.put(index, text);
        return tuple;
    }

    /** Relation between an entry value and the configured threshold that selects the entry. */
    public enum Compare {
        /** Keep entries whose value is strictly less than the threshold. */
        Smaller,
        /** Keep entries whose value equals the threshold. */
        Equal,
        /** Keep entries whose value is strictly greater than the threshold. */
        Greater;
    }
}

