/*
 * Decompiled with CFR 0.152.
 */
package datafu.pig.sampling;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.math.MathException;
import org.apache.commons.math.random.RandomDataImpl;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class SimpleRandomSampleWithReplacementVote
extends EvalFunc<DataBag> {
    public static final String OUTPUT_BAG_NAME_PREFIX = "SRSWR_VOTE";
    public static final String CANDIDATE_FIELD_NAME = "candidate";
    public static final String POSITION_FIELD_NAME = "position";
    public static final String SCORE_FIELD_NAME = "score";
    public static final double FAILURE_RATE = 1.0E-4;
    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
    private static final BagFactory bagFactory = BagFactory.getInstance();
    private RandomDataImpl _rdg = new RandomDataImpl();

    private int[] sampleWithoutReplacement(int n, int k) {
        if (k == 0) {
            return new int[0];
        }
        if ((long)k < (long)n / 3L) {
            HashSet sample = Sets.newHashSetWithExpectedSize((int)k);
            while (sample.size() < k) {
                sample.add(this._rdg.nextInt(0, n - 1));
            }
            return Ints.toArray((Collection)sample);
        }
        int[] sample = new int[k];
        int i = 0;
        for (int j = 0; j < n && i < k; ++j) {
            if (!(this._rdg.nextUniform(0.0, 1.0) < 1.0 * (double)(k - i) / (double)(n - j))) continue;
            sample[i] = j;
            ++i;
        }
        return sample;
    }

    public DataBag exec(Tuple tuple) throws IOException {
        if (tuple.size() != 3) {
            throw new IllegalArgumentException("The input arguments are: a bag of items, the desired sample size (int), and the population size (long) or a good lower bound of it");
        }
        DataBag items = (DataBag)tuple.get(0);
        int sampleSize = ((Number)tuple.get(1)).intValue();
        long count = ((Number)tuple.get(2)).longValue();
        double threshold = 1.0 - Math.exp(Math.log(1.0E-4 / (double)sampleSize) / (double)count);
        DataBag candidates = bagFactory.newDefaultBag();
        for (Tuple item : items) {
            int numOutputPositions;
            try {
                numOutputPositions = this._rdg.nextBinomial(sampleSize, threshold);
            }
            catch (MathException e) {
                throw new RuntimeException("Failed to generate a binomial value with n = " + sampleSize + " and p = " + threshold, e);
            }
            for (int outputPosition : this.sampleWithoutReplacement(sampleSize, numOutputPositions)) {
                Tuple candidate = tupleFactory.newTuple();
                candidate.append((Object)outputPosition);
                candidate.append((Object)this._rdg.nextUniform(0.0, 1.0));
                candidate.append((Object)item);
                candidates.add(candidate);
            }
        }
        return candidates;
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema inputFieldSchema = input.getField(0);
            if (inputFieldSchema.type != 120) {
                throw new RuntimeException("Expected a BAG as input");
            }
            ArrayList fieldSchemas = Lists.newArrayList();
            fieldSchemas.add(new Schema.FieldSchema(POSITION_FIELD_NAME, 10));
            fieldSchemas.add(new Schema.FieldSchema(SCORE_FIELD_NAME, 25));
            fieldSchemas.add(new Schema.FieldSchema(CANDIDATE_FIELD_NAME, inputFieldSchema.schema.getField((int)0).schema));
            Schema outputSchema = new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX, input), new Schema((List)fieldSchemas), 120));
            return outputSchema;
        }
        catch (FrontendException e) {
            throw new RuntimeException("Error deriving output schema.", e);
        }
    }
}

