/*
 * Decompiled with CFR 0.152.
 */
package datafu.pig.stats;

import datafu.pig.stats.QuantileUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class StreamingQuantile
extends AccumulatorEvalFunc<Tuple> {
    /** Number of evenly spaced points (minimum and maximum included) the estimator is asked to track. */
    private final int numQuantiles;
    /** Streaming estimator that receives every accumulated value. */
    private final QuantileEstimator estimator;
    /** Explicitly requested quantiles; stays {@code null} when the UDF was built from a single count argument. */
    private List<Double> quantiles;
    /** {@code true} when the output fields are named by position ({@code quantile_0}, ...) rather than by quantile value. */
    private boolean ordinalOutputSchema;

    /**
     * Builds the UDF from its Pig constructor arguments.
     *
     * <p>A single argument whose numeric value is greater than 1 is read as the
     * number of evenly spaced quantiles to produce. Any other argument list is
     * handed to {@code QuantileUtil.getQuantilesFromParams} and treated as an
     * explicit list of quantiles.
     *
     * @param k constructor arguments as passed from the Pig script
     * @throws NumberFormatException if the single-count form is not a valid integer
     */
    public StreamingQuantile(String ... k) {
        boolean ordinal = k.length == 1 && Double.parseDouble(k[0]) > 1.0;
        if (!ordinal) {
            // Explicit quantile list: derive the grid size that contains all of them.
            this.quantiles = QuantileUtil.getQuantilesFromParams(k);
            this.numQuantiles = StreamingQuantile.getNumQuantiles(this.quantiles);
        } else {
            // Count form: output fields are named by position.
            this.ordinalOutputSchema = true;
            this.numQuantiles = Integer.parseInt(k[0]);
        }
        this.estimator = new QuantileEstimator(this.numQuantiles);
    }

    /**
     * Computes how many evenly spaced grid points on [0, 1] (both endpoints
     * included) are needed so that every requested quantile falls on a grid point.
     *
     * <p>The spacing is the greatest common divisor of 1.0 and all requested
     * quantiles other than 0.0 and 1.0, which are always on the grid. If only
     * 0.0 and/or 1.0 are requested (or the list is empty), the spacing is 1.0
     * and the result is 2.
     *
     * @param quantiles requested quantiles; the list itself is not modified
     * @return number of grid points, always at least 2
     * @throws IllegalArgumentException if a quantile is below the precision supported by {@code gcd}
     */
    private static int getNumQuantiles(List<Double> quantiles) {
        List<Double> sorted = new ArrayList<Double>(quantiles);
        Collections.sort(sorted);
        int start = 0;
        int end = sorted.size() - 1;
        // Skip the endpoints. The bounds checks keep lists consisting only of
        // 0.0 or only of 1.0 (or an empty list) from running off the list.
        while (start <= end && sorted.get(start) == 0.0) {
            ++start;
        }
        while (end >= start && sorted.get(end) == 1.0) {
            --end;
        }
        double gcd = 1.0;
        for (int i = end; i >= start; --i) {
            gcd = StreamingQuantile.gcd(gcd, sorted.get(i));
        }
        // Round instead of truncating: 1.0 / gcd can land just below the exact
        // integer, and truncation would then drop a grid point.
        return (int)Math.round(1.0 / gcd) + 1;
    }

    /**
     * Euclidean greatest common divisor on doubles, evaluated at the precision
     * of {@code round} (five decimal places).
     *
     * @param a first value
     * @param b second value
     * @return the common divisor, rounded
     * @throws IllegalArgumentException if either argument rounds to zero
     */
    private static double gcd(double a, double b) {
        boolean firstTooSmall = StreamingQuantile.round(a) == 0.0;
        if (firstTooSmall || StreamingQuantile.round(b) == 0.0) {
            throw new IllegalArgumentException("Quantiles are smaller than the allowed precision");
        }
        double dividend = a;
        double divisor = b;
        // Standard remainder iteration; stop once the remainder is zero at the rounding precision.
        while (StreamingQuantile.round(divisor) != 0.0) {
            double remainder = dividend % divisor;
            dividend = divisor;
            divisor = remainder;
        }
        return StreamingQuantile.round(dividend);
    }

    /**
     * Rounds a value to five decimal places.
     *
     * @param d value to round
     * @return {@code d} rounded to the nearest multiple of 0.00001
     */
    private static double round(double d) {
        final double scale = 100000.0;
        long scaled = Math.round(d * scale);
        return (double)scaled / scale;
    }

    /**
     * Feeds one batch of input into the estimator.
     *
     * <p>The first field of {@code b} is read as a bag; the first field of each
     * tuple in that bag is added to the estimator. A null or empty bag is ignored.
     *
     * @param b input tuple whose first field is the bag of values
     * @throws IOException if a field cannot be read from a tuple
     * @throws IllegalStateException if a value is null or not a {@link Number}
     */
    public void accumulate(Tuple b) throws IOException {
        DataBag values = (DataBag)b.get(0);
        if (values != null && values.size() != 0L) {
            for (Tuple row : values) {
                Object field = row.get(0);
                if (field instanceof Number) {
                    this.estimator.add(((Number)field).doubleValue());
                    continue;
                }
                throw new IllegalStateException("bag must have numerical values (and be non-null)");
            }
        }
    }

    /**
     * Discards everything accumulated so far so the instance can be reused.
     */
    public void cleanup() {
        estimator.clear();
    }

    /**
     * Produces the result tuple from the values accumulated so far.
     *
     * <p>In count mode ({@code quantiles == null}) the tuple holds the
     * estimator's grid values in order. In explicit mode each requested
     * quantile is looked up on the grid by its rounded value; a quantile that
     * does not fall on a grid point yields a {@code null} field.
     *
     * @return tuple with one field per output quantile
     * @throws RuntimeException if a field cannot be stored in the tuple
     */
    public Tuple getValue() {
        int width = this.quantiles != null ? this.quantiles.size() : this.numQuantiles;
        Tuple t = TupleFactory.getInstance().newTuple(width);
        List<Double> estimates = this.estimator.getQuantiles();
        try {
            if (this.quantiles == null) {
                int j = 0;
                for (Double estimate : estimates) {
                    t.set(j, (Object)estimate);
                    ++j;
                }
            } else {
                HashMap<Double, Double> quantileValues = new HashMap<Double, Double>(estimates.size());
                int position = 0;
                for (Double estimate : estimates) {
                    // Derive each key from its index rather than a running sum, so
                    // floating-point error does not build up across a large grid.
                    double quantileKey = (double)position / (double)(this.numQuantiles - 1);
                    quantileValues.put(StreamingQuantile.round(quantileKey), estimate);
                    ++position;
                }
                int j = 0;
                for (double requested : this.quantiles) {
                    t.set(j, (Object)quantileValues.get(StreamingQuantile.round(requested)));
                    ++j;
                }
            }
        }
        catch (IOException e) {
            // The tuple is sized above to fit every field, so this is presumably only
            // reachable on an internal inconsistency. Surface it (as outputSchema does)
            // instead of silently turning the whole result into null.
            throw new RuntimeException("Failed to populate quantile output tuple", e);
        }
        return t;
    }

    /**
     * Describes the output: a single tuple with one double field per quantile.
     *
     * <p>Fields are named {@code quantile_<index>} in count mode, and
     * {@code quantile_<value>} (with {@code .} replaced by {@code _}) when
     * explicit quantiles were requested.
     *
     * @param input schema of the input; not consulted
     * @return schema wrapping the tuple of quantile fields
     * @throws RuntimeException if the tuple field schema cannot be constructed
     */
    public Schema outputSchema(Schema input) {
        // Pig type codes. They are declared as byte so the calls below resolve to
        // the byte-typed FieldSchema constructors; a bare int literal is not
        // narrowed implicitly in a method call.
        final byte doubleType = 25;   // Pig DataType.DOUBLE
        final byte tupleType = 110;   // Pig DataType.TUPLE
        Schema tupleSchema = new Schema();
        if (this.ordinalOutputSchema) {
            for (int i = 0; i < this.numQuantiles; ++i) {
                tupleSchema.add(new Schema.FieldSchema("quantile_" + i, doubleType));
            }
        } else {
            for (Double x : this.quantiles) {
                tupleSchema.add(new Schema.FieldSchema("quantile_" + x.toString().replace(".", "_"), doubleType));
            }
        }
        try {
            return new Schema(new Schema.FieldSchema(null, tupleSchema, tupleType));
        }
        catch (FrontendException e) {
            throw new RuntimeException(e);
        }
    }

    static class QuantileEstimator {
        private static final long MAX_TOT_ELEMS = 0x10000000000L;
        private final List<List<Double>> buffer = new ArrayList<List<Double>>();
        private final int numQuantiles;
        private final int maxElementsPerBuffer;
        private int totalElements;
        private double min;
        private double max;

        public QuantileEstimator(int numQuantiles) {
            this.numQuantiles = numQuantiles;
            this.maxElementsPerBuffer = this.computeMaxElementsPerBuffer();
        }

        private int computeMaxElementsPerBuffer() {
            double epsilon = 1.0 / ((double)this.numQuantiles - 1.0);
            int b = 2;
            while ((double)((long)(b - 2) * (1L << b - 2)) + 0.5 <= epsilon * 1.099511627776E12) {
                ++b;
            }
            return (int)(0x10000000000L / (1L << b - 1));
        }

        private void ensureBuffer(int level) {
            while (this.buffer.size() < level + 1) {
                this.buffer.add(null);
            }
            if (this.buffer.get(level) == null) {
                this.buffer.set(level, new ArrayList());
            }
        }

        private void collapse(List<Double> a, List<Double> b, List<Double> out) {
            int indexA = 0;
            int indexB = 0;
            int count = 0;
            Double smaller = null;
            while (indexA < this.maxElementsPerBuffer || indexB < this.maxElementsPerBuffer) {
                smaller = indexA >= this.maxElementsPerBuffer || indexB < this.maxElementsPerBuffer && a.get(indexA) >= b.get(indexB) ? b.get(indexB++) : a.get(indexA++);
                if (count++ % 2 != 0) continue;
                out.add(smaller);
            }
            a.clear();
            b.clear();
        }

        private void recursiveCollapse(List<Double> buf, int level) {
            this.ensureBuffer(level + 1);
            List<Object> merged = this.buffer.get(level + 1).isEmpty() ? this.buffer.get(level + 1) : new ArrayList(this.maxElementsPerBuffer);
            this.collapse(this.buffer.get(level), buf, merged);
            if (this.buffer.get(level + 1) != merged) {
                this.recursiveCollapse(merged, level + 1);
            }
        }

        public void add(double elem) {
            if (this.totalElements == 0 || elem < this.min) {
                this.min = elem;
            }
            if (this.totalElements == 0 || this.max < elem) {
                this.max = elem;
            }
            if (this.totalElements > 0 && this.totalElements % (2 * this.maxElementsPerBuffer) == 0) {
                Collections.sort(this.buffer.get(0));
                Collections.sort(this.buffer.get(1));
                this.recursiveCollapse(this.buffer.get(0), 1);
            }
            this.ensureBuffer(0);
            this.ensureBuffer(1);
            int index = this.buffer.get(0).size() < this.maxElementsPerBuffer ? 0 : 1;
            this.buffer.get(index).add(elem);
            ++this.totalElements;
        }

        public void clear() {
            this.buffer.clear();
            this.totalElements = 0;
        }

        public List<Double> getQuantiles() {
            ArrayList<Double> quantiles = new ArrayList<Double>();
            quantiles.add(this.min);
            if (this.buffer.get(0) != null) {
                Collections.sort(this.buffer.get(0));
            }
            if (this.buffer.get(1) != null) {
                Collections.sort(this.buffer.get(1));
            }
            int[] index = new int[this.buffer.size()];
            long S = 0L;
            for (int i = 1; i <= this.numQuantiles - 2; ++i) {
                double smallest;
                long targetS = (long)Math.ceil((double)i * ((double)this.totalElements / ((double)this.numQuantiles - 1.0)));
                while (true) {
                    long incrementS;
                    smallest = this.max;
                    int minBufferId = -1;
                    for (int j = 0; j < this.buffer.size(); ++j) {
                        if (this.buffer.get(j) == null || index[j] >= this.buffer.get(j).size() || smallest < this.buffer.get(j).get(index[j])) continue;
                        smallest = this.buffer.get(j).get(index[j]);
                        minBufferId = j;
                    }
                    long l = incrementS = minBufferId <= 1 ? 1L : 1L << minBufferId - 1;
                    if (S + incrementS >= targetS) break;
                    int n = minBufferId;
                    index[n] = index[n] + 1;
                    S += incrementS;
                }
                quantiles.add(smallest);
            }
            quantiles.add(this.max);
            return quantiles;
        }
    }
}

