/*
 * Decompiled with CFR 0.152.
 */
package water.rapids.ast.prims.mungers;

import java.util.Arrays;
import java.util.Comparator;
import water.DKV;
import water.DTask;
import water.Futures;
import water.H2O;
import water.Key;
import water.MRTask;
import water.RPC;
import water.fvec.Chunk;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.fvec.SubsetChunk;
import water.fvec.Vec;
import water.rapids.Env;
import water.rapids.Session;
import water.rapids.Val;
import water.rapids.ast.AstExec;
import water.rapids.ast.AstFrame;
import water.rapids.ast.AstFunction;
import water.rapids.ast.AstPrimitive;
import water.rapids.ast.AstRoot;
import water.rapids.ast.params.AstNumList;
import water.rapids.ast.prims.mungers.AstGroup;
import water.rapids.vals.ValFrame;
import water.util.IcedHashSet;

public class AstDdply
extends AstPrimitive {
    @Override
    public String[] args() {
        return new String[]{"ary", "groupByCols", "fun"};
    }

    @Override
    public int nargs() {
        return 4;
    }

    @Override
    public String str() {
        return "ddply";
    }

    @Override
    public ValFrame apply(Env env, Env.StackHelp stk, AstRoot[] asts) {
        Frame fr = stk.track(asts[1].exec(env)).getFrame();
        int ncols = fr.numCols();
        AstNumList groupby2 = AstGroup.check(ncols, asts[2]);
        int[] gbCols = groupby2.expand4();
        AstPrimitive fun = asts[3].exec(env).getFun();
        AstFunction scope = env._scope;
        IcedHashSet<AstGroup.G> gss = AstGroup.doGroups(fr, gbCols, AstGroup.aggNRows());
        final AstGroup.G[] grps = gss.toArray(new AstGroup.G[gss.size()]);
        final int[] ordCols = new AstNumList(0L, gbCols.length).expand4();
        Arrays.sort(grps, new Comparator<AstGroup.G>(){

            @Override
            public int compare(AstGroup.G g1, AstGroup.G g2) {
                for (int i2 : ordCols) {
                    if (Double.isNaN(g1._gs[i2]) && !Double.isNaN(g2._gs[i2])) {
                        return -1;
                    }
                    if (!Double.isNaN(g1._gs[i2]) && Double.isNaN(g2._gs[i2])) {
                        return 1;
                    }
                    if (g1._gs[i2] == g2._gs[i2]) continue;
                    return g1._gs[i2] < g2._gs[i2] ? -1 : 1;
                }
                return 0;
            }

            @Override
            public boolean equals(Object o2) {
                throw H2O.unimpl();
            }
        });
        for (int gnum = 0; gnum < grps.length; ++gnum) {
            grps[gnum]._dss[0][0] = gnum;
        }
        Vec[] vgrps = ((BuildGroup)new BuildGroup(gbCols, gss).doAll(gss.size(), (byte)3, fr)).close();
        final RemoteRapids[] remoteTasks = new RemoteRapids[gss.size()];
        Futures fs = new Futures();
        for (int i2 = 0; i2 < remoteTasks.length; ++i2) {
            remoteTasks[i2] = new RemoteRapids(fr, vgrps[i2]._key, fun, scope);
            fs.add(RPC.call(vgrps[i2]._key.home_node(), remoteTasks[i2]));
        }
        fs.blockForPending();
        final double[] res0 = remoteTasks[0]._result;
        String[] fcnames = new String[res0.length];
        for (int i3 = 0; i3 < res0.length; ++i3) {
            fcnames[i3] = "ddply_C" + (i3 + 1);
        }
        MRTask mrfill = new MRTask(){

            @Override
            public void map(Chunk[] c2, NewChunk[] ncs) {
                int start = (int)c2[0].start();
                for (int i2 = 0; i2 < c2[0]._len; ++i2) {
                    int j2;
                    AstGroup.G g2 = grps[i2 + start];
                    for (j2 = 0; j2 < g2._gs.length; ++j2) {
                        ncs[j2].addNum(g2._gs[j2]);
                    }
                    double[] res = remoteTasks[i2 + start]._result;
                    for (int a2 = 0; a2 < res0.length; ++a2) {
                        ncs[j2++].addNum(res[a2]);
                    }
                }
            }
        };
        Frame f2 = AstGroup.buildOutput(gbCols, res0.length, fr, fcnames, gss.size(), mrfill);
        return new ValFrame(f2);
    }

    private static class RemoteRapids
    extends DTask<RemoteRapids> {
        private Frame _data;
        private Key<Vec> _vKey;
        private AstRoot _fun;
        private AstFunction _scope;
        private double[] _result;

        RemoteRapids(Frame data, Key<Vec> vKey, AstRoot fun, AstFunction scope) {
            this._data = data;
            this._vKey = vKey;
            this._fun = fun;
            this._scope = scope;
        }

        @Override
        public void compute2() {
            assert (this._vKey.home());
            final Vec gvec = (Vec)DKV.getGet(this._vKey);
            assert (gvec.group().equals(this._data.anyVec().group()));
            Key<Vec>[] groupKeys = gvec.group().addVecs(this._data.numCols());
            final Vec[] groupVecs = new Vec[this._data.numCols()];
            Futures fs = new Futures();
            for (int i2 = 0; i2 < this._data.numCols(); ++i2) {
                groupVecs[i2] = new Vec(groupKeys[i2], gvec._rowLayout, gvec.domain(), gvec.get_type());
                DKV.put(groupVecs[i2], fs);
            }
            fs.blockForPending();
            new MRTask(){

                @Override
                public void setupLocal() {
                    Vec[] data_vecs = _data.vecs();
                    for (int i2 = 0; i2 < gvec.nChunks(); ++i2) {
                        if (!data_vecs[0].chunkKey(i2).home()) continue;
                        Chunk rowchk = gvec.chunkForChunkIdx(i2);
                        for (int col = 0; col < data_vecs.length; ++col) {
                            DKV.put(Vec.chunkKey(groupVecs[col]._key, i2), new SubsetChunk(data_vecs[col].chunkForChunkIdx(i2), rowchk, groupVecs[col]), this._fs);
                        }
                    }
                }
            }.doAllNodes();
            Frame groupFrame = new Frame(this._data._names, groupVecs);
            Session ses = new Session();
            Val val = ses.exec(new AstExec(new AstRoot[]{this._fun, new AstFrame(groupFrame)}), this._scope);
            val = ses.end(val);
            if (val.isFrame()) {
                Frame res = val.getFrame();
                if (res.numRows() != 1L) {
                    throw new IllegalArgumentException("ddply must return a 1-row (many column) frame, found " + res.numRows());
                }
                this._result = new double[res.numCols()];
                for (int i3 = 0; i3 < res.numCols(); ++i3) {
                    this._result[i3] = res.vec(i3).at(0L);
                }
                res.remove();
            } else if (val.isNum()) {
                this._result = new double[]{val.getNum()};
            } else if (val.isNums()) {
                this._result = val.getNums();
            } else {
                throw new IllegalArgumentException("ddply must return either a number or a frame, not a " + val);
            }
            groupFrame.delete();
            gvec.remove();
            this._data = null;
            this._vKey = null;
            this._fun = null;
            this._scope = null;
            this.tryComplete();
        }
    }

    private static class BuildGroup
    extends MRTask<BuildGroup> {
        final IcedHashSet<AstGroup.G> _gss;
        final int[] _gbCols;

        BuildGroup(int[] gbCols, IcedHashSet<AstGroup.G> gss) {
            this._gbCols = gbCols;
            this._gss = gss;
        }

        @Override
        public void map(Chunk[] cs, NewChunk[] ncs) {
            AstGroup.G gWork = new AstGroup.G(this._gbCols.length, null);
            for (int row = 0; row < cs[0]._len; ++row) {
                gWork.fill(row, cs, this._gbCols);
                int gnum = (int)this._gss.get((AstGroup.G)gWork)._dss[0][0];
                ncs[gnum].addNum(row);
            }
        }

        Vec[] close() {
            Futures fs = new Futures();
            Vec[] vgrps = new Vec[this._gss.size()];
            for (int i2 = 0; i2 < vgrps.length; ++i2) {
                vgrps[i2] = this._appendables[i2].close(this._appendables[i2].compute_rowLayout(), fs);
            }
            fs.blockForPending();
            return vgrps;
        }
    }
}

