/*
 * Decompiled with CFR 0.152.
 */
package water.fvec;

import java.util.Arrays;
import jsr166y.CountedCompleter;
import water.Futures;
import water.H2O;
import water.Key;
import water.MRTask;
import water.fvec.Chunk;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.fvec.Vec;

/**
 * Task that copies the rows of a source {@link Frame} into a new frame whose
 * vectors use a different chunk layout.
 *
 * <p>The target layout is either given explicitly (an {@code espc} array of
 * chunk start rows plus a vector group) or derived from a requested number of
 * chunks, in which case the rows are spread as evenly as possible.
 * {@link #compute2()} creates the empty output frame and forks a
 * {@link RebalanceTask} over it; that task fills every output chunk with the
 * matching rows of the source vectors.
 */
public class RebalanceDataSet extends H2O.H2OCountedCompleter {
    /** Frame whose data is being redistributed. */
    final Frame _in;
    /** Number of chunks in the output layout. */
    final int _nchunks;
    /** Key under which the output frame is created. */
    Key _okey;
    /** Output frame; assigned in {@link #compute2()}. */
    Frame _out;
    /** Job key passed to the lock/update/unlock/delete calls on the output frame; may be null. */
    final Key _jobKey;
    /** Vector group the output vectors are added to. */
    final transient Vec.VectorGroup _vg;
    /** Explicit chunk start rows, or null when the layout is derived from {@link #_nchunks}. */
    transient long[] _espc;

    /**
     * Rebalances {@code srcFrame} using the chunk layout and vector group of
     * {@code modelFrame}'s {@code anyVec()}.
     *
     * @param modelFrame frame supplying the target layout
     * @param srcFrame   frame to rebalance
     * @param dstKey     key of the resulting frame
     */
    public RebalanceDataSet(Frame modelFrame, Frame srcFrame, Key<?> dstKey) {
        this(modelFrame.anyVec().espc(), modelFrame.anyVec().group(), srcFrame, dstKey);
    }

    /**
     * Rebalances {@code srcFrame} to an explicitly given layout. No parent
     * completer and no job key are used.
     *
     * @param espc     chunk start rows; the chunk count is {@code espc.length - 1}
     * @param vg       vector group for the output vectors
     * @param srcFrame frame to rebalance
     * @param dstKey   key of the resulting frame
     */
    public RebalanceDataSet(long[] espc, Vec.VectorGroup vg, Frame srcFrame, Key<?> dstKey) {
        super(null);
        this._in = srcFrame;
        this._jobKey = null;
        this._okey = dstKey;
        this._espc = espc;
        this._vg = vg;
        this._nchunks = espc.length - 1;
    }

    /**
     * Rebalances {@code srcFrame} into {@code nchunks} evenly sized chunks,
     * without a parent completer or job key.
     *
     * @param srcFrame frame to rebalance
     * @param dstKey   key of the resulting frame
     * @param nchunks  requested number of chunks; must be positive
     */
    public RebalanceDataSet(Frame srcFrame, Key dstKey, int nchunks) {
        this(srcFrame, dstKey, nchunks, null, null);
    }

    /**
     * Rebalances {@code srcFrame} into {@code nchunks} evenly sized chunks in
     * a freshly created vector group.
     *
     * @param srcFrame frame to rebalance
     * @param dstKey   key of the resulting frame
     * @param nchunks  requested number of chunks; must be positive (checked
     *                 when the task runs)
     * @param cmp      parent completer handed to the superclass; may be null
     * @param jobKey   job key used when locking/unlocking the output; may be null
     */
    public RebalanceDataSet(Frame srcFrame, Key dstKey, int nchunks, H2O.H2OCountedCompleter cmp, Key jobKey) {
        super(cmp);
        this._in = srcFrame;
        this._nchunks = nchunks;
        this._jobKey = jobKey;
        this._okey = dstKey;
        this._vg = new Vec.VectorGroup();
    }

    /**
     * Waits for this task via {@code join()} and returns the output frame.
     *
     * @return the rebalanced frame
     */
    public Frame getResult() {
        this.join();
        return this._out;
    }

    /**
     * Builds the list of chunk start rows for spreading {@code numRows} rows
     * over {@code nchunks} chunks as evenly as possible: every chunk receives
     * {@code numRows / nchunks} rows and the first {@code numRows % nchunks}
     * chunks receive one extra row.
     *
     * <p>The arithmetic is done in {@code long}. The previous implementation
     * narrowed the per-chunk row count to {@code int} without a check, which
     * silently produced a corrupt layout for very large frames when assertions
     * were disabled, and failed with an uninformative error for a non-positive
     * chunk count.
     *
     * @param numRows total number of rows
     * @param nchunks number of chunks to produce
     * @return array of length {@code nchunks + 1}; entry {@code i} is the first
     *         row of chunk {@code i}, the last entry equals {@code numRows}
     * @throws IllegalArgumentException if {@code nchunks} is not positive or a
     *         single chunk would need more than {@code Integer.MAX_VALUE} rows
     */
    private static long[] evenLayout(long numRows, int nchunks) {
        if (nchunks <= 0) {
            throw new IllegalArgumentException("number of chunks must be positive, got " + nchunks);
        }
        final long rowsPerChunk = numRows / nchunks;
        // The remainder is strictly smaller than nchunks, so it always fits in an int.
        final int remainder = (int) (numRows % nchunks);
        final long largestChunk = rowsPerChunk + (remainder > 0 ? 1L : 0L);
        // Chunk lengths are ints (see the use of Chunk._len in RebalanceTask).
        if (largestChunk > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("cannot rebalance " + numRows + " rows into " + nchunks
                    + " chunk(s): " + largestChunk + " rows per chunk exceeds " + Integer.MAX_VALUE);
        }
        final long[] espc = new long[nchunks + 1];
        long start = 0L;
        for (int i = 0; i < nchunks; ++i) {
            espc[i] = start;
            start += rowsPerChunk + (i < remainder ? 1L : 0L);
        }
        espc[nchunks] = start;
        assert (start == numRows) : "unexpected number of rows, expected " + numRows + ", got " + start;
        return espc;
    }

    /**
     * Creates the empty, locked output frame with the target layout and forks
     * a {@link RebalanceTask} over it, with this task as its completer.
     *
     * @throws IllegalArgumentException if no explicit layout was supplied and
     *         the requested chunk count is invalid for the source frame (see
     *         {@link #evenLayout(long, int)})
     */
    @Override
    public void compute2() {
        final long[] espc = this._espc != null
                ? this._espc
                : RebalanceDataSet.evenLayout(this._in.numRows(), this._nchunks);
        final int rowLayout = Vec.ESPC.rowLayout(this._vg._key, espc);
        final Vec[] srcVecs = this._in.vecs();
        // One constant (0) vector per source column, all sharing the new layout.
        final Vec[] dstVecs = new Vec(this._vg.addVec(), rowLayout)
                .makeCons(srcVecs.length, 0L, this._in.domains(), this._in.types());
        this._out = new Frame(this._okey, this._in.names(), dstVecs);
        this._out.delete_and_lock(this._jobKey);
        new RebalanceTask(this, srcVecs).dfork(this._out);
    }

    /**
     * Called on normal completion: sanity-checks the output (same row count as
     * the input, expected chunk count), then updates and unlocks the output
     * frame with the job key.
     */
    @Override
    public void onCompletion(CountedCompleter caller) {
        assert (this._out.numRows() == this._in.numRows());
        Vec vec = this._out.anyVec();
        assert (vec.nChunks() == this._nchunks);
        this._out.update(this._jobKey);
        this._out.unlock(this._jobKey);
    }

    /**
     * Called on failure: prints the stack trace and, if the output frame was
     * already created, deletes it and blocks until the deletion has finished.
     *
     * @return always {@code true}
     */
    @Override
    public boolean onExceptionalCompletion(Throwable t, CountedCompleter caller) {
        t.printStackTrace();
        if (this._out != null) {
            this._out.delete(this._jobKey, new Futures(), true).blockForPending();
        }
        return true;
    }

    /**
     * Rebalances {@code fr} into a single chunk under a newly made key.
     *
     * @param fr frame to rebalance
     * @return the frame stored under the new key
     */
    public static Frame toSingleChunk(Frame fr) {
        Key<Frame> singleKey = Key.make();
        return RebalanceDataSet.toSingleChunk(fr, singleKey);
    }

    /**
     * Rebalances {@code fr} into a single chunk, waits for the task to finish
     * and returns the frame fetched from {@code destinationKey}.
     *
     * @param fr             frame to rebalance
     * @param destinationKey key of the resulting frame
     * @return the value stored under {@code destinationKey}
     */
    public static Frame toSingleChunk(Frame fr, Key<Frame> destinationKey) {
        H2O.submitTask(new RebalanceDataSet(fr, destinationKey, 1)).join();
        return destinationKey.get();
    }

    /**
     * Map task run over the output frame: fills every output chunk with the
     * corresponding rows of the source vectors.
     */
    public static class RebalanceTask extends MRTask<RebalanceTask> {
        /** Source vectors, indexed like the columns of the frame this task runs over. */
        final Vec[] _srcVecs;

        /**
         * @param cmp     completer handed to the superclass
         * @param srcVecs source vectors, one per output column
         */
        public RebalanceTask(H2O.H2OCountedCompleter cmp, Vec ... srcVecs) {
            super(cmp);
            this._srcVecs = srcVecs;
        }

        /** @return always {@code false} */
        @Override
        public boolean logVerbose() {
            return false;
        }

        /**
         * Fills {@code nc} with the rows {@code [c._start, c._start + c._len)}
         * of source vector {@code i}, walking over as many source chunks as
         * needed, then closes {@code nc}.
         *
         * @param i  column index into {@link #_srcVecs}
         * @param c  output chunk that defines the row range to copy
         * @param nc chunk under construction that receives the rows
         */
        private void rebalanceChunk(int i, Chunk c, NewChunk nc) {
            final Vec srcVec = this._srcVecs[i];
            final int targetLen = c._len;
            int lastId = -1;
            int copied = 0;
            while (copied < targetLen) {
                final long row = c._start + (long) copied;
                final Chunk srcRaw = srcVec.chunkForRow(row);
                // Source chunks must be visited in order; any chunk ids skipped in
                // between must start at the same row as the current one.
                assert (lastId == -1 || lastId == srcRaw.cidx() - 1 || srcVec.chunk2StartElem(lastId + 1) == srcVec.chunk2StartElem(srcRaw.cidx()));
                lastId = srcRaw.cidx();
                final int off = (int) (row - srcRaw._start);
                assert (off >= 0 && off < srcRaw._len);
                // Take whatever is smaller: rows still missing, or rows left in this source chunk.
                final int x = Math.min(targetLen - copied, srcRaw._len - off);
                srcRaw.extractRows(nc, off, off + x);
                copied += x;
            }
            nc.close(this._fs);
        }

        /** Rebalances each column's chunk into a fresh {@link NewChunk} built on it. */
        @Override
        public void map(Chunk[] chks) {
            for (int c = 0; c < chks.length; ++c) {
                this.rebalanceChunk(c, chks[c], new NewChunk(chks[c]));
            }
        }
    }
}

