/*
 * Decompiled with CFR 0.152.
 */
package water.fvec;

import java.util.Arrays;
import java.util.Iterator;
import jsr166y.CountedCompleter;
import water.Futures;
import water.H2O;
import water.Key;
import water.MRTask;
import water.fvec.Chunk;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.fvec.Vec;

public class RebalanceDataSet
extends H2O.H2OCountedCompleter {
    final Frame _in;
    final int _nchunks;
    Key _okey;
    Frame _out;
    final Key _jobKey;
    final transient Vec.VectorGroup _vg;
    transient long[] _espc;

    public RebalanceDataSet(Frame modelFrame, Frame srcFrame, Key dstKey) {
        this(modelFrame, srcFrame, dstKey, null, null);
    }

    public RebalanceDataSet(Frame modelFrame, Frame srcFrame, Key dstKey, H2O.H2OCountedCompleter cmp, Key jobKey) {
        super(cmp);
        this._in = srcFrame;
        this._jobKey = jobKey;
        this._okey = dstKey;
        this._espc = modelFrame.anyVec()._espc;
        this._vg = modelFrame.anyVec().group();
        this._nchunks = modelFrame.anyVec().nChunks();
    }

    public RebalanceDataSet(Frame srcFrame, Key dstKey, int nchunks) {
        this(srcFrame, dstKey, nchunks, null, null);
    }

    public RebalanceDataSet(Frame srcFrame, Key dstKey, int nchunks, H2O.H2OCountedCompleter cmp, Key jobKey) {
        super(cmp);
        this._in = srcFrame;
        this._nchunks = nchunks;
        this._jobKey = jobKey;
        this._okey = dstKey;
        this._vg = new Vec.VectorGroup();
    }

    public Frame getResult() {
        this.join();
        return this._out;
    }

    @Override
    public void compute2() {
        long[] espc;
        if (this._espc != null) {
            espc = this._espc;
        } else {
            int rpc = (int)(this._in.numRows() / (long)this._nchunks);
            int rem = (int)(this._in.numRows() % (long)this._nchunks);
            espc = new long[this._nchunks + 1];
            Arrays.fill(espc, (long)rpc);
            int i = 0;
            while (i < rem) {
                int n = i++;
                espc[n] = espc[n] + 1L;
            }
            long sum = 0L;
            for (int i2 = 0; i2 < espc.length; ++i2) {
                long s = espc[i2];
                espc[i2] = sum;
                sum += s;
            }
            assert (espc[espc.length - 1] == this._in.numRows()) : "unexpected number of rows, expected " + this._in.numRows() + ", got " + espc[espc.length - 1];
        }
        Vec[] srcVecs = this._in.vecs();
        this._out = new Frame(this._okey, this._in.names(), new Vec(this._vg.addVec(), espc).makeCons(srcVecs.length, 0L, this._in.domains(), this._in.types()));
        this._out.delete_and_lock(this._jobKey);
        new RebalanceTask(this, srcVecs).asyncExec(this._out);
    }

    @Override
    public void onCompletion(CountedCompleter caller) {
        assert (this._out.numRows() == this._in.numRows());
        Vec vec = this._out.anyVec();
        assert (vec.nChunks() == this._nchunks);
        this._out.update(this._jobKey);
        this._out.unlock(this._jobKey);
    }

    @Override
    public boolean onExceptionalCompletion(Throwable t, CountedCompleter caller) {
        t.printStackTrace();
        if (this._out != null) {
            this._out.delete(this._jobKey, new Futures()).blockForPending();
        }
        return true;
    }

    public static class RebalanceTask
    extends MRTask<RebalanceTask> {
        final Vec[] _srcVecs;

        public RebalanceTask(H2O.H2OCountedCompleter cmp, Vec ... srcVecs) {
            super(cmp);
            this._srcVecs = srcVecs;
        }

        @Override
        public boolean logVerbose() {
            return false;
        }

        private void rebalanceChunk(Vec srcVec, Chunk chk) {
            int rem;
            int trailingZeros;
            NewChunk dst = new NewChunk(chk);
            dst.set_len(0);
            dst.set_sparseLen(dst._len);
            for (rem = chk._len; rem > 0 && dst._len < chk._len; rem -= trailingZeros) {
                Chunk srcRaw = srcVec.chunkForRow(chk._start + (long)dst._len);
                NewChunk src = new NewChunk(srcRaw);
                src = srcRaw.inflate_impl(src);
                assert (src._len == srcRaw._len);
                int srcFrom = (int)(chk._start + (long)dst._len - src._start);
                if (src.sparse() && dst.sparse() || (src.sparseLen() + dst.sparseLen()) * 32 < src._len + dst._len) {
                    src.set_sparse(src.sparseLen());
                    dst.set_sparse(dst.sparseLen());
                }
                int srcTo = srcFrom + rem;
                int off = srcFrom - 1;
                Iterator<NewChunk.Value> it = src.values(Math.max(0, srcFrom), srcTo);
                while (it.hasNext()) {
                    NewChunk.Value v = it.next();
                    int rid = v.rowId0();
                    assert (rid < srcTo);
                    int add = rid - off;
                    off = rid;
                    dst.addZeros(add - 1);
                    v.add2Chunk(dst);
                    assert ((rem -= add) >= 0);
                }
                trailingZeros = Math.min(rem, src._len - off - 1);
                dst.addZeros(trailingZeros);
            }
            assert (rem == 0) : "rem = " + rem;
            assert (dst._len == chk._len) : "len = " + dst._len + ", _len = " + chk._len;
            dst.close(dst.cidx(), this._fs);
        }

        @Override
        public void map(Chunk[] chks) {
            for (int i = 0; i < chks.length; ++i) {
                this.rebalanceChunk(this._srcVecs[i], chks[i]);
            }
        }
    }
}

