/*
 * Decompiled with CFR 0.152.
 */
package datafu.pig.sessions;

import java.io.IOException;
import java.util.UUID;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.builtin.Nondeterministic;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;

@Nondeterministic
public class Sessionize
extends AccumulatorEvalFunc<DataBag> {
    private final long millis;
    private DataBag outputBag;
    private DateTime last_date;
    private String id;

    public Sessionize(String timeSpec) {
        Period p = new Period((Object)("PT" + timeSpec.toUpperCase()));
        this.millis = p.toStandardDuration().getMillis();
        this.cleanup();
    }

    public void accumulate(Tuple input) throws IOException {
        for (Tuple t : (DataBag)input.get(0)) {
            DateTime date;
            Object timeObj = t.get(0);
            if (timeObj instanceof String) {
                date = new DateTime((Object)((String)timeObj));
            } else if (timeObj instanceof Long) {
                date = new DateTime((Object)((Long)timeObj));
            } else {
                throw new RuntimeException("Time must either be a String or Long");
            }
            if (this.last_date == null) {
                this.last_date = date;
            } else if (date.isAfter((ReadableInstant)this.last_date.plus(this.millis))) {
                this.id = UUID.randomUUID().toString();
            } else if (date.isBefore((ReadableInstant)this.last_date)) {
                throw new IOException(String.format("input time series is not sorted (%s < %s)", date, this.last_date));
            }
            Tuple t_new = TupleFactory.getInstance().newTuple(t.getAll());
            t_new.append((Object)this.id);
            this.outputBag.add(t_new);
            this.last_date = date;
        }
    }

    public DataBag getValue() {
        return this.outputBag;
    }

    public void cleanup() {
        this.last_date = null;
        this.outputBag = BagFactory.getInstance().newDefaultBag();
        this.id = UUID.randomUUID().toString();
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema inputFieldSchema = input.getField(0);
            if (inputFieldSchema.type != 120) {
                throw new RuntimeException("Expected a BAG as input");
            }
            Schema inputBagSchema = inputFieldSchema.schema;
            if (inputBagSchema.getField((int)0).type != 110) {
                throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s", DataType.findTypeName((byte)inputBagSchema.getField((int)0).type)));
            }
            Schema inputTupleSchema = inputBagSchema.getField((int)0).schema;
            if (inputTupleSchema.getField((int)0).type != 55 && inputTupleSchema.getField((int)0).type != 15) {
                throw new RuntimeException(String.format("Expected first element of tuple to be a CHARARRAY or LONG, but instead found %s", DataType.findTypeName((byte)inputTupleSchema.getField((int)0).type)));
            }
            Schema outputTupleSchema = inputTupleSchema.clone();
            outputTupleSchema.add(new Schema.FieldSchema("session_id", 55));
            return new Schema(new Schema.FieldSchema(this.getSchemaName(((Object)((Object)this)).getClass().getName().toLowerCase(), input), outputTupleSchema, 120));
        }
        catch (CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
        catch (FrontendException e) {
            throw new RuntimeException(e);
        }
    }
}

