/*
 * Decompiled with CFR 0.152.
 */
package water.rapids;

import java.util.Arrays;
import java.util.Comparator;
import water.DKV;
import water.DTask;
import water.Futures;
import water.H2O;
import water.Key;
import water.MRTask;
import water.RPC;
import water.fvec.Chunk;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.fvec.SubsetChunk;
import water.fvec.Vec;
import water.rapids.AST;
import water.rapids.ASTExec;
import water.rapids.ASTFrame;
import water.rapids.ASTFun;
import water.rapids.ASTGroup;
import water.rapids.ASTNumList;
import water.rapids.ASTPrim;
import water.rapids.Env;
import water.rapids.Session;
import water.rapids.Val;
import water.rapids.ValFrame;
import water.util.IcedHashMap;

/**
 * Rapids primitive {@code ddply}: splits a frame into groups by the values of
 * the group-by columns, runs a user function once per group, and returns a
 * frame with one row per group (group key columns followed by the function's
 * result columns, named {@code ddply_C1}, {@code ddply_C2}, ...).
 */
class ASTDdply
extends ASTPrim {
    ASTDdply() {
    }

    /** Names of the three user-visible arguments. */
    @Override
    public String[] args() {
        return new String[]{"ary", "groupByCols", "fun"};
    }

    /**
     * Number of AST slots consumed; {@link #apply} reads {@code asts[1..3]},
     * so slot 0 is not one of the three named arguments.
     */
    @Override
    int nargs() {
        return 4;
    }

    /** Name of this primitive. */
    @Override
    public String str() {
        return "ddply";
    }

    /**
     * Runs the split / apply / combine.
     *
     * @param env  execution environment; its {@code _scope} is handed to each per-group task
     * @param stk  stack helper used to track the input frame
     * @param asts {@code asts[1]} evaluates to the frame, {@code asts[2]} is the group-by
     *             column list, {@code asts[3]} evaluates to the function
     * @return a {@link ValFrame} with one row per group
     */
    @Override
    public Val apply(Env env, Env.StackHelp stk, AST[] asts) {
        Frame fr = stk.track(asts[1].exec(env)).getFrame();
        int ncols = fr.numCols();
        ASTNumList groupby = ASTGroup.check(ncols, asts[2]);
        int[] gbCols = groupby.expand4();
        AST fun = asts[3].exec(env).getFun();
        ASTFun scope = env._scope;

        // Pass 1: discover the distinct groups.
        IcedHashMap<ASTGroup.G, String> gss = ASTGroup.doGroups(fr, gbCols, ASTGroup.aggNRows());
        final ASTGroup.G[] grps = gss.keySet().toArray(new ASTGroup.G[gss.size()]);

        // Order groups by every key column, in key order.
        final int[] ordCols = new ASTNumList(0L, gbCols.length).expand4();
        Arrays.sort(grps, new Comparator<ASTGroup.G>(){

            /**
             * Ascending order over the key columns; NaN sorts before any number.
             * Two NaNs in the same column tie on that column, so the comparison
             * moves on to the next column. (NaN == NaN is false in Java, so an
             * explicit check is needed to keep compare(a,b) == -compare(b,a).)
             */
            @Override
            public int compare(ASTGroup.G g1, ASTGroup.G g2) {
                for (int i : ordCols) {
                    double a = g1._gs[i];
                    double b = g2._gs[i];
                    boolean aNaN = Double.isNaN(a);
                    boolean bNaN = Double.isNaN(b);
                    if (aNaN || bNaN) {
                        if (aNaN && bNaN) continue;
                        return aNaN ? -1 : 1;
                    }
                    if (a == b) continue;
                    return a < b ? -1 : 1;
                }
                return 0;
            }

            // Deliberately unimplemented; sorting only needs compare().
            @Override
            public boolean equals(Object o) {
                throw H2O.unimpl();
            }
        });

        // Stamp each group with its sorted position; BuildGroup.map reads it back.
        for (int gnum = 0; gnum < grps.length; ++gnum) {
            grps[gnum]._dss[0][0] = gnum;
        }

        // Pass 2: one output Vec per group, holding the chunk-relative row
        // numbers of that group's rows.
        // NOTE(review): (byte)3 is presumably the numeric Vec type code - confirm.
        Vec[] vgrps = ((BuildGroup)new BuildGroup(gbCols, gss).doAll(gss.size(), (byte)3, fr)).close();

        // Pass 3: run the function once per group, on the home node of that
        // group's row-selector Vec.
        final RemoteRapids[] remoteTasks = new RemoteRapids[gss.size()];
        Futures fs = new Futures();
        for (int i = 0; i < remoteTasks.length; ++i) {
            remoteTasks[i] = new RemoteRapids(fr, vgrps[i]._key, fun, scope);
            fs.add(RPC.call(vgrps[i]._key.home_node(), remoteTasks[i]));
        }
        fs.blockForPending();

        // The first group's result fixes the number of result columns.
        // NOTE(review): this indexes remoteTasks[0], so it assumes at least one group.
        final double[] res0 = remoteTasks[0]._result;
        String[] fcnames = new String[res0.length];
        for (int i = 0; i < res0.length; ++i) {
            fcnames[i] = "ddply_C" + (i + 1);
        }

        // Fills output row r with group r's key values followed by its results.
        MRTask mrfill = new MRTask(){

            @Override
            public void map(Chunk[] c, NewChunk[] ncs) {
                int start = (int)c[0].start();
                for (int i = 0; i < c[0]._len; ++i) {
                    int j;
                    ASTGroup.G g = grps[i + start];
                    for (j = 0; j < g._gs.length; ++j) {
                        ncs[j].addNum(g._gs[j]);
                    }
                    // Exactly res0.length values are written per row; a group whose
                    // result is shorter than the first group's fails here.
                    double[] res = remoteTasks[i + start]._result;
                    for (int a = 0; a < res0.length; ++a) {
                        ncs[j++].addNum(res[a]);
                    }
                }
            }
        };
        Frame f = ASTGroup.buildOutput(gbCols, res0.length, fr, fcnames, gss.size(), mrfill);
        return new ValFrame(f);
    }

    /**
     * Per-group task: builds a temporary frame containing only the group's
     * rows, runs the user function on it, and stores the outcome as a
     * {@code double[]} in {@code _result}.
     */
    private static class RemoteRapids
    extends DTask<RemoteRapids> {
        private Frame _data;
        private Key<Vec> _vKey;
        private AST _fun;
        private ASTFun _scope;
        private double[] _result;

        /**
         * @param data  the full input frame
         * @param vKey  key of the Vec listing this group's row numbers
         * @param fun   function to apply to the group frame
         * @param scope scope passed to the session when executing {@code fun}
         */
        RemoteRapids(Frame data, Key<Vec> vKey, AST fun, ASTFun scope) {
            this._data = data;
            this._vKey = vKey;
            this._fun = fun;
            this._scope = scope;
        }

        /**
         * Builds the group frame, executes the function and records the result.
         *
         * @throws IllegalArgumentException if the function returns a frame that
         *         does not have exactly one row, or a value that is neither a
         *         frame, a number nor a number array
         */
        @Override
        public void compute2() {
            assert (this._vKey.home());
            final Vec gvec = (Vec)DKV.getGet(this._vKey);
            assert (gvec.group().equals(this._data.anyVec().group()));

            // One new Vec per data column, sharing the selector Vec's row layout.
            Key<Vec>[] groupKeys = gvec.group().addVecs(this._data.numCols());
            final Vec[] groupVecs = new Vec[this._data.numCols()];
            Futures fs = new Futures();
            for (int i = 0; i < this._data.numCols(); ++i) {
                groupVecs[i] = new Vec(groupKeys[i], gvec._rowLayout, gvec.domain(), gvec.get_type());
                DKV.put(groupVecs[i], fs);
            }
            fs.blockForPending();

            // On every node, publish a SubsetChunk for each locally-homed chunk
            // of each column, built from the data chunk and the selector chunk.
            new MRTask(){

                @Override
                public void setupLocal() {
                    Vec[] data_vecs = RemoteRapids.this._data.vecs();
                    for (int i = 0; i < gvec.nChunks(); ++i) {
                        if (!data_vecs[0].chunkKey(i).home()) continue;
                        Chunk rowchk = gvec.chunkForChunkIdx(i);
                        for (int col = 0; col < data_vecs.length; ++col) {
                            DKV.put(Vec.chunkKey(groupVecs[col]._key, i), new SubsetChunk(data_vecs[col].chunkForChunkIdx(i), rowchk, groupVecs[col]), this._fs);
                        }
                    }
                }
            }.doAllNodes();

            Frame groupFrame = new Frame(this._data._names, groupVecs);
            try {
                Session ses = new Session();
                Val val = ses.exec(new ASTExec(new AST[]{this._fun, new ASTFrame(groupFrame)}), this._scope);
                val = ses.end(val);
                if (val.isFrame()) {
                    // NOTE(review): the returned frame is read but never deleted here;
                    // confirm who owns it after Session.end.
                    Frame res = val.getFrame();
                    if (res.numRows() != 1L) {
                        throw new IllegalArgumentException("ddply must return a 1-row (many column) frame, found " + res.numRows());
                    }
                    this._result = new double[res.numCols()];
                    for (int i = 0; i < res.numCols(); ++i) {
                        this._result[i] = res.vec(i).at(0L);
                    }
                } else if (val.isNum()) {
                    this._result = new double[]{val.getNum()};
                } else if (val.isNums()) {
                    this._result = val.getNums();
                } else {
                    throw new IllegalArgumentException("ddply must return either a number or a frame, not a " + val);
                }
            } finally {
                // Remove the temporaries even when the function throws or its
                // result is rejected, so a failed group does not leave them behind.
                groupFrame.delete();
                gvec.remove();
            }

            // Drop the inputs; only _result is needed once the task has finished.
            this._data = null;
            this._vKey = null;
            this._fun = null;
            this._scope = null;
            this.tryComplete();
        }
    }

    /**
     * Map/reduce task producing one output column per group; for every input
     * row it appends the row's chunk-relative index to its group's column.
     */
    private static class BuildGroup
    extends MRTask<BuildGroup> {
        final IcedHashMap<ASTGroup.G, String> _gss;
        final int[] _gbCols;

        /**
         * @param gbCols indices of the group-by columns
         * @param gss    the discovered groups, each already numbered in {@code _dss[0][0]}
         */
        BuildGroup(int[] gbCols, IcedHashMap<ASTGroup.G, String> gss) {
            this._gbCols = gbCols;
            this._gss = gss;
        }

        @Override
        public void map(Chunk[] cs, NewChunk[] ncs) {
            // A single scratch key, refilled for every row.
            ASTGroup.G gWork = new ASTGroup.G(this._gbCols.length, null);
            for (int row = 0; row < cs[0]._len; ++row) {
                gWork.fill(row, cs, this._gbCols);
                // Look up the stored group equal to this key and read its number.
                int gnum = (int)this._gss.getk((ASTGroup.G)gWork)._dss[0][0];
                ncs[gnum].addNum(row);
            }
        }

        /**
         * Closes every per-group appendable Vec, each with its own computed
         * row layout, and waits for the pending writes.
         *
         * @return one Vec per group, indexed by group number
         */
        Vec[] close() {
            Futures fs = new Futures();
            Vec[] vgrps = new Vec[this._gss.size()];
            for (int i = 0; i < vgrps.length; ++i) {
                vgrps[i] = this._appendables[i].close(this._appendables[i].compute_rowLayout(), fs);
            }
            fs.blockForPending();
            return vgrps;
        }
    }
}

