/*
 * Decompiled with CFR 0.152.
 */
package water.parser;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import jsr166y.CountedCompleter;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveAction;
import water.DKV;
import water.DTask;
import water.Freezable;
import water.Futures;
import water.H2O;
import water.H2ONode;
import water.Iced;
import water.Job;
import water.Key;
import water.Keyed;
import water.Lockable;
import water.MRTask;
import water.MemoryManager;
import water.RPC;
import water.Value;
import water.exceptions.H2OIllegalArgumentException;
import water.exceptions.H2OParseException;
import water.fvec.AppendableVec;
import water.fvec.ByteVec;
import water.fvec.C0LChunk;
import water.fvec.CStrChunk;
import water.fvec.Chunk;
import water.fvec.FileVec;
import water.fvec.Frame;
import water.fvec.Vec;
import water.nbhm.NonBlockingHashMap;
import water.nbhm.NonBlockingSetInt;
import water.parser.Categorical;
import water.parser.CsvParser;
import water.parser.FVecParseReader;
import water.parser.FVecParseWriter;
import water.parser.ParseSetup;
import water.parser.Parser;
import water.parser.ParserType;
import water.parser.SVMLightFVecParseWriter;
import water.parser.SVMLightParser;
import water.parser.ValueString;
import water.parser.ZipUtil;
import water.util.ArrayUtils;
import water.util.FrameUtils;
import water.util.GAUtils;
import water.util.Log;
import water.util.PrettyPrint;
import water.util.UnsafeUtils;

public final class ParseDataset
extends Job<Frame> {
    // The file-ingestion task created by parseAllKeys for this job (assigned just before it runs).
    private MultiFileParseTask _mfpt;

    /**
     * Parses {@code keys} into a new Frame stored under {@code okey} and waits for the result.
     * The setup is guessed with double-quote handling and checkHeader = 0, and the source keys
     * are deleted once parsed.
     *
     * @param okey destination key for the parsed Frame
     * @param keys source keys holding raw data
     * @return the parsed Frame
     */
    public static Frame parse(Key okey, Key ... keys) {
        final boolean deleteSources = true;
        final boolean useSingleQuotes = false;
        final int headerMode = 0;
        return ParseDataset.parse(okey, keys, deleteSources, useSingleQuotes, headerMode);
    }

    /**
     * Guesses a {@link ParseSetup} for {@code keys} and parses them (blocking) into {@code okey}.
     *
     * @param okey         destination key for the parsed Frame
     * @param keys         source keys holding raw data
     * @param deleteOnDone whether the sources are deleted as they are parsed
     * @param singleQuote  passed to {@link ParseSetup#guessSetup} as the quote-guessing flag
     * @param checkHeader  passed to {@link ParseSetup#guessSetup} as the header-detection mode
     * @return the parsed Frame
     */
    public static Frame parse(Key okey, Key[] keys, boolean deleteOnDone, boolean singleQuote, int checkHeader) {
        ParseSetup guessed = ParseSetup.guessSetup(keys, singleQuote, checkHeader);
        return ParseDataset.parse(okey, keys, deleteOnDone, guessed);
    }

    /**
     * Parses {@code keys} with an explicit setup, blocking until the Frame is available.
     *
     * @param okey         destination key for the parsed Frame
     * @param keys         source keys holding raw data
     * @param deleteOnDone whether the sources are deleted as they are parsed
     * @param globalSetup  parse configuration applied to every key
     * @return the parsed Frame
     */
    public static Frame parse(Key okey, Key[] keys, boolean deleteOnDone, ParseSetup globalSetup) {
        ParseDataset finished = ParseDataset.parse(okey, keys, deleteOnDone, globalSetup, true);
        return (Frame)finished.get();
    }

    /**
     * Starts a parse job and optionally waits for it.
     *
     * @param okey         destination key for the parsed Frame
     * @param keys         source keys holding raw data
     * @param deleteOnDone whether the sources are deleted as they are parsed
     * @param globalSetup  parse configuration applied to every key
     * @param blocking     if true, wait for the job before returning
     * @return the (possibly still running) parse job
     */
    public static ParseDataset parse(Key okey, Key[] keys, boolean deleteOnDone, ParseSetup globalSetup, boolean blocking) {
        ParseDataset job = ParseDataset.forkParseDataset(okey, keys, globalSetup, deleteOnDone);
        if (!blocking) {
            return job;
        }
        job.get();
        return job;
    }

    /**
     * Resolves {@code key} to the raw byte vector to parse. The key may hold a {@link ByteVec}
     * directly, or a {@link Frame} whose first vec is used.
     *
     * @param key key of the source data
     * @return the byte vector under {@code key}
     * @throws H2OIllegalArgumentException if nothing is stored under {@code key}
     */
    static ByteVec getByteVec(Key key) {
        Object stored = DKV.getGet(key);
        if (stored == null) {
            throw new H2OIllegalArgumentException("Missing data", "Did not find any data under key " + key);
        }
        if (stored instanceof ByteVec) {
            return (ByteVec)stored;
        }
        // Otherwise it must be a Frame wrapping the bytes as its first column.
        Frame holder = (Frame)stored;
        return (ByteVec)holder.vecs()[0];
    }

    /**
     * Produces exactly {@code ncols} column names.
     *
     * <p>With no names supplied, returns {@code C1..Cncols}. Otherwise the supplied array is
     * truncated or null-padded to {@code ncols}. Each null or empty entry at index {@code i} is
     * replaced with {@code "C" + (i+1)}. If that name is already among the supplied names, the
     * string is doubled ({@code C2 -> C2C2 -> C2C2C2C2 ...}) until it no longer collides.
     *
     * @param ncols    number of names wanted
     * @param colNames supplied names, possibly null or containing blanks; never modified
     * @return a new array of length {@code ncols}
     */
    static String[] getColumnNames(int ncols, String[] colNames) {
        if (colNames == null) {
            String[] generated = new String[ncols];
            for (int col = 0; col < ncols; ++col) {
                generated[col] = "C" + String.valueOf(col + 1);
            }
            return generated;
        }
        // Every supplied name (including any past ncols) is considered taken.
        HashSet<String> taken = new HashSet<String>(Arrays.asList(colNames));
        String[] result = Arrays.copyOf(colNames, ncols);
        for (int col = 0; col < ncols; ++col) {
            String current = result[col];
            boolean blank = current == null || current.equals("");
            if (!blank) {
                continue;
            }
            String candidate = "C" + String.valueOf(col + 1);
            while (taken.contains(candidate)) {
                candidate = candidate.concat(candidate);
            }
            result[col] = candidate;
        }
        return result;
    }

    /**
     * Validates a parse request and starts it asynchronously.
     *
     * <p>Checks performed before any state is changed:
     * <ul>
     *   <li>the setup has no duplicate non-empty column names;</li>
     *   <li>{@code dest} differs from every source key;</li>
     *   <li>with {@code deleteOnDone}, no source key appears twice (compared with {@code equals});</li>
     *   <li>the total input size is at most 4x the cluster memory.</li>
     * </ul>
     * Then every {@link FileVec} source (bare, or as the first vec of a Frame) gets its chunk size
     * set to {@code setup._chunk_size}. An empty Frame is created under the job's destination via
     * {@code delete_and_lock}, and each source is read-locked with the job key. The job is then
     * started.
     *
     * @param dest         key for the resulting Frame
     * @param keys         source keys
     * @param setup        parse configuration
     * @param deleteOnDone whether sources are deleted as they are parsed
     * @return the started job
     * @throws IllegalArgumentException on duplicate column names, {@code dest} among the sources,
     *         a repeated source with {@code deleteOnDone}, or input far larger than cluster memory
     */
    public static ParseDataset forkParseDataset(Key dest, Key[] keys, ParseSetup setup, boolean deleteOnDone) {
        HashSet<String> conflictingNames = setup.checkDupColumnNames();
        for (String x : conflictingNames) {
            // Empty names are filled in later by getColumnNames, so they may repeat.
            if (!x.equals("")) {
                throw new IllegalArgumentException("Found duplicate column name " + x);
            }
        }
        long totalParseSize = 0L;
        for (int i = 0; i < keys.length; ++i) {
            Key k = keys[i];
            if (dest.equals(k)) {
                throw new IllegalArgumentException("Destination key " + dest + " must be different from all sources");
            }
            if (deleteOnDone) {
                for (int j = i + 1; j < keys.length; ++j) {
                    // Value equality, consistent with the destination check above: two distinct Key
                    // instances naming the same data must still be rejected.
                    if (k.equals(keys[j])) {
                        throw new IllegalArgumentException("Source key " + k + " appears twice, deleteOnDone must be false");
                    }
                }
            }
            totalParseSize += ParseDataset.getByteVec(k).length();
        }
        Log.info("Total file size: " + PrettyPrint.bytes(totalParseSize));
        // Reject oversized input before mutating the inputs' chunk sizes below.
        long memsz = H2O.CLOUD.memsz();
        if (totalParseSize > memsz * 4L) {
            throw new IllegalArgumentException("Total input file size of " + PrettyPrint.bytes(totalParseSize) + " is much larger than total cluster memory of " + PrettyPrint.bytes(memsz) + ", please use either a larger cluster or smaller data.");
        }
        for (Key k : keys) {
            Object ice = DKV.getGet(k);
            if (ice instanceof FileVec) {
                ((FileVec)ice).setChunkSize(setup._chunk_size);
                Log.info("Parse chunk size " + setup._chunk_size);
            } else if (ice instanceof Frame && ((Frame)ice).vec(0) instanceof FileVec) {
                ((FileVec)((Frame)ice).vec(0)).setChunkSize((Frame)ice, setup._chunk_size);
                Log.info("Parse chunk size " + setup._chunk_size);
            }
        }
        if (H2O.GA != null) {
            GAUtils.logParse(totalParseSize, keys.length, setup._number_columns);
        }
        ParseDataset job = new ParseDataset(dest);
        new Frame(job.dest(), new String[0], new Vec[0]).delete_and_lock(job._key);
        for (Key k : keys) {
            Lockable.read_lock(k, job._key);
        }
        ParserFJTask fjt = new ParserFJTask(job, keys, setup, deleteOnDone);
        job.start(fjt, totalParseSize, true);
        return job;
    }

    /**
     * Creates a parse job targeting {@code dest}, passing the description "Parse" to the
     * {@link Job} constructor. Callers obtain jobs through {@link #forkParseDataset}.
     */
    private ParseDataset(Key dest) {
        super(dest, "Parse");
    }

    /**
     * Body of the parse job: parses every key in {@code fkeys} with {@code setup} and publishes
     * the resulting Frame under {@code job.dest()}.
     *
     * <p>Steps:
     * <ol>
     *   <li>normalize the setup;</li>
     *   <li>run {@link MultiFileParseTask} over all keys;</li>
     *   <li>if any column should be categorical, gather and unify its domains across nodes;</li>
     *   <li>close the vecs into a Frame;</li>
     *   <li>for SVMLight input, fill missing chunks;</li>
     *   <li>log a per-column summary;</li>
     *   <li>update and unlock the Frame.</li>
     * </ol>
     *
     * <p>The method returns early whenever the job is found cancelled or crashed. Cleanup in that
     * case is not done here. NOTE(review): presumably handled by the job's completion path; confirm.
     *
     * @param job          the running parse job; its key is the lock owner for the output Frame
     * @param fkeys        source keys to parse; an empty array cancels the job
     * @param setup        parse configuration; {@code _column_names} and {@code _na_strings} are
     *                     normalized in place
     * @param deleteOnDone whether sources are deleted during the parse (verified by assertion)
     * @throws H2OParseException if any categorical column has 10,000,000 or more levels
     */
    private static void parseAllKeys(ParseDataset job, Key[] fkeys, ParseSetup setup, boolean deleteOnDone) {
        assert (setup._number_columns > 0);
        // An empty name list, or a single empty name, means "no names": getColumnNames will generate them.
        if (setup._column_names != null && (setup._column_names.length == 0 || setup._column_names.length == 1 && setup._column_names[0].isEmpty())) {
            setup._column_names = null;
        }
        // NA strings are only kept when there is exactly one entry per column.
        if (setup._na_strings != null && setup._na_strings.length != setup._number_columns) {
            setup._na_strings = null;
        }
        if (fkeys.length == 0) {
            job.cancel();
            return;
        }
        job.update(0L, "Ingesting files.");
        // All output vecs are keyed in the vector group of the first source.
        Vec.VectorGroup vg = ParseDataset.getByteVec(fkeys[0]).group();
        MultiFileParseTask mfpt = job._mfpt = new MultiFileParseTask(vg, setup, job._key, fkeys, deleteOnDone);
        mfpt.doAll(fkeys);
        Log.trace("Done ingesting files.");
        if (job.isCancelledOrCrashed()) {
            return;
        }
        AppendableVec[] avs = mfpt.vecs();
        setup._column_names = ParseDataset.getColumnNames(avs.length, setup._column_names);
        Frame fr = null;
        // Collect indices of the columns that should become categorical.
        int n = 0;
        int[] ecols2 = new int[avs.length];
        for (int i = 0; i < ecols2.length; ++i) {
            if (!avs[i].shouldBeEnum()) continue;
            ecols2[n++] = i;
        }
        int[] ecols = Arrays.copyOf(ecols2, n);
        if (n > 0) {
            job.update(0L, "Collecting categorical domains across nodes.");
            GatherCategoricalDomainsTask gcdt = (GatherCategoricalDomainsTask)new GatherCategoricalDomainsTask(mfpt._eKey, ecols).doAllNodes();
            // Columns with too many levels are collected and reported together.
            ArrayList<String> offendingColNames = new ArrayList<String>();
            for (int i = 0; i < ecols.length; ++i) {
                if (gcdt.getDomainLength(i) < 10000000) {
                    avs[ecols[i]].setDomain(gcdt.getDomain(i));
                    continue;
                }
                offendingColNames.add(setup._column_names[ecols[i]]);
            }
            if (offendingColNames.size() > 0) {
                throw new H2OParseException("Exceeded categorical limit on columns " + offendingColNames + ".   Consider reparsing these columns as a string.");
            }
            job.update(0L, "Compressing data.");
            fr = new Frame(job.dest(), setup._column_names, AppendableVec.closeAll(avs));
            fr.update(job._key);
            Vec[] evecs = new Vec[ecols.length];
            for (int i = 0; i < evecs.length; ++i) {
                evecs[i] = fr.vecs()[ecols[i]];
            }
            job.update(0L, "Unifying categoricals across nodes.");
            // One CreateParse2GlobalCategoricalMaps RPC per cloud member; wait for all of them.
            CreateParse2GlobalCategoricalMaps[] fcdt = new CreateParse2GlobalCategoricalMaps[H2O.CLOUD.size()];
            RPC[] rpcs = new RPC[H2O.CLOUD.size()];
            for (int i = 0; i < fcdt.length; ++i) {
                H2ONode[] nodes = H2O.CLOUD.members();
                fcdt[i] = new CreateParse2GlobalCategoricalMaps(mfpt._eKey, fr._key, ecols.length);
                rpcs[i] = new RPC<CreateParse2GlobalCategoricalMaps>(nodes[i], fcdt[i]).call();
            }
            for (RPC rpc : rpcs) {
                rpc.get();
            }
            // NOTE(review): presumably rewrites node-local categorical ids in the parsed chunks to the
            // unified global domain; UpdateCategoricalChunksTask is defined outside this view.
            new UpdateCategoricalChunksTask(mfpt._eKey, mfpt._chunk2ParseNodeMap).doAll(evecs);
            // Drops this node's categorical tables for this parse.
            MultiFileParseTask._enums.remove(mfpt._eKey);
            Log.trace("Done unifying categoricals across nodes.");
        } else {
            job.update(0L, "Compressing data.");
            fr = new Frame(job.dest(), setup._column_names, AppendableVec.closeAll(avs));
            Log.trace("Done closing all Vecs.");
        }
        if (job.isCancelledOrCrashed()) {
            return;
        }
        // SVMLight output can be missing chunks for some columns; SVFTask fills them in.
        if (setup._parse_type == ParserType.SVMLight) {
            new SVFTask(fr).doAllNodes();
        }
        if (job.isCancelledOrCrashed()) {
            return;
        }
        if (mfpt._errors != null) {
            for (String err : mfpt._errors) {
                Log.warn(err);
            }
        }
        job.update(0L, "Calculating data summary.");
        ParseDataset.logParseResults(job, fr);
        fr.update(job._key);
        Frame fr2 = (Frame)DKV.getGet(fr._key);
        assert (fr2._names.length == fr2.numCols());
        fr.unlock(job._key);
        if (deleteOnDone) {
            for (Key k : fkeys) {
                assert (DKV.get(k) == null) : "Input key " + k + " not deleted during parse";
            }
        }
    }

    /**
     * Logs a one-line summary per column of {@code fr}: name, type, min, max, NA count, whether
     * it is constant, and categorical cardinality. Rollup statistics are computed first.
     *
     * <p>For frames with more than 20 columns, only the first 10 and last 10 rows of the summary
     * are flagged for stdout. The rest are flagged log-file only, preceded by a one-time notice.
     * A chunk summary of the frame is logged last.
     *
     * @param job the parse job, used for its destination key in the header line
     * @param fr  the parsed frame (must have at least one vec)
     */
    private static void logParseResults(ParseDataset job, Frame fr) {
        final int maxHeadToStdout = 10;
        final int maxTailToStdout = 10;
        long rowCount = fr.anyVec().length();
        Log.info("Parse result for " + job.dest() + " (" + Long.toString(rowCount) + " rows):");
        Vec[] columns = fr.vecs();
        Futures pending = new Futures();
        for (Vec col : columns) {
            col.startRollupStats(pending);
        }
        pending.blockForPending();
        String[] names = fr.names();
        int nameWidth = 0;
        for (String name : names) {
            if (name.length() > nameWidth) {
                nameWidth = name.length();
            }
        }
        String rowFormat = " %" + nameWidth + "s %7s %12.12s %12.12s %11s %8s %6s";
        Log.info(String.format(rowFormat, "ColV2", "type", "min", "max", "NAs", "constant", "cardinality"));
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        boolean wide = columns.length > maxHeadToStdout + maxTailToStdout;
        for (int c = 0; c < columns.length; ++c) {
            Vec col = columns[c];
            boolean categorical = col.isEnum();
            boolean constant = col.isConst();
            String label = String.format("%" + nameWidth + "s:", names[c]);
            String typeName;
            String lo = "";
            String hi = "";
            switch (col.get_type()) {
                case 0:
                    typeName = "all_NA";
                    break;
                case 1:
                    typeName = "UUID";
                    break;
                case 2:
                    typeName = "string";
                    break;
                case 3:
                    typeName = "numeric";
                    lo = String.format("%g", col.min());
                    hi = String.format("%g", col.max());
                    break;
                case 4:
                    // For factors, "min"/"max" are the first and last domain levels.
                    typeName = "factor";
                    lo = col.factor(0L);
                    hi = col.factor(col.cardinality() - 1);
                    break;
                case 5:
                    typeName = "time";
                    lo = timeFormat.format(col.min());
                    hi = timeFormat.format(col.max());
                    break;
                default:
                    throw H2O.unimpl();
            }
            long naCount = col.naCnt();
            String naText = naCount > 0L ? String.format("%d", naCount) : "";
            String constantText = constant ? "constant" : "";
            String levelsText = categorical ? String.format("%d", col.domain().length) : "";
            boolean toStdout;
            if (!wide || c < maxHeadToStdout) {
                toStdout = true;
            } else if (c == maxHeadToStdout) {
                Log.info("Additional column information only sent to log file...");
                toStdout = false;
            } else {
                toStdout = c + maxTailToStdout >= columns.length;
            }
            Log.info(String.format(rowFormat, label, typeName, lo, hi, naText, constantText, levelsText), toStdout);
        }
        Log.info(FrameUtils.chunkSummary(fr).toString());
    }

    /**
     * Distributed task that parses a set of input keys, one {@code map(Key)} call per key, into
     * {@link FVecParseWriter}s. In {@link #postGlobal()} the writers are merged into one array of
     * {@link AppendableVec}s.
     *
     * <p>Uncompressed input whose parser type supports parallel parsing is handed to a nested
     * {@link DistributedParse} over the file's chunks. Other uncompressed input, and zip and gzip
     * input, is stream-parsed by the node running {@code map}. {@code _chunk2ParseNodeMap} records,
     * per global chunk index, the index of the node that parsed that chunk.
     */
    private static class MultiFileParseTask
    extends MRTask<MultiFileParseTask> {
        private final ParseSetup _parseSetup;
        private final Vec.VectorGroup _vg;
        // First vec id reserved in _vg for the output columns.
        private final int _vecIdStart;
        // Per-parse categorical tables keyed by _eKey; static, so shared by all tasks in this JVM.
        private static NonBlockingHashMap<Key, Categorical[]> _enums = new NonBlockingHashMap<Key, Categorical[]>();
        private final Key _eKey = Key.make();
        private final boolean _deleteOnDone;
        // Global chunk index -> index of the node that parsed it (-1 = not yet known).
        private int[] _chunk2ParseNodeMap;
        private final Key _jobKey;
        // Global index of the first chunk of each input file.
        private final int[] _fileChunkOffsets;
        // One writer per input key, indexed by the key's position.
        FVecParseWriter[] _dout;
        String[] _errors;
        // Number of vec ids reserved starting at _vecIdStart.
        int _reservedKeys;
        private AppendableVec[] _vecs;

        /**
         * Reserves vec ids for the output columns and computes each file's global chunk offset.
         * SVMLight reserves 100,000,000 ids because its column count is not known up front; other
         * types reserve {@code setup._number_columns}.
         */
        MultiFileParseTask(Vec.VectorGroup vg, ParseSetup setup, Key jobKey, Key[] fkeys, boolean deleteOnDone) {
            this._vg = vg;
            this._parseSetup = setup;
            this._reservedKeys = this._parseSetup._parse_type == ParserType.SVMLight ? 100000000 : setup._number_columns;
            this._vecIdStart = this._vg.reserveKeys(this._reservedKeys);
            this._deleteOnDone = deleteOnDone;
            this._jobKey = jobKey;
            this._fileChunkOffsets = new int[fkeys.length];
            int len = 0;
            for (int i = 0; i < fkeys.length; ++i) {
                this._fileChunkOffsets[i] = len;
                len += ParseDataset.getByteVec(fkeys[i]).nChunks();
            }
            this._chunk2ParseNodeMap = MemoryManager.malloc4(len);
            Arrays.fill(this._chunk2ParseNodeMap, -1);
        }

        /**
         * Drops null writers (files that produced no output), then merges the per-file writers
         * into one {@link AppendableVec} per column. Finally, tries to return any reserved vec ids
         * that were not used.
         */
        @Override
        public void postGlobal() {
            Log.trace("Begin file parse cleanup.");
            // Compact _dout so that the non-null writers occupy [0, n].
            int n = this._dout.length - 1;
            while (this._dout[n] == null && n != 0) {
                --n;
            }
            for (int i2 = 0; i2 <= n; ++i2) {
                if (this._dout[i2] != null) continue;
                this._dout[i2] = this._dout[n];
                --n;
                while (n > i2 && this._dout[n] == null) {
                    --n;
                }
            }
            if (n < this._dout.length - 1) {
                this._dout = Arrays.copyOf(this._dout, n + 1);
            }
            if (this._dout.length == 1) {
                this._vecs = this._dout[0]._vecs;
                return;
            }
            int nCols = 0;
            for (FVecParseWriter dout : this._dout) {
                nCols = Math.max(dout._vecs.length, nCols);
            }
            AppendableVec[] res = new AppendableVec[nCols];
            int nchunks = 0;
            for (FVecParseWriter dout : this._dout) {
                nchunks += dout.nChunks();
            }
            long[] espc = MemoryManager.malloc8(nchunks);
            for (int i = 0; i < res.length; ++i) {
                res[i] = new AppendableVec(this._vg.vecKey(this._vecIdStart + i), espc, 0);
                res[i].setTypes(MemoryManager.malloc1(nchunks));
            }
            for (int i = 0; i < this._dout.length; ++i) {
                for (int j = 0; j < this._dout[i]._vecs.length; ++j) {
                    res[j].setSubRange(this._dout[i]._vecs[j]);
                }
            }
            // Only res.length of the _reservedKeys ids starting at _vecIdStart were used; try to hand
            // back the unused tail. (Compare counts, not an absolute id against a count.)
            if (res.length < this._reservedKeys) {
                Future f = this._vg.tryReturnKeys(this._vecIdStart + this._reservedKeys, this._vecIdStart + res.length);
                if (f != null) {
                    try {
                        f.get();
                    }
                    catch (InterruptedException e) {
                        // Best effort; preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    catch (ExecutionException e) {
                        // Best effort: if returning fails, the ids simply stay reserved.
                    }
                }
            }
            this._vecs = res;
            Log.trace("Finished file parse cleanup.");
        }

        private AppendableVec[] vecs() {
            return this._vecs;
        }

        @Override
        public void setupLocal() {
            this._dout = new FVecParseWriter[this._keys.length];
        }

        /** Returns the shared per-column categorical tables for {@code eKey}, creating them if absent. */
        private static Categorical[] enums(Key eKey, int ncols) {
            Categorical[] enums = _enums.get(eKey);
            if (enums != null) {
                return enums;
            }
            enums = new Categorical[ncols];
            for (int i = 0; i < enums.length; ++i) {
                enums[i] = new Categorical();
            }
            // Use whichever table won the race, without a second lookup.
            Categorical[] existing = _enums.putIfAbsent(eKey, enums);
            return existing != null ? existing : enums;
        }

        /**
         * Marks all chunks of {@code vec} as parsed on this node. Then deletes the source under
         * {@code key} (when deleteOnDone) or, for a keyed Frame, unlocks it.
         */
        private void chunksAreLocal(Vec vec, int chunkStartIdx, Key key) {
            for (int i = 0; i < vec.nChunks(); ++i) {
                this._chunk2ParseNodeMap[chunkStartIdx + i] = H2O.SELF.index();
            }
            Object ice = DKV.get(key).get();
            if (ice == vec) {
                if (this._deleteOnDone) {
                    vec.remove();
                }
            } else {
                Frame fr = (Frame)ice;
                if (this._deleteOnDone) {
                    fr.delete(this._jobKey, new Futures()).blockForPending();
                } else if (fr._key != null) {
                    fr.unlock(this._jobKey);
                }
            }
        }

        /** Builds the writer for one file: an SVMLight writer or a categorical-aware FVec writer. */
        private FVecParseWriter makeDout(ParseSetup localSetup, int chunkOff, int nchunks) {
            AppendableVec[] avs = new AppendableVec[localSetup._number_columns];
            long[] espc = MemoryManager.malloc8(nchunks);
            for (int i = 0; i < avs.length; ++i) {
                avs[i] = new AppendableVec(this._vg.vecKey(i + this._vecIdStart), espc, chunkOff);
            }
            return localSetup._parse_type == ParserType.SVMLight ? new SVMLightFVecParseWriter(this._vg, this._vecIdStart, chunkOff, this._parseSetup._chunk_size, avs) : new FVecParseWriter(this._vg, chunkOff, MultiFileParseTask.enums(this._eKey, localSetup._number_columns), localSetup._column_types, this._parseSetup._chunk_size, avs);
        }

        /**
         * Parses one input key. Compression is guessed from the first bytes. Header detection is
         * re-run per file when {@code _check_header == 1}. Every stream opened here is closed,
         * including on failure.
         */
        @Override
        public void map(Key key) {
            if (((Job)DKV.getGet(this._jobKey)).isCancelledOrCrashed()) {
                return;
            }
            ParseSetup localSetup = new ParseSetup(this._parseSetup);
            ByteVec vec = ParseDataset.getByteVec(key);
            int chunkStartIdx = this._fileChunkOffsets[this._lo];
            Log.trace("Begin a map stage of a file parse with start index " + chunkStartIdx + ".");
            byte[] zips = vec.getFirstBytes();
            ZipUtil.Compression cpr = ZipUtil.guessCompressionMethod(zips);
            if (localSetup._check_header == 1) {
                localSetup._check_header = localSetup.parser(this._jobKey).fileHasHeader(ZipUtil.unzipBytes(zips, cpr, localSetup._chunk_size), localSetup);
            }
            try {
                switch (cpr) {
                    case NONE: {
                        if (this._parseSetup._parse_type._parallelParseSupported) {
                            DistributedParse dp = new DistributedParse(this._vg, localSetup, this._vecIdStart, chunkStartIdx, this, key, vec.nChunks());
                            this.addToPendingCount(1);
                            dp.setCompleter(this);
                            dp.asyncExec(vec);
                            for (int i = 0; i < vec.nChunks(); ++i) {
                                this._chunk2ParseNodeMap[chunkStartIdx + i] = vec.chunkKey(i).home_node().index();
                            }
                            break;
                        }
                        InputStream bvs = vec.openStream(this._jobKey);
                        try {
                            this._dout[this._lo] = this.streamParse(bvs, localSetup, this.makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
                        } finally {
                            bvs.close();
                        }
                        this.chunksAreLocal(vec, chunkStartIdx, key);
                        break;
                    }
                    case ZIP: {
                        // Only the first archive entry is parsed; the stream is closed in every case
                        // (previously it leaked when the archive had further entries).
                        InputStream bvs = vec.openStream(this._jobKey);
                        ZipInputStream zis = new ZipInputStream(bvs);
                        try {
                            ZipEntry ze = zis.getNextEntry();
                            boolean parsedFirst = ze != null && !ze.isDirectory();
                            if (parsedFirst) {
                                this._dout[this._lo] = this.streamParse(zis, localSetup, this.makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
                            }
                            ZipEntry ze2 = zis.getNextEntry();
                            if (ze2 != null && parsedFirst) {
                                Log.warn("Only single file zip archives are currently supported, only file: " + ze.getName() + " has been parsed.  Remaining files have been ignored.");
                            }
                        } finally {
                            zis.close();
                        }
                        this.chunksAreLocal(vec, chunkStartIdx, key);
                        break;
                    }
                    case GZIP: {
                        InputStream bvs = vec.openStream(this._jobKey);
                        // Close the outermost stream that was successfully created (closing the
                        // gzip stream also closes bvs).
                        InputStream toClose = bvs;
                        try {
                            GZIPInputStream gzis = new GZIPInputStream(bvs);
                            toClose = gzis;
                            this._dout[this._lo] = this.streamParse(gzis, localSetup, this.makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
                        } finally {
                            toClose.close();
                        }
                        this.chunksAreLocal(vec, chunkStartIdx, key);
                        break;
                    }
                }
                Log.trace("Finished a map stage of a file parse with start index " + chunkStartIdx + ".");
            }
            catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
            catch (H2OParseException pe) {
                throw new H2OParseException(key, pe);
            }
        }

        /** Concatenates writers and errors, and merges the chunk-to-node maps (each chunk set by exactly one side). */
        @Override
        public void reduce(MultiFileParseTask mfpt) {
            assert (this != mfpt);
            Log.trace("Begin a reduce stage of a file parse.");
            if (this._dout == null) {
                this._dout = mfpt._dout;
            } else if (this._dout != mfpt._dout) {
                this._dout = ArrayUtils.append(this._dout, mfpt._dout);
            }
            if (this._chunk2ParseNodeMap == null) {
                this._chunk2ParseNodeMap = mfpt._chunk2ParseNodeMap;
            } else if (this._chunk2ParseNodeMap != mfpt._chunk2ParseNodeMap) {
                for (int i = 0; i < this._chunk2ParseNodeMap.length; ++i) {
                    if (this._chunk2ParseNodeMap[i] == -1) {
                        this._chunk2ParseNodeMap[i] = mfpt._chunk2ParseNodeMap[i];
                        continue;
                    }
                    assert (mfpt._chunk2ParseNodeMap[i] == -1) : Arrays.toString(this._chunk2ParseNodeMap) + " :: " + Arrays.toString(mfpt._chunk2ParseNodeMap);
                }
            }
            this._errors = ArrayUtils.append(this._errors, mfpt._errors);
            Log.trace("Finished a reduce stage of a file parse.");
        }

        /** Parses {@code is} sequentially into {@code dout}, then closes {@code dout}. Does not close {@code is}. */
        private FVecParseWriter streamParse(InputStream is, ParseSetup localSetup, FVecParseWriter dout, InputStream bvs) throws IOException {
            Parser p = localSetup.parser(this._jobKey);
            if (localSetup._parse_type._parallelParseSupported) {
                p.streamParseZip(is, dout, bvs);
            } else {
                p.streamParse(is, dout);
            }
            dout.close(this._fs);
            return dout;
        }

        /** Removes every output vec key and its chunk keys, then cancels this task. */
        private Futures onExceptionCleanup(Futures fs) {
            int nchunks = this._chunk2ParseNodeMap.length;
            int ncols = this._parseSetup._number_columns;
            for (int i = 0; i < ncols; ++i) {
                Key<Vec> vkey = this._vg.vecKey(this._vecIdStart + i);
                Keyed.remove(vkey, fs);
                for (int c = 0; c < nchunks; ++c) {
                    DKV.remove(Vec.chunkKey(vkey, c), fs);
                }
            }
            this.cancel(true);
            return fs;
        }

        /**
         * Parses the chunks of one uncompressed file in parallel. There is one writer per chunk,
         * and the writers are reduced into a single writer. On completion, the result is stored in
         * the outer task's {@code _dout} slot, and the source key is deleted (deleteOnDone) or
         * unlocked.
         */
        private static class DistributedParse
        extends MRTask<DistributedParse> {
            private final ParseSetup _setup;
            private final int _vecIdStart;
            private final int _startChunkIdx;
            private final Vec.VectorGroup _vg;
            private FVecParseWriter _dout;
            private final Key _eKey;
            private final Key _jobKey;
            private final transient MultiFileParseTask _outerMFPT;
            private final transient Key _srckey;
            private transient NonBlockingSetInt _visited;
            private transient long[] _espc;
            final int _nchunks;

            DistributedParse(Vec.VectorGroup vg, ParseSetup setup, int vecIdstart, int startChunkIdx, MultiFileParseTask mfpt, Key srckey, int nchunks) {
                super(mfpt);
                this._vg = vg;
                this._setup = setup;
                this._vecIdStart = vecIdstart;
                this._startChunkIdx = startChunkIdx;
                this._outerMFPT = mfpt;
                this._eKey = mfpt._eKey;
                this._jobKey = mfpt._jobKey;
                this._srckey = srckey;
                this._nchunks = nchunks;
            }

            @Override
            public void setupLocal() {
                super.setupLocal();
                this._visited = new NonBlockingSetInt();
                this._espc = MemoryManager.malloc8(this._nchunks);
            }

            @Override
            public void map(Chunk in) {
                FVecParseWriter dout;
                Parser p;
                if (((Job)DKV.getGet(this._jobKey)).isCancelledOrCrashed()) {
                    return;
                }
                AppendableVec[] avs = new AppendableVec[this._setup._number_columns];
                for (int i = 0; i < avs.length; ++i) {
                    avs[i] = new AppendableVec(this._vg.vecKey(this._vecIdStart + i), this._espc, this._startChunkIdx);
                }
                FVecParseReader din = new FVecParseReader(in);
                switch (this._setup._parse_type) {
                    case ARFF: 
                    case CSV: {
                        Categorical[] enums = MultiFileParseTask.enums(this._eKey, this._setup._number_columns);
                        p = new CsvParser(this._setup, this._jobKey);
                        dout = new FVecParseWriter(this._vg, this._startChunkIdx + in.cidx(), enums, this._setup._column_types, this._setup._chunk_size, avs);
                        break;
                    }
                    case SVMLight: {
                        p = new SVMLightParser(this._setup, this._jobKey);
                        dout = new SVMLightFVecParseWriter(this._vg, this._vecIdStart, in.cidx() + this._startChunkIdx, this._setup._chunk_size, avs);
                        break;
                    }
                    default: {
                        throw H2O.unimpl();
                    }
                }
                p.parseChunk(in.cidx(), din, dout);
                this._dout = dout;
                this._dout.close(this._fs);
                Job.update(in._len, this._jobKey);
                this.freeMem(in);
            }

            /**
             * Frees the cached POJO/memory of chunks {@code cidx} and {@code cidx+1}, each on its
             * second visit (i.e. after both neighboring map calls touched it). NOTE(review):
             * presumably because parsing chunk i can read into chunk i+1. Stops at the first chunk
             * that is absent or not persisted.
             */
            private void freeMem(Chunk in) {
                int cidx = in.cidx();
                for (int i = 0; i < 2; ++i) {
                    if (this._visited.add(cidx += i)) continue;
                    Value v = H2O.get(in.vec().chunkKey(cidx));
                    if (v == null || !v.isPersisted()) {
                        return;
                    }
                    v.freePOJO();
                    v.freeMem();
                }
            }

            @Override
            public void reduce(DistributedParse dp) {
                this._dout.reduce(dp._dout);
            }

            @Override
            public void postGlobal() {
                super.postGlobal();
                this._outerMFPT._dout[((MultiFileParseTask)this._outerMFPT)._lo] = this._dout;
                this._dout = null;
                Value val = DKV.get(this._srckey);
                if (val == null) {
                    return;
                }
                Object ice = val.get();
                if (ice instanceof ByteVec) {
                    if (this._outerMFPT._deleteOnDone) {
                        ((ByteVec)ice).remove();
                    }
                } else {
                    Frame fr = (Frame)ice;
                    if (this._outerMFPT._deleteOnDone) {
                        fr.delete(this._outerMFPT._jobKey, new Futures()).blockForPending();
                    } else if (fr._key != null) {
                        fr.unlock(this._outerMFPT._jobKey);
                    }
                }
            }
        }
    }

    private static class SVFTask
    extends MRTask<SVFTask> {
        private final Frame _f;

        private SVFTask(Frame f) {
            this._f = f;
        }

        @Override
        public void setupLocal() {
            if (this._f.numCols() == 0) {
                return;
            }
            Vec v0 = this._f.anyVec();
            ArrayList<1> rs = new ArrayList<1>();
            for (int i = 0; i < v0.nChunks(); ++i) {
                if (!v0.chunkKey(i).home()) continue;
                final int fi = i;
                rs.add(new RecursiveAction(){

                    @Override
                    protected void compute() {
                        Value val;
                        int nlines = 0;
                        for (Vec vec : SVFTask.this._f.vecs()) {
                            val = H2O.get(vec.chunkKey(fi));
                            if (val == null) continue;
                            nlines = ((Chunk)val.get())._len;
                            break;
                        }
                        int fnlines = nlines;
                        for (int j = 0; j < SVFTask.this._f.numCols(); ++j) {
                            Vec vec = SVFTask.this._f.vec(j);
                            Key k = vec.chunkKey(fi);
                            val = H2O.get(k);
                            if (val != null) continue;
                            H2O.putIfMatch(k, new Value(k, (Freezable)new C0LChunk(0L, fnlines)), null);
                        }
                    }
                });
            }
            ForkJoinTask.invokeAll(rs);
        }

        @Override
        public void reduce(SVFTask drt) {
        }
    }

    /**
     * Collects the node-local categorical domains recorded under {@code _k} for the columns in
     * {@code _catColIdxs}. In {@code reduce}, merges them into one sorted, duplicate-free domain
     * per column.
     *
     * <p>Each domain travels as a packed byte array with the layout
     * {@code [count:int32][len0:int32][bytes0][len1:int32][bytes1]...}, written and read with
     * {@code UnsafeUtils.set4}/{@code UnsafeUtils.get4}.
     */
    private static class GatherCategoricalDomainsTask
    extends MRTask<GatherCategoricalDomainsTask> {
        private final Key _k;
        private final int[] _catColIdxs;
        private byte[][] _packedDomains;

        private GatherCategoricalDomainsTask(Key k, int[] ccols) {
            this._k = k;
            this._catColIdxs = ccols;
        }

        /** Packs this node's sorted domain for every requested column (if it has any state for {@code _k}). */
        @Override
        public void setupLocal() {
            if (!MultiFileParseTask._enums.containsKey(this._k)) {
                return;
            }
            this._packedDomains = new byte[this._catColIdxs.length][];
            Categorical[] colCats = (Categorical[])MultiFileParseTask._enums.get(this._k);
            for (int i = 0; i < this._catColIdxs.length; ++i) {
                int col = this._catColIdxs[i];
                colCats[col].convertToUTF8(col + 1);
                ValueString[] domain = colCats[col].getColumnDomain();
                // Sorted with ValueString.compareTo; mergePackedDomains relies on the same order.
                Arrays.sort(domain);
                this._packedDomains[i] = this.packDomain(domain);
            }
        }

        /** Merges {@code other}'s packed domains into this task's, one parallel merge per column. */
        @Override
        public void reduce(GatherCategoricalDomainsTask other) {
            if (this._packedDomains == null) {
                this._packedDomains = other._packedDomains;
                return;
            }
            if (other._packedDomains == null) {
                return;
            }
            final byte[][] mine = this._packedDomains;
            final byte[][] theirs = other._packedDomains;
            H2O.H2OCountedCompleter[] domtasks = new H2O.H2OCountedCompleter[this._catColIdxs.length];
            for (int i = 0; i < domtasks.length; ++i) {
                final int fi = i;
                domtasks[i] = new H2O.H2OCountedCompleter() {

                    @Override
                    public void compute2() {
                        mine[fi] = mergePackedDomains(mine[fi], theirs[fi]);
                        this.tryComplete();
                    }
                };
                H2O.submitTask(domtasks[i]);
            }
            for (H2O.H2OCountedCompleter task : domtasks) {
                task.join();
            }
        }

        /**
         * Merges two packed domains, each sorted by {@code ValueString.compareTo}, into one packed,
         * sorted domain in which equal entries appear once.
         *
         * <p>Only the length headers of entries that actually exist are read. An empty or
         * exhausted input therefore never causes a read past the end of its array.
         */
        private static byte[] mergePackedDomains(byte[] a, byte[] b) {
            final int aCnt = UnsafeUtils.get4(a, 0);
            final int bCnt = UnsafeUtils.get4(b, 0);
            final byte[] merged = new byte[a.length + b.length];
            // Reusable views over the current entry of each input; re-pointed with set() each step.
            final ValueString aStr = new ValueString(a, 0, 0);
            final ValueString bStr = new ValueString(b, 0, 0);
            int ai = 0;      // entries consumed from a
            int bi = 0;      // entries consumed from b
            int aPos = 4;    // offset of a's current entry header
            int bPos = 4;    // offset of b's current entry header
            int mPos = 4;    // write offset in merged; bytes 0..3 receive the count at the end
            int mCnt = 0;    // entries written to merged
            while (ai < aCnt && bi < bCnt) {
                final int aLen = UnsafeUtils.get4(a, aPos);
                final int bLen = UnsafeUtils.get4(b, bPos);
                aStr.set(a, aPos + 4, aLen);
                bStr.set(b, bPos + 4, bLen);
                final int cmp = aStr.compareTo(bStr);
                if (cmp <= 0) {
                    final int entryLen = 4 + aLen;
                    System.arraycopy(a, aPos, merged, mPos, entryLen);
                    mPos += entryLen;
                    aPos += entryLen;
                    ++ai;
                    if (cmp == 0) {
                        // Same level on both sides: keep a single copy and skip b's entry.
                        bPos += 4 + bLen;
                        ++bi;
                    }
                } else {
                    final int entryLen = 4 + bLen;
                    System.arraycopy(b, bPos, merged, mPos, entryLen);
                    mPos += entryLen;
                    bPos += entryLen;
                    ++bi;
                }
                ++mCnt;
            }
            // At most one input has entries left; its tail is already sorted and packed.
            if (ai < aCnt) {
                final int rest = a.length - aPos;
                System.arraycopy(a, aPos, merged, mPos, rest);
                mPos += rest;
                mCnt += aCnt - ai;
            } else if (bi < bCnt) {
                final int rest = b.length - bPos;
                System.arraycopy(b, bPos, merged, mPos, rest);
                mPos += rest;
                mCnt += bCnt - bi;
            }
            final byte[] result = Arrays.copyOf(merged, mPos);
            UnsafeUtils.set4(result, 0, mCnt);
            return result;
        }

        /**
         * Packs {@code domain} into the {@code [count][len][bytes]...} layout.
         * A ValueString is an (offset, length) view over a byte buffer, so exactly
         * {@code length()} bytes are copied starting at {@code getOffset()}. This matches the
         * length header and the allocation size.
         */
        private byte[] packDomain(ValueString[] domain) {
            int totStrLen = 0;
            for (ValueString dom : domain) {
                totStrLen += dom.length();
            }
            byte[] packedDom = MemoryManager.malloc1(4 + (domain.length << 2) + totStrLen, false);
            UnsafeUtils.set4(packedDom, 0, domain.length);
            int pos = 4;
            for (ValueString dom : domain) {
                int len = dom.length();
                UnsafeUtils.set4(packedDom, pos, len);
                pos += 4;
                System.arraycopy(dom.getBuffer(), dom.getOffset(), packedDom, pos, len);
                pos += len;
            }
            return packedDom;
        }

        /** Returns the number of levels in the merged domain of the {@code colIdx}-th requested column, or 0 if none was gathered. */
        public int getDomainLength(int colIdx) {
            if (this._packedDomains == null) {
                return 0;
            }
            return UnsafeUtils.get4(this._packedDomains[colIdx], 0);
        }

        /** Decodes the merged domain of the {@code colIdx}-th requested column as UTF-8 strings, or null if none was gathered. */
        public String[] getDomain(int colIdx) {
            if (this._packedDomains == null) {
                return null;
            }
            byte[] packed = this._packedDomains[colIdx];
            int strCnt = UnsafeUtils.get4(packed, 0);
            String[] res = new String[strCnt];
            int pos = 4;
            for (int i = 0; i < strCnt; ++i) {
                int strLen = UnsafeUtils.get4(packed, pos);
                pos += 4;
                res[i] = new String(packed, pos, strLen, Charsets.UTF_8);
                pos += strLen;
            }
            return res;
        }
    }

    private static class UpdateCategoricalChunksTask
    extends MRTask<UpdateCategoricalChunksTask> {
        private final Key _parseCatMapsKey;
        private final int[] _chunk2ParseNodeMap;

        private UpdateCategoricalChunksTask(Key parseCatMapsKey, int[] chunk2ParseNodeMap) {
            this._parseCatMapsKey = parseCatMapsKey;
            this._chunk2ParseNodeMap = chunk2ParseNodeMap;
        }

        @Override
        public void map(Chunk[] chks) {
            CategoricalUpdateMap temp = (CategoricalUpdateMap)DKV.getGet(Key.make(this._parseCatMapsKey.toString() + "parseCatMapNode" + this._chunk2ParseNodeMap[chks[0].cidx()]));
            int[][] _parse2GlobalCatMaps = temp.map;
            int cidx = chks[0].cidx();
            for (int i = 0; i < chks.length; ++i) {
                Chunk chk = chks[i];
                if (!(chk instanceof CStrChunk)) {
                    for (int j = 0; j < chk._len; ++j) {
                        if (chk.isNA(j)) continue;
                        int old = (int)chk.at8(j);
                        if (old < 0 || old >= _parse2GlobalCatMaps[i].length) {
                            chk.reportBrokenEnum(i, j, old, _parse2GlobalCatMaps[i], this._fr.vec(i).domain().length);
                        }
                        if (_parse2GlobalCatMaps[i][old] < 0) {
                            throw new H2OParseException("Error in unifying categorical values. This is typically caused by unrecognized characters in the data.\n The problem categorical value occurred in the " + PrettyPrint.withOrdinalIndicator(i + 1) + " categorical col, " + PrettyPrint.withOrdinalIndicator(chk.start() + (long)j) + " row.");
                        }
                        chk.set(j, _parse2GlobalCatMaps[i][old]);
                    }
                }
                chk.close(cidx, this._fs);
            }
        }

        @Override
        public void postGlobal() {
            for (int i = 0; i < H2O.CLOUD.size(); ++i) {
                DKV.remove(Key.make(this._parseCatMapsKey.toString() + "parseCatMapNode" + i));
            }
        }
    }

    /**
     * Node-local task that builds the tables used to renumber categorical ordinals.
     *
     * <p>For each column whose node-local {@link Categorical} (stored under
     * {@code _parseCatMapsKey}) is non-empty, it builds a table from local token id to index in
     * that column's unified domain in frame {@code _frKey}. Ids with no match are set to -1. The
     * tables are stored in the DKV under {@code <parseCatMapsKey>parseCatMapNode<thisNodeIndex>}.
     * A node with no local state for the key stores nothing.
     *
     * <p>NOTE(review): table slots are consumed only by columns with a non-empty local map, while
     * UpdateCategoricalChunksTask looks tables up by chunk column position. If a column is
     * categorical overall but has no local levels on this node (e.g. all values missing here),
     * later columns would map to the wrong table. Confirm how the caller picks the categorical
     * columns and sizes {@code eColCnt}.
     */
    private static class CreateParse2GlobalCategoricalMaps
    extends DTask<CreateParse2GlobalCategoricalMaps> {
        private final Key _parseCatMapsKey;
        private final Key _frKey;
        private final byte _priority;
        private final int _eColCnt;

        private CreateParse2GlobalCategoricalMaps(Key parseCatMapsKey, Key key, int eColCnt) {
            this._parseCatMapsKey = parseCatMapsKey;
            this._frKey = key;
            this._eColCnt = eColCnt;
            this._priority = this.nextThrPriority();
        }

        @Override
        public byte priority() {
            return this._priority;
        }

        @Override
        public void compute2() {
            Frame fr = (Frame)DKV.getGet(this._frKey);
            if (MultiFileParseTask._enums.containsKey(this._parseCatMapsKey)) {
                Categorical[] localCats = (Categorical[])MultiFileParseTask._enums.get(this._parseCatMapsKey);
                int[][] ordMaps = new int[this._eColCnt][];
                int slot = 0;
                for (int col = 0; col < localCats.length; ++col) {
                    Categorical cat = localCats[col];
                    if (cat.size() != 0) {
                        int[] ordMap = buildOrdinalMap(cat, fr.vec(col).domain());
                        ordMaps[slot] = ordMap;
                        ++slot;
                    }
                }
                DKV.put(Key.make(this._parseCatMapsKey.toString() + "parseCatMapNode" + H2O.SELF.index()), new CategoricalUpdateMap(ordMaps));
            }
            this.tryComplete();
        }

        /** Maps each local token id of {@code cat} to its index in {@code globalDomain}; unmatched ids stay -1. */
        private static int[] buildOrdinalMap(Categorical cat, String[] globalDomain) {
            int[] ordMap = MemoryManager.malloc4(cat.maxId() + 1);
            Arrays.fill(ordMap, -1);
            ValueString[] unified = ValueString.toValueString(globalDomain);
            for (int g = 0; g < unified.length; ++g) {
                if (cat.containsKey(unified[g])) {
                    ordMap[cat.getTokenId(unified[g])] = g;
                }
            }
            return ordMap;
        }
    }

    /**
     * DKV-storable (Iced) holder for one node's ordinal remapping tables. It is built by
     * CreateParse2GlobalCategoricalMaps and read back in UpdateCategoricalChunksTask.map().
     */
    private static class CategoricalUpdateMap
    extends Iced {
        // map[slot][localId] = index of that level in the unified domain of the slot-th
        // categorical column; -1 for local ids with no match (null for slots never filled).
        final int[][] map;

        public CategoricalUpdateMap(int[][] map) {
            this.map = map;
        }
    }

    /**
     * Fork/join driver for a parse job. It runs {@link ParseDataset#parseAllKeys} and then
     * finishes the job.
     *
     * <p>On success it marks the job done. On failure or cancellation it removes partially built
     * output, the input keys and the destination key. In every case it clears the job's
     * {@code _mfpt} reference afterwards.
     */
    public static class ParserFJTask
    extends H2O.H2OCountedCompleter {
        final ParseDataset _job;
        final Key[] _keys;
        final ParseSetup _setup;
        final boolean _deleteOnDone;

        public ParserFJTask(ParseDataset job, Key[] keys, ParseSetup setup, boolean deleteOnDone) {
            this._job = job;
            this._keys = keys;
            this._setup = setup;
            this._deleteOnDone = deleteOnDone;
        }

        @Override
        public void compute2() {
            ParseDataset.parseAllKeys(this._job, this._keys, this._setup, this._deleteOnDone);
            this.tryComplete();
        }

        @Override
        public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
            if (this._job != null) {
                this._job.failed(ex);
                this.parseCleanup();
                this._job._mfpt = null;
            }
            return true;
        }

        @Override
        public void onCompletion(CountedCompleter caller) {
            // Same null guard as onExceptionalCompletion/parseCleanup; previously this NPE'd.
            if (this._job == null) {
                return;
            }
            boolean cancelledOrCrashed = this._job.isCancelledOrCrashed();
            if (cancelledOrCrashed) {
                // Cleanup needs _mfpt, so it must run before the reference is cleared.
                this.parseCleanup();
            }
            this._job._mfpt = null;
            if (!cancelledOrCrashed) {
                this._job.done();
            }
        }

        /**
         * Removes everything a failed or cancelled parse may have left behind: partial output
         * tracked by the job's MultiFileParseTask, the input keys and the destination key. Then
         * re-publishes the job in the DKV. Expects the job to be stopped already.
         */
        private void parseCleanup() {
            if (this._job == null) {
                return;
            }
            Futures fs = new Futures();
            assert ((Job)DKV.getGet(this._job._key)).isStopped();
            // Read _mfpt exactly once so a concurrent reset to null cannot slip in between
            // the null check and the call.
            MultiFileParseTask mfpt = this._job._mfpt;
            if (mfpt != null) {
                mfpt.onExceptionCleanup(fs);
            }
            for (Key k : this._keys) {
                Keyed.remove(k, fs);
            }
            Keyed.remove(this._job._dest, fs);
            fs.blockForPending();
            DKV.put(this._job._key, this._job);
        }
    }
}

