/*
 * Decompiled with CFR 0.152.
 */
package datafu.pig.bags;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class CountDistinctUpTo
extends AccumulatorEvalFunc<Integer>
implements Algebraic {
    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
    private static final BagFactory bagFactory = BagFactory.getInstance();
    private Set<Tuple> set;
    private final int max;

    public CountDistinctUpTo(String maxAmount) {
        this.max = Integer.valueOf(maxAmount);
        this.set = new HashSet<Tuple>(this.max);
    }

    public void accumulate(Tuple tuple) throws IOException {
        CountDistinctUpTo.count(this.set, tuple, this.max, this.log);
    }

    private static void count(Set<Tuple> set, Tuple input, int max, Log log) throws ExecException {
        if (set.size() == max) {
            return;
        }
        if (input == null) {
            return;
        }
        DataBag bag = (DataBag)input.get(0);
        if (bag == null) {
            return;
        }
        for (Tuple t : bag) {
            if (!set.add(t) || set.size() != max) continue;
            return;
        }
    }

    private static Set<Tuple> makeDistinctSet(Tuple input, int max, Log log) throws ExecException {
        HashSet<Tuple> set = new HashSet<Tuple>(max);
        DataBag bag = (DataBag)input.get(0);
        for (Tuple t : bag) {
            if (t.get(0) == null) {
                return null;
            }
            CountDistinctUpTo.count(set, t, max, log);
            if (set.size() != max) continue;
            return null;
        }
        return set;
    }

    public void cleanup() {
        this.set.clear();
    }

    public Integer getValue() {
        return this.set.size();
    }

    public String getInitial() {
        return Initial.class.getName();
    }

    public String getIntermed() {
        return Intermediate.class.getName();
    }

    public String getFinal() {
        return Final.class.getName();
    }

    public Schema outputSchema(Schema input) {
        if (input.size() != 1) {
            throw new RuntimeException("Expected a single field of type bag, but found " + input.size() + " fields");
        }
        try {
            Schema.FieldSchema field = input.getField(0);
            if (field.type != 120) {
                throw new RuntimeException("Expected a bag but got: " + DataType.findTypeName((byte)field.type));
            }
        }
        catch (FrontendException e) {
            throw new RuntimeException(e);
        }
        return new Schema(new Schema.FieldSchema("CountDistinctUpTo", 10));
    }

    public static class Final
    extends EvalFunc<Integer> {
        private final int max;

        public Final() {
            this("0");
        }

        public Final(String maxAmount) {
            this.max = Integer.valueOf(maxAmount);
        }

        public Integer exec(Tuple input) throws IOException {
            Set set = CountDistinctUpTo.makeDistinctSet(input, this.max, this.log);
            if (set == null) {
                return this.max;
            }
            return set.size();
        }
    }

    public static class Intermediate
    extends EvalFunc<Tuple> {
        private final int max;

        public Intermediate() {
            this("0");
        }

        public Intermediate(String maxAmount) {
            this.max = Integer.valueOf(maxAmount);
        }

        public Tuple exec(Tuple input) throws IOException {
            Set set = CountDistinctUpTo.makeDistinctSet(input, this.max, this.log);
            if (set == null) {
                Tuple result = tupleFactory.newTuple(1);
                result.set(0, null);
                return result;
            }
            DataBag outputBag = bagFactory.newDefaultBag();
            for (Tuple t : set) {
                outputBag.add(t);
            }
            return tupleFactory.newTuple((Object)outputBag);
        }
    }

    public static class Initial
    extends EvalFunc<Tuple> {
        public Initial() {
        }

        public Initial(String maxAmount) {
        }

        public Tuple exec(Tuple input) throws IOException {
            Tuple t;
            DataBag inputBag = (DataBag)input.get(0);
            Iterator it = inputBag.iterator();
            DataBag outputBag = bagFactory.newDefaultBag();
            if (it.hasNext() && (t = (Tuple)it.next()) != null && t.size() > 0 && t.get(0) != null) {
                outputBag.add(t);
            }
            return tupleFactory.newTuple((Object)outputBag);
        }
    }
}

