/*
 * Decompiled with CFR 0.152.
 */
package water.rapids;

import java.io.File;
import java.util.ArrayList;
import water.DKV;
import water.Futures;
import water.H2O;
import water.H2ONode;
import water.Key;
import water.MRTask;
import water.RPC;
import water.fvec.Chunk;
import water.fvec.Frame;
import water.fvec.Vec;
import water.rapids.BinaryMerge;
import water.rapids.RadixOrder;
import water.rapids.SingleThreadRadixOrder;
import water.rapids.SplitByMSBLocal;

public class Merge {
    static void waitForSignalFromMatt() {
        System.out.println("waiting at the spot");
        File f = new File("/home/mdowle/GOFLAG");
        while (true) {
            System.out.println("Waiting for GOFLAG ...");
            if (f.exists()) break;
            try {
                Thread.sleep(1000L);
            }
            catch (Exception exception) {}
        }
        f.delete();
        System.out.println("GOFLAG seen, deleted and moved on");
    }

    static Frame merge(Frame leftFrame, Frame rightFrame, int[] leftCols, int[] rightCols, boolean allLeft, int[][] id_maps) {
        int j;
        int nextItem;
        System.out.println("\nCreating left index ...");
        long t0 = System.nanoTime();
        for (int i = 0; i < id_maps.length; ++i) {
            if (id_maps[i] == null) continue;
            assert ((double)id_maps[i].length == leftFrame.vec(leftCols[i]).max() + 1.0);
            int right_max = (int)rightFrame.vec(rightCols[i]).max();
            for (int j2 = 0; j2 < id_maps[i].length; ++j2) {
                assert (id_maps[i][j2] >= 0);
                if (id_maps[i][j2] <= right_max) continue;
                id_maps[i][j2] = -1;
            }
        }
        RadixOrder leftIndex = new RadixOrder(leftFrame, true, leftCols, id_maps);
        RadixOrder left = H2O.submitTask(leftIndex);
        left.join();
        System.out.println("***\n*** Creating left index took: " + (double)(System.nanoTime() - t0) / 1.0E9 + "\n***\n");
        System.out.println("\nCreating right index ...");
        t0 = System.nanoTime();
        RadixOrder rightIndex = new RadixOrder(rightFrame, false, rightCols, null);
        RadixOrder right = H2O.submitTask(rightIndex);
        right.join();
        System.out.println("***\n*** Creating right index took: " + (double)(System.nanoTime() - t0) / 1.0E9 + "\n***\n");
        long ansN = 0L;
        int numChunks = 0;
        System.out.print("Making BinaryMerge RPC calls ... ");
        t0 = System.nanoTime();
        ArrayList<RPC<BinaryMerge>> bmList = new ArrayList<RPC<BinaryMerge>>();
        for (int leftMSB = 0; leftMSB < 256; ++leftMSB) {
            int howManyMatch = 0;
            for (int rightMSB = 0; rightMSB < 256; ++rightMSB) {
                long tt = ((long)leftMSB << Math.max(8, leftIndex._biggestBit[0]) - 8) + leftIndex._colMin[0] - rightIndex._colMin[0];
                if (tt < 0L || (tt >>= Math.max(8, rightIndex._biggestBit[0]) - 8) != (long)rightMSB) continue;
                ++howManyMatch;
                H2ONode rightNode = SplitByMSBLocal.ownerOfMSB(rightMSB);
                RPC<BinaryMerge> bm = new RPC<BinaryMerge>(rightNode, new BinaryMerge(leftFrame, rightFrame, leftMSB, rightMSB, leftIndex._bytesUsed, rightIndex._bytesUsed, leftIndex._colMin, rightIndex._colMin, allLeft));
                bmList.add(bm);
                System.out.print(rightNode.index() + " ");
            }
            if (howManyMatch > 1) {
                throw new IllegalArgumentException("Internal not yet implemented: left MSB matches to multiple right MSB.");
            }
            if (howManyMatch != 0 || !allLeft) continue;
            RPC<BinaryMerge> bm = new RPC<BinaryMerge>(SplitByMSBLocal.ownerOfMSB(0), new BinaryMerge(leftFrame, rightFrame, leftMSB, 0, leftIndex._bytesUsed, rightIndex._bytesUsed, leftIndex._colMin, rightIndex._colMin, allLeft));
            bmList.add(bm);
        }
        System.out.println("... took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        int queueSize = Math.max(H2O.CLOUD.size() * 10, 40);
        t0 = System.nanoTime();
        if (queueSize > bmList.size()) {
            System.out.println("Small number of MSB joins (" + bmList.size() + ") means we won't get full parallelization benefit");
            queueSize = bmList.size();
        }
        System.out.println("Sending " + bmList.size() + " BinaryMerge async RPC calls in a queue of " + queueSize + " ... ");
        int[] queue = new int[queueSize];
        BinaryMerge[] bmResults = new BinaryMerge[bmList.size()];
        for (nextItem = 0; nextItem < queueSize; ++nextItem) {
            queue[nextItem] = nextItem;
            ((RPC)bmList.get(nextItem)).call();
        }
        int leftOnQueue = queueSize;
        int waitMS = 50;
        while (leftOnQueue > 0) {
            try {
                Thread.sleep(waitMS);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            int doneInSweep = 0;
            for (int q = 0; q < queueSize; ++q) {
                BinaryMerge thisbm;
                int thisBM = queue[q];
                if (thisBM < 0 || !((RPC)bmList.get(thisBM)).isDone()) continue;
                bmResults[thisBM] = thisbm = (BinaryMerge)((RPC)bmList.get(thisBM)).get();
                --leftOnQueue;
                ++doneInSweep;
                if (thisbm._numRowsInResult > 0L) {
                    System.out.print(String.format("%3d", queue[q]) + ":");
                    for (int t = 0; t < 20; ++t) {
                        System.out.print(String.format("%.2f ", thisbm._timings[t]));
                    }
                    System.out.println();
                    numChunks += thisbm._chunkSizes.length;
                    ansN += thisbm._numRowsInResult;
                }
                queue[q] = -1;
                if (nextItem >= bmList.size()) continue;
                ((RPC)bmList.get(nextItem)).call();
                queue[q] = nextItem++;
                ++leftOnQueue;
            }
            if (doneInSweep != 0) continue;
            waitMS = Math.min(1000, waitMS * 2);
        }
        System.out.println("took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        System.out.print("Removing DKV keys of left and right index.  ... ");
        t0 = System.nanoTime();
        for (int msb = 0; msb < 256; ++msb) {
            for (int isLeft = 0; isLeft < 2; ++isLeft) {
                Key k = SingleThreadRadixOrder.getSortedOXHeaderKey(isLeft != 0, msb);
                SingleThreadRadixOrder.OXHeader oxheader = (SingleThreadRadixOrder.OXHeader)DKV.getGet(k);
                DKV.remove(k);
                if (oxheader == null) continue;
                for (int b = 0; b < oxheader._nBatch; ++b) {
                    k = SplitByMSBLocal.getSortedOXbatchKey(isLeft != 0, msb, b);
                    DKV.remove(k);
                }
            }
        }
        System.out.println("took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        System.out.print("Allocating and populating chunk info (e.g. size and batch number) ...");
        t0 = System.nanoTime();
        long[] chunkSizes = new long[numChunks];
        int[] chunkLeftMSB = new int[numChunks];
        int[] chunkRightMSB = new int[numChunks];
        int[] chunkBatch = new int[numChunks];
        int k = 0;
        for (int i = 0; i < bmList.size(); ++i) {
            BinaryMerge thisbm = bmResults[i];
            if (thisbm._numRowsInResult == 0L) continue;
            int[] thisChunkSizes = thisbm._chunkSizes;
            int j3 = 0;
            while (j3 < thisChunkSizes.length) {
                chunkSizes[k] = thisChunkSizes[j3];
                chunkLeftMSB[k] = thisbm._leftMSB;
                chunkRightMSB[k] = thisbm._rightMSB;
                chunkBatch[k] = j3++;
                ++k;
            }
        }
        System.out.println("took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        System.out.print("Allocating and populated espc ...");
        t0 = System.nanoTime();
        long[] espc = new long[chunkSizes.length + 1];
        int i = 0;
        long sum = 0L;
        for (long s : chunkSizes) {
            espc[i++] = sum;
            sum += s;
        }
        espc[espc.length - 1] = sum;
        System.out.println("took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        assert (sum == ansN);
        System.out.print("Allocating dummy vecs/chunks of the final frame ...");
        t0 = System.nanoTime();
        int numJoinCols = leftIndex._bytesUsed.length;
        int numLeftCols = leftFrame.numCols();
        int numColsInResult = numLeftCols + rightFrame.numCols() - numJoinCols;
        byte[] types = new byte[numColsInResult];
        String[][] doms = new String[numColsInResult][];
        String[] names = new String[numColsInResult];
        for (j = 0; j < numLeftCols; ++j) {
            types[j] = leftFrame.vec(j).get_type();
            doms[j] = leftFrame.domains()[j];
            names[j] = leftFrame.names()[j];
        }
        for (j = 0; j < rightFrame.numCols() - numJoinCols; ++j) {
            types[numLeftCols + j] = rightFrame.vec(j + numJoinCols).get_type();
            doms[numLeftCols + j] = rightFrame.domains()[j + numJoinCols];
            names[numLeftCols + j] = rightFrame.names()[j + numJoinCols];
        }
        Key<Vec> key = Vec.newKey();
        Vec[] vecs = new Vec(key, Vec.ESPC.rowLayout(key, espc)).makeCons(numColsInResult, 0L, doms, types);
        System.out.println("took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        System.out.print("Finally stitch together by overwriting dummies ...");
        t0 = System.nanoTime();
        Frame fr = new Frame(names, vecs);
        ChunkStitcher ff = new ChunkStitcher(chunkSizes, chunkLeftMSB, chunkRightMSB, chunkBatch);
        ff.doAll(fr);
        System.out.println("took: " + (double)(System.nanoTime() - t0) / 1.0E9);
        return fr;
    }

    static class ChunkStitcher
    extends MRTask<ChunkStitcher> {
        final long[] _chunkSizes;
        final int[] _chunkLeftMSB;
        final int[] _chunkRightMSB;
        final int[] _chunkBatch;

        public ChunkStitcher(long[] chunkSizes, int[] chunkLeftMSB, int[] chunkRightMSB, int[] chunkBatch) {
            this._chunkSizes = chunkSizes;
            this._chunkLeftMSB = chunkLeftMSB;
            this._chunkRightMSB = chunkRightMSB;
            this._chunkBatch = chunkBatch;
        }

        @Override
        public void map(Chunk[] cs) {
            int chkIdx = cs[0].cidx();
            Futures fs = new Futures();
            for (int i = 0; i < cs.length; ++i) {
                Key destKey = cs[i].vec().chunkKey(chkIdx);
                assert ((long)cs[i].len() == this._chunkSizes[chkIdx]);
                Key k = BinaryMerge.getKeyForMSBComboPerCol(this._chunkLeftMSB[chkIdx], this._chunkRightMSB[chkIdx], i, this._chunkBatch[chkIdx]);
                Chunk ck = (Chunk)DKV.getGet(k);
                DKV.put(destKey, ck, fs, true);
                DKV.remove(k);
            }
            fs.blockForPending();
        }
    }
}

