/*
 * Decompiled with CFR 0.152.
 */
package water.parser;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TreeSet;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import jsr166y.CountedCompleter;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveAction;
import water.DKV;
import water.DTask;
import water.Freezable;
import water.Futures;
import water.H2O;
import water.H2ONode;
import water.Iced;
import water.Job;
import water.Key;
import water.Keyed;
import water.Lockable;
import water.MRTask;
import water.MemoryManager;
import water.RPC;
import water.Value;
import water.exceptions.H2OIllegalArgumentException;
import water.exceptions.H2OIllegalValueException;
import water.fvec.AppendableVec;
import water.fvec.ByteVec;
import water.fvec.C0LChunk;
import water.fvec.CStrChunk;
import water.fvec.Chunk;
import water.fvec.FileVec;
import water.fvec.Frame;
import water.fvec.Vec;
import water.nbhm.NonBlockingHashMap;
import water.nbhm.NonBlockingSetInt;
import water.parser.BufferedString;
import water.parser.Categorical;
import water.parser.DefaultParserProviders;
import water.parser.FVecParseReader;
import water.parser.FVecParseWriter;
import water.parser.ParseSetup;
import water.parser.ParseWriter;
import water.parser.Parser;
import water.parser.SVMLightFVecParseWriter;
import water.parser.ZipUtil;
import water.util.ArrayUtils;
import water.util.FrameUtils;
import water.util.GAUtils;
import water.util.Log;
import water.util.PrettyPrint;
import water.util.UnsafeUtils;

public final class ParseDataset {
    /** Job tracking this parse; created by the private constructor with the description "Parse". */
    public Job<Frame> _job;
    /** Multi-file parse task for this parse; assigned by parseAllKeys just before ingestion starts. */
    private MultiFileParseTask _mfpt;

    /**
     * Convenience entry point: parses {@code keys} into a frame stored under {@code okey}.
     * Delegates with deleteOnDone = true, singleQuote = false and checkHeader = 0.
     *
     * @param okey destination key
     * @param keys source keys
     * @return the parsed frame
     */
    public static Frame parse(Key okey, Key ... keys) {
        final boolean deleteOnDone = true;
        final boolean singleQuote = false;
        final int checkHeader = 0;
        return ParseDataset.parse(okey, keys, deleteOnDone, singleQuote, checkHeader);
    }

    /**
     * Guesses a parse setup for {@code keys} via {@code ParseSetup.guessSetup} and parses with it.
     *
     * @param okey         destination key
     * @param keys         source keys
     * @param deleteOnDone forwarded unchanged to the setup-based overload
     * @param singleQuote  forwarded to the setup guesser
     * @param checkHeader  forwarded to the setup guesser
     * @return the parsed frame
     */
    public static Frame parse(Key okey, Key[] keys, boolean deleteOnDone, boolean singleQuote, int checkHeader) {
        ParseSetup guessedSetup = ParseSetup.guessSetup(keys, singleQuote, checkHeader);
        return ParseDataset.parse(okey, keys, deleteOnDone, guessedSetup);
    }

    /**
     * Parses {@code keys} with the supplied setup, blocking until the job's result is available.
     *
     * @param okey         destination key
     * @param keys         source keys
     * @param deleteOnDone forwarded unchanged
     * @param globalSetup  parse setup to use
     * @return the frame produced by the parse job
     */
    public static Frame parse(Key okey, Key[] keys, boolean deleteOnDone, ParseSetup globalSetup) {
        ParseDataset finished = ParseDataset.parse(okey, keys, deleteOnDone, globalSetup, true);
        return finished._job.get();
    }

    /**
     * Forks a parse job and optionally waits for it.
     *
     * @param okey         destination key
     * @param keys         source keys
     * @param deleteOnDone forwarded to {@code forkParseDataset}
     * @param globalSetup  parse setup to use
     * @param blocking     when true, {@code _job.get()} is called before returning
     * @return the ParseDataset holding the forked job
     */
    public static ParseDataset parse(Key okey, Key[] keys, boolean deleteOnDone, ParseSetup globalSetup, boolean blocking) {
        ParseDataset forked = ParseDataset.forkParseDataset(okey, keys, globalSetup, deleteOnDone);
        if (!blocking) {
            return forked;
        }
        forked._job.get();
        return forked;
    }

    /**
     * Looks up {@code key} in the DKV and returns the raw byte vector behind it.
     * A value that is not itself a ByteVec is cast to Frame and its first vec is used.
     *
     * @param key key of a ByteVec or of a Frame wrapping one
     * @return the byte vector
     * @throws H2OIllegalArgumentException if nothing is stored under {@code key}
     */
    static ByteVec getByteVec(Key key) {
        Object stored = DKV.getGet(key);
        if (stored == null) {
            throw new H2OIllegalArgumentException("Missing data", "Did not find any data under key " + key);
        }
        if (stored instanceof ByteVec) {
            return (ByteVec)stored;
        }
        Frame wrapper = (Frame)stored;
        return (ByteVec)wrapper.vecs()[0];
    }

    /**
     * Produces exactly {@code ncols} column names.
     * With no names supplied, every column is named "C" followed by its 1-based index.
     * Otherwise the supplied names are truncated or padded to {@code ncols}, and each null or
     * empty slot gets a generated "C&lt;index&gt;" name; while that name clashes with one of the
     * originally supplied names it is doubled (appended to itself) until it no longer clashes.
     *
     * @param ncols    number of names to return
     * @param colNames supplied names, may be null
     * @return a new array of length {@code ncols}
     */
    static String[] getColumnNames(int ncols, String[] colNames) {
        if (colNames == null) {
            String[] generated = new String[ncols];
            int col = 0;
            while (col < ncols) {
                generated[col] = "C" + (col + 1);
                col++;
            }
            return generated;
        }
        // Clashes are checked against the names as originally passed in, before resizing.
        HashSet<String> taken = new HashSet<String>(Arrays.asList(colNames));
        String[] filled = Arrays.copyOf(colNames, ncols);
        for (int col = 0; col < filled.length; col++) {
            String current = filled[col];
            if (current == null || current.isEmpty()) {
                String candidate = "C" + (col + 1);
                while (taken.contains(candidate)) {
                    candidate = candidate + candidate;
                }
                filled[col] = candidate;
            }
        }
        return filled;
    }

    /**
     * Starts an asynchronous parse of {@code keys} that runs {@code parseAllKeys} with
     * deleteOnDone = true. Before starting, the chunk size from {@code setup} is applied to every
     * input that is a FileVec (or a Frame whose first vec is a FileVec), and the chunk counts of
     * those inputs are summed and passed to {@code Job.start} as the amount of work.
     *
     * @param dest  key of the result frame; an empty frame is written and locked under it first
     * @param keys  source keys
     * @param setup parse setup
     * @return the started job
     */
    public static Job forkParseSVMLight(Key<Frame> dest, final Key[] keys, final ParseSetup setup) {
        int nchunks = 0;
        Vec firstVec = null;
        for (int idx = 0; idx < keys.length; ++idx) {
            Object stored = DKV.getGet(keys[idx]);
            if (stored instanceof FileVec) {
                if (idx == 0) {
                    firstVec = (FileVec)stored;
                }
                ((FileVec)stored).setChunkSize(setup._chunk_size);
                nchunks += ((FileVec)stored).nChunks();
                Log.info("Parse chunk size " + setup._chunk_size);
            } else if (stored instanceof Frame && ((Frame)stored).vec(0) instanceof FileVec) {
                if (idx == 0) {
                    firstVec = ((Frame)stored).vec(0);
                }
                ((FileVec)((Frame)stored).vec(0)).setChunkSize((Frame)stored, setup._chunk_size);
                nchunks += ((Frame)stored).vec(0).nChunks();
                Log.info("Parse chunk size " + setup._chunk_size);
            }
        }
        // NOTE(review): the group is never used afterwards, yet this call throws a
        // NullPointerException when keys is empty or keys[0] is not FileVec-backed - confirm intent.
        firstVec.group();
        final ParseDataset pds = new ParseDataset(dest);
        new Frame(pds._job._result, new String[0], new Vec[0]).delete_and_lock(pds._job);
        return pds._job.start(new H2O.H2OCountedCompleter(){

            @Override
            public void compute2() {
                ParseDataset.parseAllKeys(pds, keys, setup, true);
                this.tryComplete();
            }
        }, nchunks);
    }

    /**
     * Validates the request, locks the keys involved and starts an asynchronous parse job.
     *
     * <p>Checks performed before the job is started: no duplicate non-empty column names in the
     * final setup; the destination differs from every source; with {@code deleteOnDone} no source
     * key is listed twice; and the summed input size does not exceed four times the value
     * reported by {@code H2O.CLOUD.free_mem()}.
     *
     * @param dest         key of the result frame; an empty frame is written and locked under it
     * @param keys         source keys; each is read-locked for the job
     * @param parseSetup   user-supplied setup, finalised through {@code getFinalSetup}
     * @param deleteOnDone forwarded to the parse task
     * @return the ParseDataset holding the started job
     * @throws IllegalArgumentException if any of the checks above fails
     */
    public static ParseDataset forkParseDataset(Key<Frame> dest, Key[] keys, ParseSetup parseSetup, boolean deleteOnDone) {
        ParseSetup setup = parseSetup.getFinalSetup(keys, parseSetup);
        HashSet<String> conflictingNames = setup.checkDupColumnNames();
        for (String x : conflictingNames) {
            // Null and empty entries are skipped by this check.
            if (x == null || x.equals("")) {
                continue;
            }
            throw new IllegalArgumentException("Found duplicate column name " + x);
        }
        long totalParseSize = 0L;
        for (int i = 0; i < keys.length; ++i) {
            Key k = keys[i];
            if (dest.equals(k)) {
                throw new IllegalArgumentException("Destination key " + dest + " must be different from all sources");
            }
            if (deleteOnDone) {
                for (int j = i + 1; j < keys.length; ++j) {
                    // Compare by value, like the destination check above: two distinct Key
                    // instances naming the same source must be rejected too, not only the
                    // very same object listed twice.
                    if (k == keys[j] || (k != null && k.equals(keys[j]))) {
                        throw new IllegalArgumentException("Source key " + k + " appears twice, deleteOnDone must be false");
                    }
                }
            }
            totalParseSize += ParseDataset.getByteVec(k).length();
        }
        Log.info("Total file size: " + PrettyPrint.bytes(totalParseSize));
        // Apply the configured chunk size to every FileVec-backed input.
        for (int i = 0; i < keys.length; ++i) {
            Object ice = DKV.getGet(keys[i]);
            if (ice instanceof FileVec) {
                ((FileVec)ice).setChunkSize(setup._chunk_size);
                Log.info("Parse chunk size " + setup._chunk_size);
                continue;
            }
            if (!(ice instanceof Frame) || !(((Frame)ice).vec(0) instanceof FileVec)) {
                continue;
            }
            ((FileVec)((Frame)ice).vec(0)).setChunkSize((Frame)ice, setup._chunk_size);
            Log.info("Parse chunk size " + setup._chunk_size);
        }
        long memsz = H2O.CLOUD.free_mem();
        if (totalParseSize > memsz * 4L) {
            throw new IllegalArgumentException("Total input file size of " + PrettyPrint.bytes(totalParseSize) + " is much larger than total cluster memory of " + PrettyPrint.bytes(memsz) + ", please use either a larger cluster or smaller data.");
        }
        if (H2O.GA != null) {
            GAUtils.logParse(totalParseSize, keys.length, setup._number_columns);
        }
        ParseDataset pds = new ParseDataset(dest);
        // Write-lock the (still empty) destination frame before the job starts.
        new Frame(pds._job._result, new String[0], new Vec[0]).delete_and_lock(pds._job);
        for (Key k : keys) {
            Lockable.read_lock(k, pds._job);
        }
        ParserFJTask fjt = new ParserFJTask(pds, keys, setup, deleteOnDone);
        pds._job.start(fjt, totalParseSize);
        return pds;
    }

    private ParseDataset(Key<Frame> dest) {
        this._job = new Job<Frame>(dest, Frame.class.getName(), "Parse");
    }

    /**
     * Runs the whole parse for an already-created job: ingests the files, resolves categorical
     * domains, closes the vecs into the result frame, records parse warnings, logs a summary and
     * unlocks the frame. Returns early, without finishing the frame, whenever the job reports
     * that a stop was requested.
     *
     * @param pds          parse state whose job is updated and whose {@code _mfpt} is set here
     * @param fkeys        keys of the inputs; an empty array stops the job immediately
     * @param setup        parse setup; column names and NA strings may be reset in place
     * @param deleteOnDone forwarded to the multi-file parse task; when true every input key is
     *                     asserted to be absent from the DKV at the end
     * @return {@code pds}
     * @throws H2OParseException if a categorical column has 10,000,000 or more levels
     */
    private static ParseDataset parseAllKeys(ParseDataset pds, Key[] fkeys, ParseSetup setup, boolean deleteOnDone) {
        Job<Frame> job = pds._job;
        assert (setup._number_columns > 0);
        // An empty name list, or a single empty name, is treated as "no names given".
        if (setup._column_names != null && (setup._column_names.length == 0 || setup._column_names.length == 1 && setup._column_names[0].isEmpty())) {
            setup._column_names = null;
        }
        // NA strings are dropped when their count does not match the column count.
        if (setup._na_strings != null && setup._na_strings.length != setup._number_columns) {
            setup._na_strings = null;
        }
        if (fkeys.length == 0) {
            job.stop();
            return pds;
        }
        job.update(0L, "Ingesting files.");
        Vec.VectorGroup vg = ParseDataset.getByteVec(fkeys[0]).group();
        MultiFileParseTask mfpt = pds._mfpt = new MultiFileParseTask(vg, setup, job._key, fkeys, deleteOnDone);
        mfpt.doAll(fkeys);
        Log.trace("Done ingesting files.");
        if (job.stop_requested()) {
            return pds;
        }
        AppendableVec[] avs = mfpt.vecs();
        setup._column_names = ParseDataset.getColumnNames(avs.length, setup._column_names);
        Frame fr = null;
        // Collect the indices of the columns with type code 4, the code that logParseResults
        // reports as "factor".
        int n = 0;
        int[] ecols2 = new int[avs.length];
        for (int i = 0; i < avs.length; ++i) {
            if (avs[i].get_type() != 4) {
                continue;
            }
            ecols2[n++] = i;
        }
        int[] ecols = Arrays.copyOf(ecols2, n);
        if (n > 0) {
            if (!setup.getParseType().isDomainProvided) {
                job.update(0L, "Collecting categorical domains across nodes.");
                GatherCategoricalDomainsTask gcdt = (GatherCategoricalDomainsTask)new GatherCategoricalDomainsTask(mfpt._cKey, ecols).doAllNodes();
                ArrayList<String> offendingColNames = new ArrayList<String>();
                for (int i = 0; i < ecols.length; ++i) {
                    if (gcdt.getDomainLength(i) < 10000000) {
                        if (gcdt.getDomainLength(i) == 0) {
                            avs[ecols[i]].setBad();
                            continue;
                        }
                        avs[ecols[i]].setDomain(gcdt.getDomain(i));
                        continue;
                    }
                    offendingColNames.add(setup._column_names[ecols[i]]);
                }
                if (offendingColNames.size() > 0) {
                    throw new H2OParseException("Exceeded categorical limit on columns " + offendingColNames + ".   Consider reparsing these columns as a string.");
                }
                Log.trace("Done collecting categorical domains across nodes.");
            } else {
                for (int i = 0; i < ecols.length; ++i) {
                    avs[ecols[i]].setDomain(setup._domains[ecols[i]]);
                }
            }
            job.update(0L, "Compressing data.");
            fr = new Frame(job._result, setup._column_names, AppendableVec.closeAll(avs));
            fr.update(job);
            Log.trace("Done compressing data.");
            if (!setup.getParseType().isDomainProvided) {
                Vec[] evecs = new Vec[ecols.length];
                for (int i = 0; i < evecs.length; ++i) {
                    evecs[i] = fr.vecs()[ecols[i]];
                }
                job.update(0L, "Unifying categorical domains across nodes.");
                // One remote call per cloud member; all are issued first, then awaited.
                CreateParse2GlobalCategoricalMaps[] fcdt = new CreateParse2GlobalCategoricalMaps[H2O.CLOUD.size()];
                RPC[] rpcs = new RPC[H2O.CLOUD.size()];
                for (int i = 0; i < fcdt.length; ++i) {
                    H2ONode[] nodes = H2O.CLOUD.members();
                    fcdt[i] = new CreateParse2GlobalCategoricalMaps(mfpt._cKey, fr._key, ecols);
                    rpcs[i] = new RPC<CreateParse2GlobalCategoricalMaps>(nodes[i], fcdt[i]).call();
                }
                for (RPC rpc : rpcs) {
                    rpc.get();
                }
                new UpdateCategoricalChunksTask(mfpt._cKey, mfpt._chunk2ParseNodeMap).doAll(evecs);
                MultiFileParseTask._categoricals.remove(mfpt._cKey);
                Log.trace("Done unifying categoricals across nodes.");
            }
        } else {
            job.update(0L, "Compressing data.");
            fr = new Frame(job._result, setup._column_names, AppendableVec.closeAll(avs));
            Log.trace("Done closing all Vecs.");
        }
        if (job.stop_requested()) {
            return pds;
        }
        if (setup._parse_type.equals(DefaultParserProviders.SVMLight_INFO)) {
            new SVFTask(fr).doAllNodes();
        }
        if (job.stop_requested()) {
            return pds;
        }
        ParseWriter.ParseErr[] errs = ArrayUtils.append(setup._errs, mfpt._errors);
        if (errs.length > 0) {
            // Map each file name to the index of its first chunk so that per-chunk line numbers
            // can be turned into global and file-relative line numbers.
            HashMap<String, Integer> fileChunkOffsets = new HashMap<String, Integer>();
            for (int i = 0; i < mfpt._fileChunkOffsets.length; ++i) {
                fileChunkOffsets.put(fkeys[i].toString(), mfpt._fileChunkOffsets[i]);
            }
            long[] espc = fr.anyVec().espc();
            for (int i = 0; i < errs.length; ++i) {
                if (!fileChunkOffsets.containsKey(errs[i]._file)) {
                    continue;
                }
                int espcOff = fileChunkOffsets.get(errs[i]._file);
                errs[i]._gLineNum = espc[espcOff + errs[i]._cidx] + errs[i]._lineNum;
                errs[i]._lineNum = errs[i]._gLineNum - espc[espcOff];
            }
            // Sort by global line number, then by message; entries equal on both are merged.
            TreeSet<ParseWriter.ParseErr> s = new TreeSet<ParseWriter.ParseErr>(new Comparator<ParseWriter.ParseErr>(){

                @Override
                public int compare(ParseWriter.ParseErr o1, ParseWriter.ParseErr o2) {
                    // Compare the values directly: subtracting and narrowing the difference to
                    // int gives the wrong sign once the gap no longer fits in 32 bits.
                    if (o1._gLineNum < o2._gLineNum) {
                        return -1;
                    }
                    if (o1._gLineNum > o2._gLineNum) {
                        return 1;
                    }
                    return o1._err.compareTo(o2._err);
                }
            });
            for (ParseWriter.ParseErr e : errs) {
                s.add(e);
            }
            errs = s.toArray(new ParseWriter.ParseErr[s.size()]);
            // Size the warnings after de-duplication so the array carries no trailing nulls.
            String[] warns = new String[errs.length];
            for (int i = 0; i < errs.length; ++i) {
                warns[i] = errs[i].toString();
                Log.warn(new Object[]{warns[i]});
            }
            job.setWarnings(warns);
        }
        job.update(0L, "Calculating data summary.");
        ParseDataset.logParseResults(fr);
        fr.update(job);
        Frame fr2 = (Frame)DKV.getGet(fr._key);
        assert (fr2._names.length == fr2.numCols());
        fr.unlock(job);
        if (deleteOnDone) {
            for (Key k : fkeys) {
                assert (DKV.get(k) == null) : "Input key " + k + " not deleted during parse";
            }
        }
        return pds;
    }

    /**
     * Logs a per-column summary table of a parsed frame followed by its chunk summary.
     * Rollup statistics are started for every vec and awaited before anything is printed.
     * Unless the process was launched with the Hadoop jar, frames with more than 20 columns
     * send only the first 10 and last 10 column rows to stdout; the rest go to the log only.
     *
     * @param fr the frame to describe
     */
    public static void logParseResults(Frame fr) {
        long numRows = fr.anyVec().length();
        Log.info("Parse result for " + fr._key + " (" + numRows + " rows):");
        Futures fs = new Futures();
        final Vec[] columns = fr.vecs();
        for (Vec column : columns) {
            column.startRollupStats(fs);
        }
        fs.blockForPending();
        // Width of the name column = longest column name.
        int namelen = 0;
        for (String name : fr.names()) {
            namelen = Math.max(namelen, name.length());
        }
        String format = " %" + namelen + "s %7s %12.12s %12.12s %12.12s %12.12s %11s %8s %6s";
        Log.info(String.format(format, "ColV2", "type", "min", "max", "mean", "sigma", "NAs", "constant", "cardinality"));
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final int headOnStdout = 10;
        final int tailOnStdout = 10;
        for (int i = 0; i < columns.length; ++i) {
            Vec v = columns[i];
            boolean isCategorical = v.isCategorical();
            boolean isConstant = v.isConst();
            String CStr = String.format("%" + namelen + "s:", fr.names()[i]);
            String typeStr;
            String minStr = "";
            String maxStr = "";
            String meanStr = "";
            String sigmaStr = "";
            switch (v.get_type()) {
                case 0: {
                    typeStr = "all_NA";
                    break;
                }
                case 1: {
                    typeStr = "UUID";
                    break;
                }
                case 2: {
                    typeStr = "string";
                    break;
                }
                case 3: {
                    typeStr = "numeric";
                    minStr = String.format("%g", v.min());
                    maxStr = String.format("%g", v.max());
                    meanStr = String.format("%g", v.mean());
                    sigmaStr = String.format("%g", v.sigma());
                    break;
                }
                case 4: {
                    typeStr = "factor";
                    minStr = v.factor(0L);
                    maxStr = v.factor(v.cardinality() - 1);
                    break;
                }
                case 5: {
                    typeStr = "time";
                    minStr = sdf.format(v.min());
                    maxStr = sdf.format(v.max());
                    break;
                }
                default: {
                    throw H2O.unimpl();
                }
            }
            long numNAs = v.naCnt();
            String naStr = numNAs > 0L ? String.format("%d", numNAs) : "";
            String isConstantStr = isConstant ? "constant" : "";
            String numLevelsStr = isCategorical ? String.format("%d", v.domain().length) : "";
            boolean launchedWithHadoopJar = H2O.ARGS.launchedWithHadoopJar();
            boolean announceLogOnly = false;
            boolean toStdout;
            if (launchedWithHadoopJar || columns.length <= headOnStdout + tailOnStdout || i < headOnStdout) {
                toStdout = true;
            } else if (i == headOnStdout) {
                // First suppressed column: announce once that the rest goes to the log file.
                announceLogOnly = true;
                toStdout = false;
            } else {
                toStdout = i + tailOnStdout >= columns.length;
            }
            if (announceLogOnly) {
                Log.info("Additional column information only sent to log file...");
            }
            String row = String.format(format, CStr, typeStr, minStr, maxStr, meanStr, sigmaStr, naStr, isConstantStr, numLevelsStr);
            Log.info(row, toStdout);
        }
        Log.info(FrameUtils.chunkSummary(fr).toString());
    }

    /** Unchecked exception raised for failures while parsing. */
    public static class H2OParseException
    extends RuntimeException {
        /** Creates an exception carrying only a message. */
        public H2OParseException(String msg) {
            super(msg);
        }

        /** Creates an exception carrying a message and a cause. */
        public H2OParseException(String msg, Throwable cause) {
            super(msg, cause);
        }

        /** Creates an exception carrying only a cause. */
        public H2OParseException(Throwable cause) {
            super(cause);
        }

        /**
         * Builds a copy of this exception with a different message.
         *
         * @param msg the message for the copy
         * @return a new exception with {@code msg}, this exception's cause and stack trace
         */
        public H2OParseException resetMsg(String msg) {
            Throwable sameCause = this.getCause();
            H2OParseException replacement = new H2OParseException(msg, sameCause);
            StackTraceElement[] sameTrace = this.getStackTrace();
            replacement.setStackTrace(sameTrace);
            return replacement;
        }
    }

    private static class MultiFileParseTask
    extends MRTask<MultiFileParseTask> {
        /** Setup handed to the constructor; cloned per file in map(). */
        private final ParseSetup _parseSetup;
        /** Vector group from which the output vec keys are taken. */
        private final Vec.VectorGroup _vg;
        /** Value returned by _vg.reserveKeys(_reservedKeys); output column i uses vec id _vecIdStart + i. */
        private final int _vecIdStart;
        /** Static map from a task's _cKey to its per-column Categorical array; filled lazily by categoricals(). */
        private static NonBlockingHashMap<Key, Categorical[]> _categoricals = new NonBlockingHashMap();
        /** Fresh key made per task instance; used as the lookup key into _categoricals. */
        private final Key _cKey = Key.make();
        /** When true, chunksAreLocal() removes the source vec or frame after a file has been stream-parsed. */
        private final boolean _deleteOnDone;
        /** One slot per input chunk (over all files) holding the index of the node recorded for it; -1 until set. */
        private int[] _chunk2ParseNodeMap;
        /** Key of the owning job; used to poll stop_requested() and passed to stream/lock calls. */
        private final Key<Job> _jobKey;
        /** For file i, the index of its first chunk in the concatenated chunk numbering of all files. */
        private final int[] _fileChunkOffsets;
        /** Per-file writers; allocated in setupLocal(), merged in reduce() and compacted in postGlobal(). */
        FVecParseWriter[] _dout;
        /** Number of vec keys reserved: 100000000 for SVMLight input, otherwise the setup's column count. */
        int _reservedKeys;
        /** Parse errors collected so far; reduce() stops appending once 20 entries are held. */
        private ParseWriter.ParseErr[] _errors = new ParseWriter.ParseErr[0];
        /** Resulting appendable vecs; assigned in postGlobal(). */
        private AppendableVec[] _vecs;

        /**
         * Reserves the output vec keys and lays out the chunk numbering across all input files.
         *
         * @param vg           vector group the output vec keys are reserved in
         * @param setup        parse setup shared by all files
         * @param jobKey       key of the owning job
         * @param fkeys        input file keys; their chunk counts define the chunk offsets
         * @param deleteOnDone whether sources are removed once parsed
         */
        MultiFileParseTask(Vec.VectorGroup vg, ParseSetup setup, Key<Job> jobKey, Key[] fkeys, boolean deleteOnDone) {
            this._vg = vg;
            this._parseSetup = setup;
            if (this._parseSetup._parse_type.equals(DefaultParserProviders.SVMLight_INFO)) {
                this._reservedKeys = 100000000;
            } else {
                this._reservedKeys = setup._number_columns;
            }
            this._vecIdStart = this._vg.reserveKeys(this._reservedKeys);
            this._deleteOnDone = deleteOnDone;
            this._jobKey = jobKey;
            // Running prefix sum of the per-file chunk counts.
            int[] offsets = new int[fkeys.length];
            int totalChunks = 0;
            int file = 0;
            while (file < fkeys.length) {
                offsets[file] = totalChunks;
                totalChunks += ParseDataset.getByteVec(fkeys[file]).nChunks();
                file++;
            }
            this._fileChunkOffsets = offsets;
            this._chunk2ParseNodeMap = MemoryManager.malloc4(totalChunks);
            Arrays.fill(this._chunk2ParseNodeMap, -1);
        }

        /**
         * Combines the per-file writers into the final set of appendable vecs.
         * Null writer slots are dropped first. A single remaining writer's vecs are used as-is;
         * otherwise new vecs are created over one shared row-count array into which each file's
         * local row counts are copied at that file's chunk offset.
         *
         * @throws RuntimeException via {@code H2O.unimpl()} if more columns were produced than
         *                          vec keys were reserved
         */
        @Override
        public void postGlobal() {
            Log.trace("Begin file parse cleanup.");
            // Compact away the null writer slots.
            int n = 0;
            for (int i = 0; i < this._dout.length; ++i) {
                if (this._dout[i] == null) {
                    continue;
                }
                this._dout[n++] = this._dout[i];
            }
            if (n < this._dout.length) {
                this._dout = Arrays.copyOf(this._dout, n);
            }
            if (this._dout.length == 1) {
                this._vecs = this._dout[0]._vecs;
                return;
            }
            int nchunks = 0;
            int nCols = 0;
            for (FVecParseWriter dout : this._dout) {
                nchunks += dout._vecs[0]._tmp_espc.length;
                nCols = Math.max(dout._vecs.length, nCols);
            }
            long[] espc = MemoryManager.malloc8(nchunks);
            if (nCols > this._reservedKeys) {
                throw H2O.unimpl();
            }
            AppendableVec[] res = new AppendableVec[nCols];
            if (this._parseSetup._parse_type.equals(DefaultParserProviders.SVMLight_INFO)) {
                // For SVMLight the column count is taken from the widest writer; every column
                // gets type code 3.
                this._parseSetup._number_columns = res.length;
                this._parseSetup._column_types = new byte[res.length];
                Arrays.fill(this._parseSetup._column_types, (byte)3);
            }
            // Missing column types fall back to type code 3, matching makeDout().
            byte[] ctypes = this._parseSetup._column_types;
            for (int i = 0; i < res.length; ++i) {
                res[i] = new AppendableVec(this._vg.vecKey(this._vecIdStart + i), espc, ctypes == null ? (byte)3 : ctypes[i], 0);
            }
            for (FVecParseWriter fvpw : this._dout) {
                AppendableVec[] avs = fvpw._vecs;
                long[] file_local_espc = avs[0]._tmp_espc;
                for (int j = 0; j < avs.length; ++j) {
                    assert (res[j]._key.equals(avs[j]._key));
                    assert (avs[0]._chunkOff == avs[j]._chunkOff);
                    assert (file_local_espc == avs[j]._tmp_espc || Arrays.equals(file_local_espc, avs[j]._tmp_espc));
                }
                System.arraycopy(file_local_espc, 0, espc, avs[0]._chunkOff, file_local_espc.length);
            }
            this._vecs = res;
            Log.trace("Finished file parse cleanup.");
        }

        /** @return the appendable vecs assigned by postGlobal(); null before that has run */
        private AppendableVec[] vecs() {
            final AppendableVec[] parsed = this._vecs;
            return parsed;
        }

        /** Allocates one writer slot per entry of {@code _keys}. */
        @Override
        public void setupLocal() {
            final int slots = this._keys.length;
            this._dout = new FVecParseWriter[slots];
        }

        /**
         * Returns the Categorical array registered under {@code cKey}, registering a freshly
         * built one of {@code ncols} entries if none is present yet.
         *
         * @param cKey  key into the static categorical map
         * @param ncols length of the array to create when none exists
         * @return whatever the map holds for {@code cKey} after the call
         */
        private static Categorical[] categoricals(Key cKey, int ncols) {
            Categorical[] registered = _categoricals.get(cKey);
            if (registered == null) {
                Categorical[] fresh = new Categorical[ncols];
                int col = 0;
                while (col < fresh.length) {
                    fresh[col] = new Categorical();
                    col++;
                }
                // Another caller may have registered first; re-read to return the winner.
                _categoricals.putIfAbsent(cKey, fresh);
                registered = _categoricals.get(cKey);
            }
            return registered;
        }

        /**
         * Records this node as the parser of every chunk of {@code vec}, then releases the source:
         * with deleteOnDone the vec (or the frame stored under {@code key}) is removed, otherwise
         * a frame that has a key is unlocked.
         *
         * @param vec           source byte vec that was parsed on this node
         * @param chunkStartIdx index of the vec's first chunk in the global chunk numbering
         * @param key           key the source was loaded from
         */
        private void chunksAreLocal(Vec vec, int chunkStartIdx, Key key) {
            int chunk = 0;
            while (chunk < vec.nChunks()) {
                this._chunk2ParseNodeMap[chunkStartIdx + chunk] = H2O.SELF.index();
                chunk++;
            }
            Object stored = DKV.get(key).get();
            if (stored == vec) {
                if (this._deleteOnDone) {
                    vec.remove();
                }
                return;
            }
            Frame source = (Frame)stored;
            if (this._deleteOnDone) {
                source.delete(this._jobKey, new Futures()).blockForPending();
                return;
            }
            if (source._key != null) {
                source.unlock(this._jobKey);
            }
        }

        /**
         * Creates the writer for one file: one appendable vec per column sharing a single
         * row-count array, wrapped in an SVMLight writer or a regular writer depending on the
         * parse type.
         *
         * @param localSetup per-file parse setup
         * @param chunkOff   chunk offset given to every appendable vec and to the writer
         * @param nchunks    length of the shared row-count array
         * @return the new writer
         */
        private FVecParseWriter makeDout(ParseSetup localSetup, int chunkOff, int nchunks) {
            AppendableVec[] columns = new AppendableVec[localSetup._number_columns];
            long[] sharedEspc = MemoryManager.malloc8(nchunks);
            byte[] ctypes = localSetup._column_types;
            for (int col = 0; col < columns.length; ++col) {
                // Columns without a declared type get type code 3.
                byte colType = ctypes == null ? (byte)3 : ctypes[col];
                columns[col] = new AppendableVec(this._vg.vecKey(col + this._vecIdStart), sharedEspc, colType, chunkOff);
            }
            if (localSetup._parse_type.equals(DefaultParserProviders.SVMLight_INFO)) {
                return new SVMLightFVecParseWriter(this._vg, this._vecIdStart, chunkOff, this._parseSetup._chunk_size, columns);
            }
            return new FVecParseWriter(this._vg, chunkOff, MultiFileParseTask.categoricals(this._cKey, localSetup._number_columns), localSetup._column_types, this._parseSetup._chunk_size, columns);
        }

        /**
         * Parses one input file. Does nothing if the job has been asked to stop.
         *
         * <p>The compression method is guessed from the file's first bytes. Uncompressed input is
         * parsed with a {@code DistributedParse} when the parse type supports parallel parsing,
         * and as a stream otherwise; ZIP and GZIP input is always parsed as a stream. For ZIP
         * archives only the first entry is parsed and a warning is logged if more entries follow.
         *
         * @param key key of the file to parse
         * @throws H2OParseException if parsing fails, or if a ZIP archive has no first entry or
         *                           its first entry is a directory; the message is suffixed with
         *                           the key
         * @throws RuntimeException  wrapping any IOException raised while reading
         */
        @Override
        public void map(Key key) {
            if (this._jobKey.get().stop_requested()) {
                return;
            }
            ParseSetup localSetup = (ParseSetup)this._parseSetup.clone();
            ByteVec vec = ParseDataset.getByteVec(key);
            int chunkStartIdx = this._fileChunkOffsets[this._lo];
            Log.trace("Begin a map stage of a file parse with start index " + chunkStartIdx + ".");
            byte[] zips = vec.getFirstBytes();
            ZipUtil.Compression cpr = ZipUtil.guessCompressionMethod(zips);
            if (localSetup._check_header == 1) {
                // Re-evaluate the header setting for this particular file from its first bytes.
                localSetup._check_header = localSetup.parser(this._jobKey).fileHasHeader(ZipUtil.unzipBytes(zips, cpr, localSetup._chunk_size), localSetup);
            }
            try {
                switch (cpr) {
                    case NONE: {
                        if (this._parseSetup._parse_type.isParallelParseSupported()) {
                            ((DistributedParse)new DistributedParse(this._vg, localSetup, this._vecIdStart, chunkStartIdx, this, key, vec.nChunks()).dfork(vec)).getResult(false);
                            // Each chunk is recorded against the home node of its chunk key.
                            for (int i = 0; i < vec.nChunks(); ++i) {
                                this._chunk2ParseNodeMap[chunkStartIdx + i] = vec.chunkKey(i).home_node().index();
                            }
                            break;
                        }
                        InputStream bvs = vec.openStream(this._jobKey);
                        this._dout[this._lo] = this.streamParse(bvs, localSetup, this.makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
                        this._errors = this._dout[this._lo].removeErrors();
                        this.chunksAreLocal(vec, chunkStartIdx, key);
                        break;
                    }
                    case ZIP: {
                        InputStream bvs = vec.openStream(this._jobKey);
                        ZipInputStream zis = new ZipInputStream(bvs);
                        ZipEntry ze = zis.getNextEntry();
                        if (ze == null || ze.isDirectory()) {
                            // Nothing parseable: fail with a clear message instead of the
                            // NullPointerException the missing writer would otherwise cause.
                            zis.close();
                            throw new H2OParseException("Zip archive does not start with a file entry that can be parsed");
                        }
                        this._dout[this._lo] = this.streamParse(zis, localSetup, this.makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
                        this._errors = this._dout[this._lo].removeErrors();
                        ZipEntry ze2 = zis.getNextEntry();
                        if (ze2 != null) {
                            Log.warn("Only single file zip archives are currently supported, only file: " + ze.getName() + " has been parsed.  Remaining files have been ignored.");
                        }
                        // Close the archive whether or not further entries were found.
                        zis.close();
                        this.chunksAreLocal(vec, chunkStartIdx, key);
                        break;
                    }
                    case GZIP: {
                        InputStream bvs = vec.openStream(this._jobKey);
                        this._dout[this._lo] = this.streamParse(new GZIPInputStream(bvs), localSetup, this.makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
                        this._errors = this._dout[this._lo].removeErrors();
                        this.chunksAreLocal(vec, chunkStartIdx, key);
                        break;
                    }
                }
                Log.trace("Finished a map stage of a file parse with start index " + chunkStartIdx + ".");
            }
            catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
            catch (H2OParseException pe0) {
                // Re-throw with the offending key appended to the message.
                throw pe0.resetMsg(pe0.getMessage() + " for " + key);
            }
        }

        /**
         * Merges another task's results into this one: writer arrays are concatenated, the
         * chunk-to-node maps are merged slot by slot, and errors are appended up to a cap of 20.
         *
         * @param mfpt the other task; must not be this task
         */
        @Override
        public void reduce(MultiFileParseTask mfpt) {
            assert (this != mfpt);
            Log.trace("Begin a reduce stage of a file parse.");

            // Writers: adopt the other array, or append it unless both tasks share one array.
            if (this._dout == null) {
                this._dout = mfpt._dout;
            } else if (this._dout != mfpt._dout) {
                this._dout = ArrayUtils.append(this._dout, mfpt._dout);
            }

            // Chunk map: fill unset (-1) slots from the other side; a slot set here must be
            // unset there.
            if (this._chunk2ParseNodeMap == null) {
                this._chunk2ParseNodeMap = mfpt._chunk2ParseNodeMap;
            } else if (this._chunk2ParseNodeMap != mfpt._chunk2ParseNodeMap) {
                int[] mine = this._chunk2ParseNodeMap;
                int[] theirs = mfpt._chunk2ParseNodeMap;
                for (int slot = 0; slot < mine.length; ++slot) {
                    if (mine[slot] != -1) {
                        assert (theirs[slot] == -1) : Arrays.toString(this._chunk2ParseNodeMap) + " :: " + Arrays.toString(mfpt._chunk2ParseNodeMap);
                    } else {
                        mine[slot] = theirs[slot];
                    }
                }
            }

            // Errors: keep at most 20.
            if (this._errors == null) {
                this._errors = mfpt._errors;
            } else if (this._errors.length < 20) {
                ParseWriter.ParseErr[] combined = ArrayUtils.append(this._errors, mfpt._errors);
                this._errors = combined;
                if (this._errors.length > 20) {
                    this._errors = Arrays.copyOf(this._errors, 20);
                }
            }
            Log.trace("Finished a reduce stage of a file parse.");
        }

        /**
         * Parses an entire input stream into {@code dout} and closes the writer.
         *
         * @param is         stream to parse (possibly a decompressing wrapper)
         * @param localSetup per-file parse setup that supplies the parser
         * @param dout       writer receiving the parsed data
         * @param bvs        underlying byte-vec stream, passed along only on the
         *                   {@code streamParseZip} path
         * @return {@code dout}
         * @throws IOException if reading fails
         */
        private FVecParseWriter streamParse(InputStream is, ParseSetup localSetup, FVecParseWriter dout, InputStream bvs) throws IOException {
            Parser parser = localSetup.parser(this._jobKey);
            boolean useZipVariant = localSetup._parse_type.isParallelParseSupported;
            if (!useZipVariant) {
                parser.streamParse(is, dout);
            } else {
                parser.streamParseZip(is, dout, bvs);
            }
            dout.close(this._fs);
            return dout;
        }

        /**
         * Removes every output vec key and every chunk key that this task may have written,
         * then cancels the task.
         *
         * @param fs futures collecting the pending removals
         * @return {@code fs}
         */
        private Futures onExceptionCleanup(Futures fs) {
            final int chunkCount = this._chunk2ParseNodeMap.length;
            final int columnCount = this._parseSetup._number_columns;
            int col = 0;
            while (col < columnCount) {
                Key<Vec> vecKey = this._vg.vecKey(this._vecIdStart + col);
                Keyed.remove(vecKey, fs);
                int chunk = 0;
                while (chunk < chunkCount) {
                    DKV.remove(Vec.chunkKey(vecKey, chunk), fs);
                    chunk++;
                }
                col++;
            }
            this.cancel(true);
            return fs;
        }

        /** Compiler-generated accessor: stores {@code x1} into {@code x0._errors} and returns it. */
        static /* synthetic */ ParseWriter.ParseErr[] access$1102(MultiFileParseTask x0, ParseWriter.ParseErr[] x1) {
            return x0._errors = x1;
        }

        /**
         * MRTask that parses the chunks of a single source key.
         *
         * <p>{@link #map(Chunk)} parses one input chunk into an {@link FVecParseWriter},
         * {@link #reduce(DistributedParse)} folds writers together, and
         * {@link #postGlobal()} hands the combined writer (and any parse errors, sorted by
         * byte offset) to the owning {@code MultiFileParseTask} before deleting or unlocking
         * the source.
         */
        private static class DistributedParse
        extends MRTask<DistributedParse> {
            private final ParseSetup _setup;
            private final int _vecIdStart;
            private final int _startChunkIdx;
            private final Vec.VectorGroup _vg;
            private FVecParseWriter _dout;
            private final Key _cKey;
            private final Key<Job> _jobKey;
            private final transient MultiFileParseTask _outerMFPT;
            private final transient Key _srckey;
            // Chunk indices already seen by freeMem on this node; created in setupLocal.
            private transient NonBlockingSetInt _visited;
            // Array of _nchunks longs shared by every AppendableVec created in map(); allocated in setupLocal.
            private transient long[] _espc;
            final int _nchunks;

            /**
             * @param vg            vector group used to derive the output vec keys
             * @param setup         parse setup (column count, column types, parse type, chunk size)
             * @param vecIdstart    id of the first output vec inside {@code vg}
             * @param startChunkIdx offset added to each input chunk index to get the output chunk index
             * @param mfpt          owning task; its {@code _cKey} and {@code _jobKey} are copied here
             * @param srckey        key of the source being parsed
             * @param nchunks       length of the {@code _espc} array allocated in {@link #setupLocal()}
             */
            DistributedParse(Vec.VectorGroup vg, ParseSetup setup, int vecIdstart, int startChunkIdx, MultiFileParseTask mfpt, Key srckey, int nchunks) {
                super(null);
                this._vg = vg;
                this._setup = setup;
                this._vecIdStart = vecIdstart;
                this._startChunkIdx = startChunkIdx;
                this._outerMFPT = mfpt;
                this._cKey = mfpt._cKey;
                this._jobKey = mfpt._jobKey;
                this._srckey = srckey;
                this._nchunks = nchunks;
            }

            /** Allocates the per-node transient state ({@code _visited} and {@code _espc}). */
            @Override
            public void setupLocal() {
                super.setupLocal();
                this._visited = new NonBlockingSetInt();
                this._espc = MemoryManager.malloc8(this._nchunks);
            }

            /**
             * Parses one input chunk. Returns immediately when the job has a stop request;
             * otherwise builds one {@link AppendableVec} per column, picks a writer based on
             * the parse type name, runs the parser, closes the writer, tags any parse errors
             * with the source key, reports progress and finally calls {@link #freeMem(Chunk)}.
             */
            @Override
            public void map(Chunk in) {
                if (this._jobKey.get().stop_requested()) {
                    return;
                }
                AppendableVec[] avs = new AppendableVec[this._setup._number_columns];
                for (int i = 0; i < avs.length; ++i) {
                    // With no declared column types every column is created with type code 3.
                    avs[i] = this._setup._column_types == null ? new AppendableVec(this._vg.vecKey(this._vecIdStart + i), this._espc, 3, this._startChunkIdx) : new AppendableVec(this._vg.vecKey(this._vecIdStart + i), this._espc, this._setup._column_types[i], this._startChunkIdx);
                }
                FVecParseReader din = new FVecParseReader(in);
                Parser p = this._setup.parser(this._jobKey);
                String parseTypeName = this._setup._parse_type.name();
                FVecParseWriter dout;
                if (parseTypeName.equals("ARFF") || parseTypeName.equals("CSV")) {
                    // ARFF and CSV share the categorical maps registered under _cKey.
                    Categorical[] categoricals = MultiFileParseTask.categoricals(this._cKey, this._setup._number_columns);
                    dout = new FVecParseWriter(this._vg, this._startChunkIdx + in.cidx(), categoricals, this._setup._column_types, this._setup._chunk_size, avs);
                } else if (parseTypeName.equals("SVMLight")) {
                    dout = new SVMLightFVecParseWriter(this._vg, this._vecIdStart, in.cidx() + this._startChunkIdx, this._setup._chunk_size, avs);
                } else {
                    // Any other parse type gets a plain writer with no categorical maps.
                    dout = new FVecParseWriter(this._vg, in.cidx() + this._startChunkIdx, null, this._setup._column_types, this._setup._chunk_size, avs);
                }
                p.parseChunk(in.cidx(), din, dout);
                this._dout = dout;
                this._dout.close(this._fs);
                if (this._dout.hasErrors()) {
                    for (ParseWriter.ParseErr err : this._dout._errs) {
                        assert (err != null) : "Parse error cannot be null!";
                        err._file = this._srckey.toString();
                    }
                }
                Job.update(in._len, this._jobKey);
                this.freeMem(in);
            }

            /**
             * Visits the index of {@code in} and the following index. An index that was
             * already present in {@code _visited} has its locally stored {@link Value}
             * looked up; a persisted value gets {@code freePOJO()} and {@code freeMem()}
             * called on it. The method stops as soon as a looked-up value is missing or
             * not persisted.
             */
            private void freeMem(Chunk in) {
                int cidx = in.cidx();
                for (int i = 0; i < 2; ++i) {
                    // i = 0 keeps cidx, i = 1 moves on to the next chunk index.
                    // NOTE(review): assumes add() returns true when the index was newly inserted — confirm.
                    if (this._visited.add(cidx += i)) continue;
                    Value v = Value.STORE_get(in.vec().chunkKey(cidx));
                    if (v == null || !v.isPersisted()) {
                        return;
                    }
                    v.freePOJO();
                    v.freeMem();
                }
            }

            /** Folds the other task's writer into this task's writer. */
            @Override
            public void reduce(DistributedParse dp) {
                this._dout.reduce(dp._dout);
            }

            /**
             * Publishes the combined writer into the owning task's {@code _dout} array at
             * index {@code _lo}, moves any parse errors (sorted by ascending byte offset)
             * to the owning task, and then either removes/deletes the source (when
             * {@code _deleteOnDone} is set) or unlocks a keyed source frame.
             */
            @Override
            public void postGlobal() {
                super.postGlobal();
                this._outerMFPT._dout[((MultiFileParseTask)this._outerMFPT)._lo] = this._dout;
                if (this._dout.hasErrors()) {
                    ParseWriter.ParseErr[] errs = this._dout.removeErrors();
                    Arrays.sort(errs, new Comparator<ParseWriter.ParseErr>(){

                        @Override
                        public int compare(ParseWriter.ParseErr o1, ParseWriter.ParseErr o2) {
                            // Compare explicitly: casting the difference to int can wrap
                            // and report the wrong sign for far-apart offsets.
                            if (o1._byteOffset < o2._byteOffset) {
                                return -1;
                            }
                            return o1._byteOffset == o2._byteOffset ? 0 : 1;
                        }
                    });
                    MultiFileParseTask.access$1102(this._outerMFPT, errs);
                }
                this._dout = null;
                Value val = DKV.get(this._srckey);
                if (val == null) {
                    return;
                }
                Object ice = val.get();
                if (ice instanceof ByteVec) {
                    if (this._outerMFPT._deleteOnDone) {
                        ((ByteVec)ice).remove();
                    }
                } else {
                    // Anything that is not a ByteVec is treated as a Frame.
                    Frame fr = (Frame)ice;
                    if (this._outerMFPT._deleteOnDone) {
                        fr.delete(this._outerMFPT._jobKey, new Futures()).blockForPending();
                    } else if (fr._key != null) {
                        fr.unlock(this._outerMFPT._jobKey);
                    }
                }
            }
        }
    }

    private static class SVFTask
    extends MRTask<SVFTask> {
        private final Frame _f;

        private SVFTask(Frame f) {
            this._f = f;
        }

        @Override
        public void setupLocal() {
            if (this._f.numCols() == 0) {
                return;
            }
            Vec v0 = this._f.anyVec();
            ArrayList<1> rs = new ArrayList<1>();
            for (int i = 0; i < v0.nChunks(); ++i) {
                if (!v0.chunkKey(i).home()) continue;
                final int fi = i;
                rs.add(new RecursiveAction(){

                    @Override
                    protected void compute() {
                        Value val;
                        int nlines = 0;
                        for (Vec vec : SVFTask.this._f.vecs()) {
                            val = Value.STORE_get(vec.chunkKey(fi));
                            if (val == null) continue;
                            nlines = ((Chunk)val.get())._len;
                            break;
                        }
                        int fnlines = nlines;
                        for (int j = 0; j < SVFTask.this._f.numCols(); ++j) {
                            Vec vec = SVFTask.this._f.vec(j);
                            Key k = vec.chunkKey(fi);
                            val = Value.STORE_get(k);
                            if (val != null) continue;
                            H2O.putIfMatch(k, new Value(k, (Freezable)new C0LChunk(0L, fnlines)), null);
                        }
                    }
                });
            }
            ForkJoinTask.invokeAll(rs);
        }

        @Override
        public void reduce(SVFTask drt) {
        }
    }

    /**
     * MRTask that collects the categorical domains of the requested columns from every
     * node and merges them into one sorted, duplicate-free domain per column.
     *
     * <p>Each domain is kept in a packed byte array: a 4-byte entry count at offset 0,
     * followed by one record per entry consisting of a 4-byte length and that many bytes
     * (see {@link #packDomain(BufferedString[])} and {@link #getDomain(int)}).
     */
    private static class GatherCategoricalDomainsTask
    extends MRTask<GatherCategoricalDomainsTask> {
        private final Key _k;
        private final int[] _catColIdxs;
        // One packed domain per entry of _catColIdxs; stays null on nodes with no categoricals for _k.
        private byte[][] _packedDomains;

        private GatherCategoricalDomainsTask(Key k, int[] ccols) {
            this._k = k;
            this._catColIdxs = ccols;
        }

        /**
         * Packs the local domains. Does nothing when no categoricals are registered under
         * {@code _k}; otherwise each requested column is converted to UTF-8, its domain is
         * sorted and then packed.
         */
        @Override
        public void setupLocal() {
            if (!MultiFileParseTask._categoricals.containsKey(this._k)) {
                return;
            }
            this._packedDomains = new byte[this._catColIdxs.length][];
            BufferedString[][] _perColDomains = new BufferedString[this._catColIdxs.length][];
            Categorical[] _colCats = (Categorical[])MultiFileParseTask._categoricals.get(this._k);
            int i = 0;
            for (int col : this._catColIdxs) {
                _colCats[col].convertToUTF8(col + 1);
                _perColDomains[i] = _colCats[col].getColumnDomain();
                // The merge in reduce() relies on every packed domain being sorted.
                Arrays.sort(_perColDomains[i]);
                this._packedDomains[i] = this.packDomain(_perColDomains[i]);
                ++i;
            }
            Log.trace("Done locally collecting domains on each node.");
        }

        /**
         * Merges the other task's packed domains into this one. If this side has none, the
         * other side's array is adopted as-is. Otherwise one task per column is submitted
         * and joined; each performs a sorted merge that keeps a single copy of entries
         * present on both sides.
         */
        @Override
        public void reduce(GatherCategoricalDomainsTask other) {
            if (this._packedDomains == null) {
                this._packedDomains = other._packedDomains;
            } else if (other._packedDomains != null) {
                int i;
                H2O.H2OCountedCompleter[] domtasks = new H2O.H2OCountedCompleter[this._catColIdxs.length];
                for (i = 0; i < this._catColIdxs.length; ++i) {
                    final int fi = i;
                    final GatherCategoricalDomainsTask fOther = other;
                    domtasks[i] = new H2O.H2OCountedCompleter(){

                        @Override
                        public void compute2() {
                            byte[] thisDom = GatherCategoricalDomainsTask.this._packedDomains[fi];
                            byte[] otherDom = fOther._packedDomains[fi];
                            int tLen = UnsafeUtils.get4(thisDom, 0);
                            int oLen = UnsafeUtils.get4(otherDom, 0);
                            // Reusable views; pointed at real entries inside the loop only.
                            BufferedString tCat = new BufferedString(thisDom, 0, 0);
                            BufferedString oCat = new BufferedString(otherDom, 0, 0);
                            int ti = 0;
                            int oi = 0;
                            // Offsets of the next unread record (its 4-byte length prefix)
                            // in each input, and of the next free byte in the output.
                            int tPos = 4;
                            int oPos = 4;
                            int mPos = 4;
                            int mergeLen = 0;
                            byte[] mergedDom = new byte[thisDom.length + otherDom.length];
                            while (ti < tLen && oi < oLen) {
                                // Lengths are read only while an unread record exists, so
                                // no read ever lands at or past the end of either array.
                                int tStrLen = UnsafeUtils.get4(thisDom, tPos);
                                int oStrLen = UnsafeUtils.get4(otherDom, oPos);
                                tCat.set(thisDom, tPos + 4, tStrLen);
                                oCat.set(otherDom, oPos + 4, oStrLen);
                                int x = tCat.compareTo(oCat);
                                if (x <= 0) {
                                    // Copy the whole record (length prefix + bytes) from this side.
                                    int recLen = 4 + tStrLen;
                                    System.arraycopy(thisDom, tPos, mergedDom, mPos, recLen);
                                    mPos += recLen;
                                    tPos += recLen;
                                    ++ti;
                                    if (x == 0) {
                                        // Same entry on both sides: skip the duplicate.
                                        oPos += 4 + oStrLen;
                                        ++oi;
                                    }
                                } else {
                                    int recLen = 4 + oStrLen;
                                    System.arraycopy(otherDom, oPos, mergedDom, mPos, recLen);
                                    mPos += recLen;
                                    oPos += recLen;
                                    ++oi;
                                }
                                ++mergeLen;
                            }
                            // Append whatever is left of the side that was not exhausted
                            // (zero bytes when both ran out together).
                            int remainder;
                            if (ti < tLen) {
                                remainder = thisDom.length - tPos;
                                System.arraycopy(thisDom, tPos, mergedDom, mPos, remainder);
                                mergeLen += tLen - ti;
                            } else {
                                remainder = otherDom.length - oPos;
                                System.arraycopy(otherDom, oPos, mergedDom, mPos, remainder);
                                mergeLen += oLen - oi;
                            }
                            mPos += remainder;
                            GatherCategoricalDomainsTask.this._packedDomains[fi] = Arrays.copyOf(mergedDom, mPos);
                            UnsafeUtils.set4(GatherCategoricalDomainsTask.this._packedDomains[fi], 0, mergeLen);
                            Log.trace("Merged domain length is " + mergeLen + " for the " + PrettyPrint.withOrdinalIndicator(fi + 1) + " categorical column.");
                            this.tryComplete();
                        }
                    };
                    H2O.submitTask(domtasks[i]);
                }
                for (i = 0; i < this._catColIdxs.length; ++i) {
                    if (domtasks[i] == null) continue;
                    domtasks[i].join();
                }
            }
            Log.trace("Done merging domains.");
        }

        /**
         * Packs a domain into the byte layout described on the class: entry count, then a
         * 4-byte length plus the bytes of each entry.
         *
         * @param domain entries to pack, in the order they should appear
         * @return the packed array
         */
        private byte[] packDomain(BufferedString[] domain) {
            int totStrLen = 0;
            for (BufferedString dom : domain) {
                totStrLen += dom.length();
            }
            byte[] packedDom = MemoryManager.malloc1(4 + (domain.length << 2) + totStrLen, false);
            UnsafeUtils.set4(packedDom, 0, domain.length);
            int i = 4;
            for (BufferedString dom : domain) {
                UnsafeUtils.set4(packedDom, i, dom.length());
                i += 4;
                // NOTE(review): copies the entire backing buffer, so this assumes the buffer
                // holds exactly dom.length() bytes starting at index 0 — confirm for all callers.
                byte[] buf = dom.getBuffer();
                for (int j = 0; j < buf.length; ++j) {
                    packedDom[i++] = buf[j];
                }
            }
            return packedDom;
        }

        /**
         * @param colIdx index into the packed domains (position within {@code _catColIdxs})
         * @return the entry count of that domain, or 0 when no domains were gathered
         */
        public int getDomainLength(int colIdx) {
            if (this._packedDomains == null) {
                return 0;
            }
            return UnsafeUtils.get4(this._packedDomains[colIdx], 0);
        }

        /**
         * Unpacks one domain into UTF-8 decoded strings.
         *
         * @param colIdx index into the packed domains (position within {@code _catColIdxs})
         * @return the domain entries in packed order, or {@code null} when no domains were gathered
         */
        public String[] getDomain(int colIdx) {
            if (this._packedDomains == null) {
                return null;
            }
            int strCnt = UnsafeUtils.get4(this._packedDomains[colIdx], 0);
            String[] res = new String[strCnt];
            int j = 4;
            for (int i = 0; i < strCnt; ++i) {
                int strLen = UnsafeUtils.get4(this._packedDomains[colIdx], j);
                res[i] = new String(this._packedDomains[colIdx], j += 4, strLen, Charsets.UTF_8);
                j += strLen;
            }
            return res;
        }
    }

    private static class UpdateCategoricalChunksTask
    extends MRTask<UpdateCategoricalChunksTask> {
        private final Key _parseCatMapsKey;
        private final int[] _chunk2ParseNodeMap;

        private UpdateCategoricalChunksTask(Key parseCatMapsKey, int[] chunk2ParseNodeMap) {
            this._parseCatMapsKey = parseCatMapsKey;
            this._chunk2ParseNodeMap = chunk2ParseNodeMap;
        }

        @Override
        public void map(Chunk[] chks) {
            CategoricalUpdateMap temp = (CategoricalUpdateMap)DKV.getGet(Key.make(this._parseCatMapsKey.toString() + "parseCatMapNode" + this._chunk2ParseNodeMap[chks[0].cidx()]));
            if (temp == null || temp.map == null) {
                throw new H2OIllegalValueException("Missing categorical update map", this);
            }
            int[][] _parse2GlobalCatMaps = temp.map;
            int cidx = chks[0].cidx();
            for (int i = 0; i < chks.length; ++i) {
                Chunk chk = chks[i];
                if (!(chk instanceof CStrChunk)) {
                    for (int j = 0; j < chk._len; ++j) {
                        if (chk.isNA(j)) continue;
                        int old = (int)chk.at8(j);
                        if (old < 0 || _parse2GlobalCatMaps[i] != null && old >= _parse2GlobalCatMaps[i].length) {
                            chk.reportBrokenCategorical(i, j, old, _parse2GlobalCatMaps[i], this._fr.vec(i).domain().length);
                        }
                        if (_parse2GlobalCatMaps[i] != null && _parse2GlobalCatMaps[i][old] < 0) {
                            throw new H2OParseException("Error in unifying categorical values. This is typically caused by unrecognized characters in the data.\n The problem categorical value occurred in the " + PrettyPrint.withOrdinalIndicator(i + 1) + " categorical col, " + PrettyPrint.withOrdinalIndicator(chk.start() + (long)j) + " row.");
                        }
                        if (_parse2GlobalCatMaps[i] == null) continue;
                        chk.set(j, _parse2GlobalCatMaps[i][old]);
                    }
                    Log.trace("Updated domains for " + PrettyPrint.withOrdinalIndicator(i + 1) + " categorical column.");
                }
                chk.close(cidx, this._fs);
            }
        }

        @Override
        public void postGlobal() {
            for (int i = 0; i < H2O.CLOUD.size(); ++i) {
                DKV.remove(Key.make(this._parseCatMapsKey.toString() + "parseCatMapNode" + i));
            }
        }
    }

    /**
     * DTask that builds, on the node where it runs, a table per categorical column mapping
     * the node's local token ids to positions in the frame's column domain, and publishes
     * the tables in the DKV under a key derived from the node index.
     */
    private static class CreateParse2GlobalCategoricalMaps
    extends DTask<CreateParse2GlobalCategoricalMaps> {
        private final Key _parseCatMapsKey;
        private final Key _frKey;
        private final int[] _ecol;

        private CreateParse2GlobalCategoricalMaps(Key parseCatMapsKey, Key key, int[] ecol) {
            this._parseCatMapsKey = parseCatMapsKey;
            this._frKey = key;
            this._ecol = ecol;
        }

        /**
         * Completes right away when this node has no categoricals registered for the key.
         * Otherwise fills one int table per listed column (entries default to -1) and stores
         * the result as a {@link CategoricalUpdateMap}. Columns with an empty local map keep
         * a {@code null} table.
         */
        @Override
        public void compute2() {
            final Frame frame = (Frame)DKV.getGet(this._frKey);
            if (MultiFileParseTask._categoricals.containsKey(this._parseCatMapsKey)) {
                final Categorical[] localCats = (Categorical[])MultiFileParseTask._categoricals.get(this._parseCatMapsKey);
                final int[][] ordMaps = new int[this._ecol.length][];
                for (int e = 0; e < this._ecol.length; ++e) {
                    final int colIdx = this._ecol[e];
                    if (localCats[colIdx].size() == 0) {
                        Log.debug("Column " + colIdx + " was marked as categorical but categorical map is empty!");
                        continue;
                    }
                    ordMaps[e] = MemoryManager.malloc4(localCats[colIdx].maxId() + 1);
                    Arrays.fill(ordMaps[e], -1);
                    final BufferedString[] unified = BufferedString.toBufferedString(frame.vec(colIdx).domain());
                    for (int pos = 0; pos < unified.length; ++pos) {
                        final BufferedString token = unified[pos];
                        if (localCats[colIdx].containsKey(token)) {
                            // Local token id -> position in the frame's domain.
                            ordMaps[e][localCats[colIdx].getTokenId(token)] = pos;
                        }
                    }
                }
                DKV.put(Key.make(this._parseCatMapsKey.toString() + "parseCatMapNode" + H2O.SELF.index()), new CategoricalUpdateMap(ordMaps));
            }
            this.tryComplete();
        }
    }

    /**
     * Iced wrapper around a two-dimensional int array so that it can be stored in the DKV.
     * In this file it is written by {@code CreateParse2GlobalCategoricalMaps} and read by
     * {@code UpdateCategoricalChunksTask}.
     */
    private static class CategoricalUpdateMap
    extends Iced {
        // One int[] per column; readers in this file treat a null row as "no remapping for this column".
        final int[][] map;

        /** @param map the table to wrap; stored by reference, not copied */
        public CategoricalUpdateMap(int[][] map) {
            this.map = map;
        }
    }

    /**
     * Counted completer that runs {@code ParseDataset.parseAllKeys} and cleans up the
     * partially built output when the parse fails or the job has a stop request.
     */
    public static class ParserFJTask
    extends H2O.H2OCountedCompleter {
        final ParseDataset _pds;
        final Key[] _keys;
        final ParseSetup _setup;
        final boolean _deleteOnDone;

        /**
         * @param pds          parse whose job and multi-file task this completer drives
         * @param keys         source keys to parse
         * @param setup        parse setup passed through to {@code parseAllKeys}
         * @param deleteOnDone passed through to {@code parseAllKeys}
         */
        public ParserFJTask(ParseDataset pds, Key[] keys, ParseSetup setup, boolean deleteOnDone) {
            this._pds = pds;
            this._keys = keys;
            this._setup = setup;
            this._deleteOnDone = deleteOnDone;
        }

        /** Runs the parse and then signals completion. */
        @Override
        public void compute2() {
            ParseDataset.parseAllKeys(this._pds, this._keys, this._setup, this._deleteOnDone);
            this.tryComplete();
        }

        /** Cleans up after a failure; always returns {@code true}. */
        @Override
        public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
            this.parseCleanup();
            return true;
        }

        /** Cleans up when the job has a stop request, then drops the multi-file task reference. */
        @Override
        public void onCompletion(CountedCompleter caller) {
            final boolean stopRequested = this._pds._job.stop_requested();
            if (stopRequested) {
                this.parseCleanup();
            }
            this._pds._mfpt = null;
        }

        /**
         * Detaches the multi-file task (running its own cleanup if one is present), removes
         * every source key and the job's result key, and blocks until the removals finish.
         */
        private void parseCleanup() {
            assert (!this._pds._job.isStopped());
            final Futures pending = new Futures();
            final MultiFileParseTask task = this._pds._mfpt;
            this._pds._mfpt = null;
            if (task != null) {
                task.onExceptionCleanup(pending);
            }
            for (int idx = 0; idx < this._keys.length; ++idx) {
                Keyed.remove(this._keys[idx], pending);
            }
            Keyed.remove(this._pds._job._result, pending);
            pending.blockForPending();
        }
    }
}

