/*
 * Decompiled with CFR 0.152.
 */
package water.rapids;

import java.util.ArrayList;
import java.util.HashSet;
import water.DKV;
import water.DTask;
import water.Freezable;
import water.Futures;
import water.H2O;
import water.H2ONode;
import water.Key;
import water.Keyed;
import water.MRTask;
import water.RPC;
import water.fvec.AppendableVec;
import water.fvec.Chunk;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.fvec.SubsetVec;
import water.fvec.Vec;
import water.rapids.AST;
import water.rapids.ASTFrame;
import water.rapids.ASTGroupBy;
import water.rapids.ASTId;
import water.rapids.ASTLongList;
import water.rapids.ASTNum;
import water.rapids.ASTOp;
import water.rapids.Env;
import water.rapids.Exec;
import water.util.IcedHashMap;
import water.util.IcedInt;

public class ASTddply
extends ASTOp {
    long[] _cols;
    String _fun;
    AST[] _fun_args;
    static final String[] VARS = new String[]{"ary", "{cols}", "FUN"};

    public ASTddply() {
        super(VARS);
    }

    @Override
    String opStr() {
        return "h2o.ddply";
    }

    @Override
    ASTOp make() {
        return new ASTddply();
    }

    @Override
    ASTddply parse_impl(Exec E) {
        AST ary = E.parse();
        AST s = E.parse();
        if (s instanceof ASTLongList) {
            this._cols = ((ASTLongList)s)._l;
        } else if (s instanceof ASTNum) {
            this._cols = new long[]{(long)((ASTNum)s)._d};
        } else {
            throw new IllegalArgumentException("Columns expected to be a llist or number. Got: " + s.getClass());
        }
        this._fun = ((ASTId)E.parse())._id;
        ArrayList<AST> fun_args = new ArrayList<AST>();
        while (!E.isEnd()) {
            fun_args.add(E.parse());
        }
        this._fun_args = fun_args.size() > 0 ? fun_args.toArray(new AST[fun_args.size()]) : null;
        E.eatEnd();
        ASTddply res = (ASTddply)this.clone();
        res._asts = new AST[]{ary};
        return res;
    }

    @Override
    void apply(Env env) {
        Frame fr = env.popAry();
        for (long l : this._cols) {
            if (l <= (long)fr.numCols() && l >= 0L) continue;
            throw new IllegalArgumentException("Column " + (l + 1L) + " out of range for frame columns " + fr.numCols());
        }
        Pass1A p1a = (Pass1A)new Pass1A(this._cols).doAll(fr);
        Group[] grps = p1a._grps.keySet().toArray(new Group[p1a._grps.size()]);
        int ngrps = grps.length;
        while (grps[ngrps - 1] == null) {
            --ngrps;
        }
        Group[] groups = new Group[ngrps];
        System.arraycopy(grps, 0, groups, 0, ngrps);
        grps = groups;
        Pass2 p2 = new Pass2(fr, grps);
        H2O.submitTask(p2).join();
        Key[] groupFrames = p2._keys;
        Pass3 p3 = new Pass3(groupFrames, ASTOp.get(this._fun).make(), grps, this._fun_args);
        p3.go();
        Vec layoutVec = Vec.makeZero(p3._remoteTasks.length);
        final RemoteRapids[] results = p3._remoteTasks;
        for (int k = 0; k < p2._tasks.length; ++k) {
            for (Key key : p2._tasks[k]._subsetVecKeys) {
                Keyed.remove(key);
            }
        }
        int nonnull = -1;
        for (int i = 0; i < results.length; ++i) {
            RemoteRapids remoteRapids = results[i] = results[i]._result == null ? null : results[i];
            if (results[i] == null) continue;
            nonnull = i;
        }
        if (nonnull == -1) {
            env.pushAry(new Frame(Vec.makeCon(0.0, 0L)));
            return;
        }
        int ncols = results[nonnull]._result.length;
        String[] names = new String[ncols];
        String[][] domains = new String[ncols][];
        int i = 0;
        while (i < this._cols.length) {
            names[i] = fr.names()[(int)this._cols[i]];
            domains[i] = fr.domains()[(int)this._cols[i++]];
        }
        int j = 1;
        while (i < ncols) {
            names[i++] = "C" + j++;
        }
        Frame fr2 = ((MRTask)new MRTask(){

            @Override
            public void map(Chunk[] c, NewChunk[] nc) {
                int start = (int)c[0].start();
                for (int i = 0; i < c[0]._len; ++i) {
                    double d;
                    if (results[i + start] == null || Double.isNaN(d = results[i + start]._result[nc.length - 1])) continue;
                    for (int j = 0; j < nc.length; ++j) {
                        nc[j].addNum(results[i + start]._result[j]);
                    }
                }
            }
        }.doAll(ncols, layoutVec)).outputFrame(names, domains);
        layoutVec.remove();
        env.pushAry(fr2);
    }

    private static class RemoteRapids
    extends DTask<RemoteRapids>
    implements Freezable {
        private final Key _frameKey;
        private final ASTOp _FUN;
        private final AST[] _funArgs;
        private final double[] _ds;
        private double[] _result;
        private final byte _priority;

        RemoteRapids(Key frameKey, ASTOp FUN, AST[] args, double[] ds) {
            this._frameKey = frameKey;
            this._FUN = FUN;
            this._funArgs = args;
            this._ds = ds;
            Thread cThr = Thread.currentThread();
            this._priority = (byte)(cThr instanceof H2O.FJWThr ? ((H2O.FJWThr)cThr)._priority + 1 : super.priority());
        }

        @Override
        public byte priority() {
            return this._priority;
        }

        @Override
        public void compute2() {
            assert (this._frameKey.home());
            Env e = Env.make(new HashSet<Key>());
            Frame groupFrame = (Frame)DKV.getGet(this._frameKey);
            assert (groupFrame != null) : "Frame ID: " + this._frameKey;
            AST[] args = new AST[this._funArgs == null ? 1 : this._funArgs.length + 1];
            args[0] = new ASTFrame(groupFrame);
            if (this._funArgs != null) {
                System.arraycopy(this._funArgs, 0, args, 1, this._funArgs.length);
            }
            this._FUN.make().exec(e, args);
            if (!e.isNul()) {
                Frame fr = null;
                if (e.isAry() && (fr = e.popAry()).numRows() != 1L) {
                    throw new IllegalArgumentException("Result of ddply can only return 1 row but instead returned " + fr.numRows());
                }
                int ncols = fr == null ? 1 : fr.numCols();
                this._result = new double[this._ds.length + ncols];
                System.arraycopy(this._ds, 0, this._result, 0, this._ds.length);
                int j = this._ds.length;
                for (int i = 0; i < ncols; ++i) {
                    if (e.isStr()) {
                        this._result[j++] = e.popStr().equals("TRUE") ? 1.0 : 0.0;
                        continue;
                    }
                    if (e.isNum()) {
                        this._result[j++] = e.popDbl();
                        continue;
                    }
                    if (fr == null) continue;
                    this._result[j++] = fr.vecs()[i].at(0L);
                }
            }
            groupFrame.delete();
            this.tryComplete();
        }
    }

    private static class Pass3 {
        private final Key[] _frameKeys;
        private final ASTOp _FUN;
        private final Group[] _grps;
        private final AST[] _funArgs;
        RemoteRapids[] _remoteTasks;

        Pass3(Key[] frameKeys, ASTOp FUN, Group[] grps, AST[] args) {
            this._frameKeys = frameKeys;
            this._FUN = FUN;
            this._grps = grps;
            this._funArgs = args;
            this._remoteTasks = new RemoteRapids[this._frameKeys.length];
        }

        private void go() {
            Futures fs = new Futures();
            for (int i = 0; i < this._frameKeys.length; ++i) {
                assert (DKV.getGet(this._frameKeys[i]) != null) : "Frame #" + i + " was NULL: " + this._frameKeys[i];
                this._remoteTasks[i] = new RemoteRapids(this._frameKeys[i], this._FUN, this._funArgs, this._grps[i]._ds);
                fs.add(RPC.call(this._frameKeys[i].home_node(), this._remoteTasks[i]));
            }
            fs.blockForPending();
        }
    }

    private static class BuildGroup
    extends DTask<BuildGroup>
    implements Freezable {
        private final Key _frameKey;
        private final Key _key;
        private final long[] _rows;
        private Key[] _subsetVecKeys;
        private final byte _priority;

        BuildGroup(Key key, long[] rows, Key frameKey) {
            this._key = key;
            this._rows = rows;
            this._frameKey = frameKey;
            Thread cThr = Thread.currentThread();
            this._priority = (byte)(cThr instanceof H2O.FJWThr ? ((H2O.FJWThr)cThr)._priority + 1 : super.priority());
        }

        @Override
        public byte priority() {
            return this._priority;
        }

        @Override
        protected void compute2() {
            assert (this._key.home()) : "Key was not homed to this node!";
            Futures fs = new Futures();
            Vec layout = Vec.makeZero(this._rows.length);
            Key<Vec> key = layout.group().addVec();
            layout.remove();
            AppendableVec v = new AppendableVec((Key)key);
            NewChunk n = new NewChunk(v, 0);
            for (long l : this._rows) {
                n.addNum(l);
            }
            n.close(0, fs);
            Vec rows = v.close(fs);
            fs.blockForPending();
            Frame f = (Frame)DKV.getGet(this._frameKey);
            Vec[] data = f.vecs();
            Vec[] gvecs = new Vec[data.length];
            Key<Vec>[] keys = rows.group().addVecs(data.length);
            this._subsetVecKeys = keys;
            for (int c = 0; c < data.length; ++c) {
                gvecs[c] = new SubsetVec(keys[c], rows.get_espc(), data[c]._key, rows._key);
                gvecs[c].setDomain(data[c].domain());
                DKV.put(gvecs[c]._key, gvecs[c]);
            }
            Frame aa = new Frame(this._key, f._names, gvecs);
            DKV.put(this._key, aa);
            assert (this._key.home()) : "Key should be homed to the node! Somehow remapped during this compute2.";
            this.tryComplete();
        }
    }

    private static class Pass2Task
    extends H2O.H2OCountedCompleter<Pass2Task> {
        private final int _nodeID;
        private final Group _g;
        private final Key _frameKey;
        Key _key;
        H2ONode _n;
        Key[] _subsetVecKeys;

        Pass2Task(H2O.H2OCountedCompleter cc, int nodeID, Group g, Key frameKey, H2ONode n, Key key) {
            super(cc);
            this._nodeID = nodeID;
            this._g = g;
            this._frameKey = frameKey;
            this._n = n;
            this._key = key;
        }

        @Override
        protected void compute2() {
            H2ONode n = H2O.CLOUD.members()[this._nodeID];
            Futures fs = new Futures();
            long[] rows = new long[this._g.a.size()];
            int i = 0;
            for (IcedInt l : this._g.a.keySet()) {
                rows[i++] = l._val;
            }
            BuildGroup b = new BuildGroup(this._key, rows, this._frameKey);
            fs.add(RPC.call(n, b));
            fs.blockForPending();
            this._subsetVecKeys = b._subsetVecKeys;
            this.tryComplete();
        }
    }

    private static class Pass2
    extends H2O.H2OCountedCompleter<Pass2> {
        private final Frame _fr;
        private final Group[] _grps;
        Pass2Task[] _tasks;
        Key[] _keys;

        Pass2(Frame f, Group[] grps) {
            this._fr = f;
            this._grps = grps;
        }

        @Override
        protected void compute2() {
            this.addToPendingCount(this._grps.length - 1);
            int numnodes = H2O.CLOUD.size();
            this._tasks = new Pass2Task[this._grps.length];
            this._keys = new Key[this._grps.length];
            for (int i = 0; i < this._grps.length; ++i) {
                Key key;
                int nodeID = i % numnodes;
                H2ONode n = H2O.CLOUD.members()[nodeID];
                this._keys[i] = key = Key.make(n);
                this._tasks[i] = new Pass2Task(this, nodeID, this._grps[i], this._fr._key, n, this._keys[i]);
                this._tasks[i].fork();
            }
        }
    }

    private static class Pass1A
    extends MRTask<Pass1A> {
        private final long[] _gbCols;
        IcedHashMap<Group, String> _grps;

        Pass1A(long[] cols) {
            this._gbCols = cols;
        }

        @Override
        public void setupLocal() {
        }

        @Override
        public void map(Chunk[] c) {
            this._grps = new IcedHashMap();
            Group g = new Group(this._gbCols.length);
            int start = (int)c[0].start();
            for (int i = 0; i < c[0]._len; ++i) {
                Group gOld;
                g.fill(i, c, this._gbCols);
                String old_g = this._grps.putIfAbsent(g, "");
                if (old_g == null) {
                    gOld = g;
                    g = new Group(this._gbCols.length);
                } else {
                    gOld = this._grps.getk(g);
                }
                long cnt = gOld._N;
                while (!Group.CAS_N(gOld, cnt, cnt + 1L)) {
                    cnt = gOld._N;
                }
                gOld.a.put(new IcedInt(start + i), "");
            }
        }

        @Override
        public void reduce(Pass1A t) {
            if (this._grps != t._grps) {
                IcedHashMap<Group, String> l = this._grps;
                IcedHashMap<Group, String> r = t._grps;
                if (l.size() < r.size()) {
                    l = r;
                    r = this._grps;
                }
                for (Group rg : r.keySet()) {
                    if (!l.containsKey(rg)) continue;
                    Group lg = l.getk(rg);
                    long L = lg._N;
                    while (!Group.CAS_N(lg, L, L + rg._N)) {
                        L = lg._N;
                    }
                }
                this._grps = l;
                t._grps = null;
            }
        }
    }

    public static class Group
    extends ASTGroupBy.G {
        IcedHashMap<IcedInt, String> a;

        public Group() {
        }

        public Group(int len) {
            super(len);
            this.a = new IcedHashMap();
        }

        public Group(double[] ds) {
            super(ds);
        }
    }
}

