/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.algo;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Stateless
@OperatorAnnotation(partitionable=true)
public class UniqueValueCount<K>
extends BaseOperator {
    private final Map<K, Set<Object>> interimUniqueValues;
    public transient DefaultInputPort<KeyValPair<K, Object>> input = new DefaultInputPort<KeyValPair<K, Object>>(){

        public void process(KeyValPair<K, Object> pair) {
            Set values = (Set)UniqueValueCount.this.interimUniqueValues.get(pair.getKey());
            if (values == null) {
                values = Sets.newHashSet();
                UniqueValueCount.this.interimUniqueValues.put(pair.getKey(), values);
            }
            values.add(pair.getValue());
        }
    };
    public transient DefaultOutputPort<KeyValPair<K, Integer>> output = new DefaultOutputPort<KeyValPair<K, Integer>>(){

        public Operator.Unifier<KeyValPair<K, Integer>> getUnifier() {
            return new UniqueCountUnifier();
        }
    };
    public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues = new DefaultOutputPort<KeyValPair<K, Set<Object>>>(){

        public Operator.Unifier<KeyValPair<K, Set<Object>>> getUnifier() {
            return new UniqueCountSetUnifier();
        }
    };

    public UniqueValueCount() {
        this.interimUniqueValues = Maps.newHashMap();
    }

    public void endWindow() {
        for (K key : this.interimUniqueValues.keySet()) {
            Set<Object> values = this.interimUniqueValues.get(key);
            if (this.output.isConnected()) {
                this.output.emit(new InternalCountOutput<K>(key, values.size(), values));
            }
            if (!this.outputValues.isConnected()) continue;
            this.outputValues.emit(new KeyValPair<K, Set<Object>>(key, values));
        }
        this.interimUniqueValues.clear();
    }

    static class UniqueCountUnifier<K>
    implements Operator.Unifier<InternalCountOutput<K>> {
        public final transient DefaultOutputPort<InternalCountOutput<K>> output = new DefaultOutputPort();
        private final Map<K, Set<Object>> finalUniqueValues = Maps.newHashMap();

        public void process(InternalCountOutput<K> tuple) {
            HashSet values = this.finalUniqueValues.get(tuple.getKey());
            if (values == null) {
                values = Sets.newHashSet();
                this.finalUniqueValues.put(tuple.getKey(), values);
            }
            values.addAll(((InternalCountOutput)tuple).interimUniqueValues);
        }

        public void beginWindow(long l) {
        }

        public void endWindow() {
            for (K key : this.finalUniqueValues.keySet()) {
                this.output.emit(new InternalCountOutput<K>(key, this.finalUniqueValues.get(key).size(), this.finalUniqueValues.get(key)));
            }
            this.finalUniqueValues.clear();
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }
    }

    static class UniqueCountSetUnifier<K>
    implements Operator.Unifier<KeyValPair<K, Set<Object>>> {
        public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> output = new DefaultOutputPort();
        private final Map<K, Set<Object>> finalUniqueValues = Maps.newHashMap();

        public void process(KeyValPair<K, Set<Object>> tuple) {
            HashSet values = this.finalUniqueValues.get(tuple.getKey());
            if (values == null) {
                values = Sets.newHashSet();
                this.finalUniqueValues.put(tuple.getKey(), values);
            }
            values.addAll((Collection)tuple.getValue());
        }

        public void beginWindow(long l) {
        }

        public void endWindow() {
            for (Map.Entry<K, Set<Object>> entry : this.finalUniqueValues.entrySet()) {
                this.output.emit(new KeyValPair<K, Set<Object>>(entry.getKey(), entry.getValue()));
            }
            this.finalUniqueValues.clear();
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }
    }

    public static class InternalCountOutput<K>
    extends KeyValPair<K, Integer> {
        private final Set<Object> interimUniqueValues;

        protected InternalCountOutput() {
            this(null, null, null);
        }

        public InternalCountOutput(K k, Integer count, Set<Object> interimUniqueValues) {
            super(k, count);
            this.interimUniqueValues = interimUniqueValues;
        }

        public Set<Object> getInternalSet() {
            return this.interimUniqueValues;
        }
    }
}

