/*
 * Decompiled with CFR 0.152.
 */
package datafu.pig.stats.entropy;

import datafu.pig.stats.entropy.EntropyEstimator;
import datafu.pig.stats.entropy.EntropyUtil;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class EmpiricalCountEntropy
extends AccumulatorEvalFunc<Double>
implements Algebraic {
    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
    private EntropyEstimator streamEstimator;
    private String base;
    private String param = null;

    public EmpiricalCountEntropy() throws ExecException {
        this("log");
    }

    public EmpiricalCountEntropy(String base) throws ExecException {
        try {
            this.streamEstimator = EntropyEstimator.createEstimator("empirical", base);
        }
        catch (IllegalArgumentException ex) {
            throw new ExecException(String.format("Fail to initialize EmpiricalCountEntropy with logarithm base: (%s), exception: (%s)", base, ex));
        }
        this.base = base;
    }

    private String getParam() {
        if (this.param == null) {
            this.param = this.base != null ? String.format("('%s')", this.base) : "";
        }
        return this.param;
    }

    public String getFinal() {
        return Final.class.getName() + this.getParam();
    }

    public String getInitial() {
        return Initial.class.getName() + this.getParam();
    }

    public String getIntermed() {
        return Intermediate.class.getName() + this.getParam();
    }

    protected static Tuple combine(DataBag values) throws ExecException {
        Tuple output = mTupleFactory.newTuple(2);
        boolean sawNonNull = false;
        double sumOfCxLogCx = 0.0;
        long sumOfCx = 0L;
        for (Tuple t : values) {
            Double scxlogcx = (Double)t.get(0);
            Long scx = (Long)t.get(1);
            if (scxlogcx == null || scx == null) continue;
            sumOfCxLogCx += scxlogcx.doubleValue();
            sumOfCx += scx.longValue();
            sawNonNull = true;
        }
        if (sawNonNull) {
            output.set(0, (Object)sumOfCxLogCx);
            output.set(1, (Object)sumOfCx);
        } else {
            output.set(0, null);
            output.set(1, null);
        }
        return output;
    }

    public void accumulate(Tuple input) throws IOException {
        for (Tuple t : (DataBag)input.get(0)) {
            long cx = ((Number)t.get(0)).longValue();
            this.streamEstimator.accumulate(cx);
        }
    }

    public Double getValue() {
        return this.streamEstimator.getEntropy();
    }

    public void cleanup() {
        this.streamEstimator.reset();
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema inputFieldSchema = input.getField(0);
            if (inputFieldSchema.type != 120) {
                throw new RuntimeException("Expected a BAG as input");
            }
            Schema inputBagSchema = inputFieldSchema.schema;
            if (inputBagSchema.getField((int)0).type != 110) {
                throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s", DataType.findTypeName((byte)inputBagSchema.getField((int)0).type)));
            }
            Schema tupleSchema = inputBagSchema.getField((int)0).schema;
            if (tupleSchema == null) {
                throw new RuntimeException("The tuple of input bag has no schema");
            }
            List fieldSchemaList = tupleSchema.getFields();
            if (fieldSchemaList == null || fieldSchemaList.size() != 1) {
                throw new RuntimeException("The field schema of the input tuple is null or its size is not 1");
            }
            if (((Schema.FieldSchema)fieldSchemaList.get((int)0)).type != 10 && ((Schema.FieldSchema)fieldSchemaList.get((int)0)).type != 15) {
                Object[] expectedTypes = new String[]{DataType.findTypeName((byte)10), DataType.findTypeName((byte)15)};
                throw new RuntimeException("Expect the type of the input tuple to be of (" + Arrays.toString(expectedTypes) + "), but instead found " + DataType.findTypeName((byte)((Schema.FieldSchema)fieldSchemaList.get((int)0)).type));
            }
            return new Schema(new Schema.FieldSchema(this.getSchemaName(((Object)((Object)this)).getClass().getName().toLowerCase(), input), 25));
        }
        catch (FrontendException e) {
            throw new RuntimeException(e);
        }
    }

    public static class Final
    extends EvalFunc<Double> {
        private String base;

        public Final() {
            this("log");
        }

        public Final(String base) {
            this.base = base;
        }

        public Double exec(Tuple input) throws IOException {
            try {
                DataBag b = (DataBag)input.get(0);
                Tuple combined = EmpiricalCountEntropy.combine(b);
                Double sumOfCxLogCx = (Double)combined.get(0);
                Long sumOfCx = (Long)combined.get(1);
                if (sumOfCxLogCx == null || sumOfCx == null) {
                    this.warn("Invalid null field output from combine(), 1st field: " + sumOfCxLogCx + ", 2nd field: " + sumOfCx, (Enum)PigWarning.UDF_WARNING_1);
                    return null;
                }
                Double entropy = null;
                double scxlogcx = sumOfCxLogCx;
                long scx = sumOfCx;
                if (scx > 0L) {
                    entropy = EntropyUtil.logTransform(Math.log(scx) - scxlogcx / (double)scx, this.base);
                }
                return entropy;
            }
            catch (ExecException ee) {
                throw ee;
            }
            catch (Exception e) {
                int errCode = 10082;
                String msg = "Error while computing average in " + ((Object)((Object)this)).getClass().getSimpleName();
                throw new ExecException(msg, errCode, 4, (Throwable)e);
            }
        }
    }

    public static class Intermediate
    extends EvalFunc<Tuple> {
        public Intermediate() {
        }

        public Intermediate(String base) {
        }

        public Tuple exec(Tuple input) throws IOException {
            try {
                DataBag b = (DataBag)input.get(0);
                return EmpiricalCountEntropy.combine(b);
            }
            catch (ExecException ee) {
                throw ee;
            }
            catch (Exception e) {
                int errCode = 10081;
                String msg = "Error while computing entropy in " + ((Object)((Object)this)).getClass().getSimpleName();
                throw new ExecException(msg, errCode, 4, (Throwable)e);
            }
        }
    }

    public static class Initial
    extends EvalFunc<Tuple> {
        public Initial() {
        }

        public Initial(String base) {
        }

        public Tuple exec(Tuple input) throws IOException {
            Tuple t = mTupleFactory.newTuple(2);
            try {
                DataBag bg = (DataBag)input.get(0);
                Long cxl = null;
                if (bg.iterator().hasNext()) {
                    Tuple tp = (Tuple)bg.iterator().next();
                    cxl = ((Number)tp.get(0)).longValue();
                }
                if (cxl == null || cxl < 0L) {
                    t.set(0, null);
                    t.set(1, null);
                    this.warn("Non-positive input frequency number: " + cxl, (Enum)PigWarning.UDF_WARNING_1);
                } else {
                    long cx = cxl;
                    double logcx = cx > 0L ? Math.log(cx) : 0.0;
                    double cxlogcx = (double)cx * logcx;
                    t.set(0, (Object)cxlogcx);
                    t.set(1, (Object)cxl);
                }
                return t;
            }
            catch (ExecException ee) {
                throw ee;
            }
            catch (Exception e) {
                int errCode = 10080;
                String msg = "Error while computing entropy in " + ((Object)((Object)this)).getClass().getSimpleName();
                throw new ExecException(msg, errCode, 4, (Throwable)e);
            }
        }
    }
}

