/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.mapreduce;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Public
public class Import
extends Configured
implements Tool {
    /** Logger used by the static helpers of this class and by the {@link Importer} mapper. */
    private static final Log LOG = LogFactory.getLog(Import.class);
    /** Tool name; not referenced elsewhere in this file. */
    static final String NAME = "import";
    /**
     * Configuration key holding the column-family renames as a comma separated list of
     * {@code source:destination} pairs (written by {@code configureCfRenaming}).
     */
    public static final String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
    /** Configuration key for the HFile output directory; when set, the job writes HFiles instead of mutations. */
    public static final String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
    /** Configuration key naming the {@link Filter} class applied to the imported cells. */
    public static final String FILTER_CLASS_CONF_KEY = "import.filter.class";
    /** Configuration key holding the arguments handed to the filter's {@code createFilterFromArguments}. */
    public static final String FILTER_ARGS_CONF_KEY = "import.filter.args";
    /** Configuration key under which {@code createSubmittableJob} stores the target table name. */
    public static final String TABLE_NAME = "import.table.name";
    /** Configuration key selecting the {@link Durability} applied to mutations written by {@link Importer}. */
    public static final String WAL_DURABILITY = "import.wal.durability";
    /** Configuration key (boolean) selecting the sort-in-shuffle bulk output path for large results. */
    public static final String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";
    /** Configuration key from which an explicit MapReduce job name is read. */
    private static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";

    /**
     * Creates the {@link Filter} configured for this import, if any.
     *
     * <p>The class named by {@link #FILTER_CLASS_CONF_KEY} is looked up and its static
     * {@code createFilterFromArguments(ArrayList)} factory is invoked reflectively with the
     * values of {@link #FILTER_ARGS_CONF_KEY}, each wrapped in single quotes.
     *
     * @param conf configuration to read the filter class and arguments from
     * @return the created filter, or {@code null} when no filter class is configured
     * @throws RuntimeException wrapping any reflection failure while creating the filter
     */
    public static Filter instantiateFilter(Configuration conf) {
        Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
        if (filterClass == null) {
            LOG.debug("No configured filter class, accepting all keyvalues.");
            return null;
        }
        LOG.debug("Attempting to create filter:" + filterClass);
        ArrayList<byte[]> quotedArgs = Import.toQuotedByteArrays(conf.getStrings(FILTER_ARGS_CONF_KEY));
        try {
            Method factory = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
            return (Filter) factory.invoke(null, quotedArgs);
        } catch (IllegalAccessException | SecurityException | NoSuchMethodException
                | IllegalArgumentException | InvocationTargetException e) {
            // Every failure mode is reported the same way: log it, then fail the caller.
            LOG.error("Couldn't instantiate filter!", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Wraps each argument in single quotes and converts it to bytes.
     *
     * @param stringArgs filter arguments; may be {@code null} when no arguments are configured
     * @return one quoted byte array per argument, in order; empty when {@code stringArgs} is
     *         {@code null} or empty
     */
    private static ArrayList<byte[]> toQuotedByteArrays(String ... stringArgs) {
        ArrayList<byte[]> quotedArgs = new ArrayList<>();
        if (stringArgs == null) {
            // Configuration#getStrings yields null for an unset key; treat that as "no arguments"
            // so filters whose factory takes none can still be created.
            return quotedArgs;
        }
        for (String stringArg : stringArgs) {
            quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
        }
        return quotedArgs;
    }

    /**
     * Applies the given filter to a single cell.
     *
     * @param filter filter to consult; {@code null} accepts every cell
     * @param kv cell under consideration
     * @return {@code kv} when there is no filter or the filter answers {@code INCLUDE} or
     *         {@code INCLUDE_AND_NEXT_COL}; {@code null} otherwise
     * @throws IOException if the filter fails
     */
    public static Cell filterKv(Filter filter, Cell kv) throws IOException {
        if (filter == null) {
            return kv;
        }
        Filter.ReturnCode verdict = filter.filterKeyValue(kv);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Filter returned:" + verdict + " for the key value:" + kv);
        }
        boolean included = verdict.equals(Filter.ReturnCode.INCLUDE)
                || verdict.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL);
        return included ? kv : null;
    }

    /**
     * Rewrites the column family of a cell according to the rename map.
     *
     * @param kv cell to convert
     * @param cfRenameMap source-family to destination-family map; may be {@code null}
     * @return a new {@link KeyValue} carrying the renamed family when a mapping exists for the
     *         cell's family, otherwise {@code kv} itself
     */
    private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
        if (cfRenameMap == null) {
            return kv;
        }
        byte[] renamedFamily = cfRenameMap.get(CellUtil.cloneFamily(kv));
        if (renamedFamily == null) {
            return kv;
        }
        // Same row, qualifier, timestamp, type and value; only the family bytes are replaced.
        return new KeyValue(
                kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
                renamedFamily, 0, renamedFamily.length,
                kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
                kv.getTimestamp(),
                KeyValue.Type.codeToType(kv.getTypeByte()),
                kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
    }

    /**
     * Parses {@link #CF_RENAME_PROP} into a family rename map.
     *
     * <p>The property is a comma separated list of {@code source:destination} pairs. Entries
     * that do not split into exactly two parts on {@code ':'} are skipped.
     *
     * @param conf configuration to read the property from
     * @return map from source family to destination family ordered by
     *         {@code Bytes.BYTES_COMPARATOR}, or {@code null} when the property is unset (or
     *         contains no entries at all)
     */
    private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
        TreeMap<byte[], byte[]> cfRenameMap = null;
        String allMappingsPropVal = conf.get(CF_RENAME_PROP);
        if (allMappingsPropVal == null) {
            return null;
        }
        for (String mapping : allMappingsPropVal.split(",")) {
            if (cfRenameMap == null) {
                cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            }
            String[] srcAndDest = mapping.split(":");
            if (srcAndDest.length != 2) {
                continue;
            }
            // Bytes.toBytes encodes as UTF-8; String#getBytes() would depend on the JVM's
            // default charset and could produce family names that do not match the table's.
            cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
        }
        return cfRenameMap;
    }

    /**
     * Stores a column-family rename mapping in the configuration under {@link #CF_RENAME_PROP}.
     *
     * <p>The mapping is encoded as {@code src1:dest1,src2:dest2,...} in the iteration order of
     * {@code renameMap}.
     *
     * @param conf configuration to update
     * @param renameMap source family name to destination family name
     * @throws IllegalArgumentException if any family name contains {@code ':'} or {@code ','}
     */
    public static void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
        StringBuilder encoded = new StringBuilder();
        for (Map.Entry<String, String> rename : renameMap.entrySet()) {
            String sourceCf = rename.getKey();
            String destCf = rename.getValue();
            // ':' and ',' are the separators of the encoded form, so names may not contain them.
            boolean usesSeparator = sourceCf.contains(":") || sourceCf.contains(",")
                    || destCf.contains(":") || destCf.contains(",");
            if (usesSeparator) {
                throw new IllegalArgumentException("Illegal character in CF names: " + sourceCf + ", " + destCf);
            }
            if (encoded.length() > 0) {
                encoded.append(",");
            }
            encoded.append(sourceCf).append(":").append(destCf);
        }
        conf.set(CF_RENAME_PROP, encoded.toString());
    }

    /**
     * Records a filter class and its arguments in the configuration.
     *
     * @param conf configuration to update
     * @param clazz filter class; its name is stored under {@link #FILTER_CLASS_CONF_KEY}
     * @param filterArgs arguments stored under {@link #FILTER_ARGS_CONF_KEY}
     * @throws IOException declared for callers; nothing in this method body throws it
     */
    public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz, List<String> filterArgs) throws IOException {
        String filterClassName = clazz.getName();
        conf.set(FILTER_CLASS_CONF_KEY, filterClassName);
        String[] argsArray = filterArgs.toArray(new String[0]);
        conf.setStrings(FILTER_ARGS_CONF_KEY, argsArray);
    }

    /**
     * Builds the import job.
     *
     * <p>Three output modes are supported, chosen from the configuration:
     * <ul>
     *   <li>no {@link #BULK_OUTPUT_CONF_KEY}: {@link Importer} writes mutations straight to the
     *       table with zero reducers;</li>
     *   <li>{@link #BULK_OUTPUT_CONF_KEY} and {@link #HAS_LARGE_RESULT}: HFiles are produced with
     *       {@link KeyValueSortImporter} / {@link KeyValueReducer}, one reducer per region start key;</li>
     *   <li>{@link #BULK_OUTPUT_CONF_KEY} only: HFiles are produced with {@link KeyValueImporter}
     *       and {@code KeyValueSortReducer}.</li>
     * </ul>
     *
     * @param conf configuration to build the job from; {@link #TABLE_NAME} is set on it
     * @param args {@code args[0]} is the table name, {@code args[1]} the input directory
     * @return the configured, not yet submitted job
     * @throws IOException if the job cannot be set up
     */
    public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
        TableName tableName = TableName.valueOf(args[0]);
        conf.set(TABLE_NAME, tableName.getNameAsString());
        Path inputDir = new Path(args[1]);
        Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, "import_" + tableName));
        job.setJarByClass(Importer.class);
        FileInputFormat.setInputPaths(job, inputDir);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

        // Ship the jar that contains the configured filter with the job.
        try {
            Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
            if (filter != null) {
                // Job.getInstance copied conf, so the jar has to be registered on the job's own
                // configuration; registering it on conf would never reach the submitted job.
                TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), filter);
            }
        } catch (Exception e) {
            throw new IOException(e);
        }

        if (hfileOutPath == null) {
            LOG.info("writing directly to table from Mapper.");
            job.setMapperClass(Importer.class);
            TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
            job.setNumReduceTasks(0);
        } else if (conf.getBoolean(HAS_LARGE_RESULT, false)) {
            Import.configureLargeResultBulkLoad(job, conf, tableName, hfileOutPath);
        } else {
            Import.configureBulkLoad(job, conf, tableName, hfileOutPath);
        }
        return job;
    }

    /** Configures HFile output where cells are sorted by the shuffle, one reducer per region start key. */
    private static void configureLargeResultBulkLoad(Job job, Configuration conf, TableName tableName, String hfileOutPath) throws IOException {
        LOG.info("Use Large Result!!");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(tableName);
             RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
            // configureIncrementalLoad goes first; the settings below then take precedence.
            HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
            job.setMapperClass(KeyValueSortImporter.class);
            job.setReducerClass(KeyValueReducer.class);
            FileOutputFormat.setOutputPath(job, new Path(hfileOutPath));
            job.setMapOutputKeyClass(KeyValueWritableComparable.class);
            job.setMapOutputValueClass(KeyValue.class);
            job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", KeyValueWritableComparable.KeyValueWritableComparator.class, RawComparator.class);
            Path partitionsPath = new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
            FileSystem fs = FileSystem.get(job.getConfiguration());
            fs.deleteOnExit(partitionsPath);
            job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
            job.setNumReduceTasks(regionLocator.getStartKeys().length);
            TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Preconditions.class);
        }
    }

    /** Configures HFile output using {@link KeyValueImporter} and {@code KeyValueSortReducer}. */
    private static void configureBulkLoad(Job job, Configuration conf, TableName tableName, String hfileOutPath) throws IOException {
        LOG.info("writing to hfiles for bulk load.");
        job.setMapperClass(KeyValueImporter.class);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(tableName);
             RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
            job.setReducerClass(KeyValueSortReducer.class);
            FileOutputFormat.setOutputPath(job, new Path(hfileOutPath));
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
            TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Preconditions.class);
        }
    }

    /**
     * Prints the command line help to standard error.
     *
     * @param errorMsg optional error printed first with an {@code ERROR: } prefix; ignored when
     *        {@code null} or empty
     */
    private static void usage(String errorMsg) {
        if (errorMsg != null && !errorMsg.isEmpty()) {
            System.err.println("ERROR: " + errorMsg);
        }
        // One array element per println of the help text, in output order.
        String[] helpLines = {
            "Usage: Import [options] <tablename> <inputdir>",
            "By default Import will load data directly into HBase. To instead generate",
            "HFiles of data to prepare for a bulk data load, pass the option:",
            "  -Dimport.bulk.output=/path/for/output",
            "If there is a large result that includes too much KeyValue whitch can occur OOME caused by the memery sort in reducer, pass the option:",
            "  -Dimport.bulk.hasLargeResult=true",
            " To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use",
            "  -Dimport.filter.class=<name of filter class>",
            "  -Dimport.filter.args=<comma separated list of args for filter",
            " NOTE: The filter will be applied BEFORE doing key renames via the HBASE_IMPORTER_RENAME_CFS property. Futher, filters will only use the Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify  whether the current row needs to be ignored completely for processing and  Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added; Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including the KeyValue.",
            "To import data exported from HBase 0.94, use",
            "  -Dhbase.import.version=0.94",
            "  -D mapreduce.job.name=jobName - use the specified mapreduce job name for the import",
            "For performance consider the following options:\n  -Dmapreduce.map.speculative=false\n  -Dmapreduce.reduce.speculative=false\n  -Dimport.wal.durability=<Used while writing data to hbase. Allowed values are the supported durability values like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>",
        };
        for (String helpLine : helpLines) {
            System.err.println(helpLine);
        }
    }

    /**
     * Flushes the target table when the import wrote directly to it with the WAL skipped.
     *
     * <p>A flush is requested only when {@link #BULK_OUTPUT_CONF_KEY} is unset and
     * {@link #WAL_DURABILITY} equals {@code SKIP_WAL} (case-insensitive). The table name is
     * read from {@link #TABLE_NAME}.
     *
     * @param conf configuration of the finished import job
     * @throws IOException if connecting to the cluster or flushing fails
     * @throws InterruptedException declared for callers; nothing in this method body throws it
     */
    public static void flushRegionsIfNecessary(Configuration conf) throws IOException, InterruptedException {
        boolean wroteToTable = conf.get(BULK_OUTPUT_CONF_KEY) == null;
        // equalsIgnoreCase(null) is false, so an unset durability never triggers a flush.
        boolean skippedWal = Durability.SKIP_WAL.name().equalsIgnoreCase(conf.get(WAL_DURABILITY));
        if (!wroteToTable || !skippedWal) {
            return;
        }
        LOG.info("Flushing all data that skipped the WAL.");
        String tableName = conf.get(TABLE_NAME);
        // try-with-resources closes the connection even when closing the admin throws.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin hAdmin = connection.getAdmin()) {
            hAdmin.flush(TableName.valueOf(tableName));
        }
    }

    /**
     * Runs the import: validates arguments, submits the job, waits for it, flushes the table
     * if needed and reports when fewer records were written than read.
     *
     * @param args command line arguments; at least the table name and the input directory
     * @return {@code -1} on bad arguments, {@code 0} when the job succeeded, {@code 1} otherwise
     * @throws Exception if building or running the job fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            Import.usage("Wrong number of arguments: " + args.length);
            return -1;
        }
        // Copy the export-version hint from the JVM system properties into the job configuration.
        String inputVersionString = System.getProperty("hbase.import.version");
        if (inputVersionString != null) {
            this.getConf().set("hbase.import.version", inputVersionString);
        }

        Job job = Import.createSubmittableJob(this.getConf(), args);
        boolean isJobSuccessful = job.waitForCompletion(true);
        if (isJobSuccessful) {
            Import.flushRegionsIfNecessary(this.getConf());
        }

        long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
        long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
        boolean someRecordsDropped = outputRecords < inputRecords;
        if (someRecordsDropped) {
            System.err.println("Warning, not all records were imported (maybe filtered out).");
            if (outputRecords == 0L) {
                System.err.println("If the data was exported from HBase 0.94 consider using -Dhbase.import.version=0.94.");
            }
        }
        if (isJobSuccessful) {
            return 0;
        }
        return 1;
    }

    /**
     * Command line entry point; exits the JVM with the status returned by {@link #run(String[])}.
     *
     * @param args command line arguments, passed through {@link ToolRunner}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
        System.exit(ToolRunner.run(hbaseConf, new Import(), args));
    }

    public static class Importer
    extends TableMapper<ImmutableBytesWritable, Mutation> {
        private Map<byte[], byte[]> cfRenameMap;
        private List<UUID> clusterIds;
        private Filter filter;
        private Durability durability;

        @Override
        public void map(ImmutableBytesWritable row, Result value, Mapper.Context context) throws IOException {
            try {
                this.writeResult(row, value, context);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void writeResult(ImmutableBytesWritable key, Result result, Mapper.Context context) throws IOException, InterruptedException {
            Put put = null;
            Delete delete = null;
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("Considering the row." + Bytes.toString(key.get(), key.getOffset(), key.getLength())));
            }
            if (this.filter == null || !this.filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(), (short)key.getLength()))) {
                this.processKV(key, result, context, put, delete);
            }
        }

        protected void processKV(ImmutableBytesWritable key, Result result, Mapper.Context context, Put put, Delete delete) throws IOException, InterruptedException {
            for (Cell kv : result.rawCells()) {
                if ((kv = Import.filterKv(this.filter, kv)) == null) continue;
                if (CellUtil.isDeleteFamily(kv = Import.convertKv(kv, this.cfRenameMap))) {
                    Delete deleteFamily = new Delete(key.get());
                    deleteFamily.add(kv);
                    if (this.durability != null) {
                        deleteFamily.setDurability(this.durability);
                    }
                    deleteFamily.setClusterIds((List)this.clusterIds);
                    context.write(key, deleteFamily);
                    continue;
                }
                if (CellUtil.isDelete(kv)) {
                    if (delete == null) {
                        delete = new Delete(key.get());
                    }
                    delete.add(kv);
                    continue;
                }
                if (put == null) {
                    put = new Put(key.get());
                }
                this.addPutToKv(put, kv);
            }
            if (put != null) {
                if (this.durability != null) {
                    put.setDurability(this.durability);
                }
                put.setClusterIds((List)this.clusterIds);
                context.write(key, put);
            }
            if (delete != null) {
                if (this.durability != null) {
                    delete.setDurability(this.durability);
                }
                delete.setClusterIds((List)this.clusterIds);
                context.write(key, delete);
            }
        }

        protected void addPutToKv(Put put, Cell kv) throws IOException {
            put.add(kv);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void setup(Mapper.Context context) {
            LOG.info((Object)("Setting up " + this.getClass() + " mapper."));
            Configuration conf = context.getConfiguration();
            this.cfRenameMap = Import.createCfRenameMap(conf);
            this.filter = Import.instantiateFilter(conf);
            String durabilityStr = conf.get(Import.WAL_DURABILITY);
            if (durabilityStr != null) {
                this.durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
                LOG.info((Object)("setting WAL durability to " + (Object)((Object)this.durability)));
            } else {
                LOG.info((Object)"setting WAL durability to default.");
            }
            Exception ex = null;
            try (ZooKeeperWatcher zkw = null;){
                zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
                this.clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
            }
            if (this.clusterIds == null) {
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Mapper for bulk output: emits every accepted cell as a {@link KeyValue} keyed by its
     * input row.
     */
    @SuppressWarnings(value={"EQ_COMPARETO_USE_OBJECT_EQUALS"}, justification="Writables are going away and this has been this way forever")
    public static class KeyValueImporter
    extends TableMapper<ImmutableBytesWritable, KeyValue> {
        /** Source family to destination family; {@code null} when no rename is configured. */
        private Map<byte[], byte[]> cfRenameMap;
        /** Optional cell filter; {@code null} accepts everything. */
        private Filter filter;
        private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);

        /**
         * Filters and renames the cells of one row and writes each surviving cell.
         *
         * @param row row key of the input record
         * @param value cells of the input record
         * @param context task context the cells are written to
         * @throws IOException if filtering or writing fails
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Mapper.Context context) throws IOException {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
                }
                if (this.filter != null && this.filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(), (short)row.getLength()))) {
                    // The row-level filter rejected the whole row.
                    return;
                }
                for (Cell kv : value.rawCells()) {
                    kv = Import.filterKv(this.filter, kv);
                    if (kv == null) {
                        continue;
                    }
                    context.write(row, KeyValueUtil.ensureKeyValue(Import.convertKv(kv, this.cfRenameMap)));
                }
            }
            catch (InterruptedException e) {
                // Log the interruption and restore the interrupt flag instead of only
                // printing a stack trace.
                LOG.error("Interrupted while writing cells for row "
                    + Bytes.toString(row.get(), row.getOffset(), row.getLength()), e);
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Reads the rename map and the filter from the job configuration.
         *
         * @param context task context supplying the configuration
         */
        @Override
        public void setup(Mapper.Context context) {
            Configuration conf = context.getConfiguration();
            this.cfRenameMap = Import.createCfRenameMap(conf);
            this.filter = Import.instantiateFilter(conf);
        }
    }

    /**
     * Mapper for the large-result bulk output: emits every accepted cell keyed by its own
     * key-only {@link KeyValue}, and publishes the region start keys to
     * {@link KeyValueWritableComparablePartitioner} during setup.
     */
    public static class KeyValueSortImporter
    extends TableMapper<KeyValueWritableComparable, KeyValue> {
        /** Source family to destination family; {@code null} when no rename is configured. */
        private Map<byte[], byte[]> cfRenameMap;
        /** Optional cell filter; {@code null} accepts everything. */
        private Filter filter;
        // Bound to this class so log lines are attributed to the mapper that produced them.
        private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);

        /**
         * Filters and renames the cells of one row and writes each surviving cell under a
         * comparable key built from the cell's key part.
         *
         * @param row row key of the input record
         * @param value cells of the input record
         * @param context task context the cells are written to
         * @throws IOException if filtering or writing fails
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Mapper.Context context) throws IOException {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
                }
                if (this.filter != null && this.filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(), (short)row.getLength()))) {
                    // The row-level filter rejected the whole row.
                    return;
                }
                for (Cell kv : value.rawCells()) {
                    kv = Import.filterKv(this.filter, kv);
                    if (kv == null) {
                        continue;
                    }
                    KeyValue ret = KeyValueUtil.ensureKeyValue(Import.convertKv(kv, this.cfRenameMap));
                    context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
                }
            }
            catch (InterruptedException e) {
                // Log the interruption and restore the interrupt flag instead of only
                // printing a stack trace.
                LOG.error("Interrupted while writing cells for row "
                    + Bytes.toString(row.get(), row.getOffset(), row.getLength()), e);
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Reads the rename map and filter, then loads the table's region start keys and hands
         * all but the first to the partitioner as split points.
         *
         * @param context task context supplying the configuration and reducer count
         * @throws IOException if the region layout cannot be read, or if the number of region
         *         start keys no longer equals the number of reduce tasks
         */
        @Override
        public void setup(Mapper.Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            this.cfRenameMap = Import.createCfRenameMap(conf);
            this.filter = Import.instantiateFilter(conf);
            int reduceNum = context.getNumReduceTasks();
            TableName tableName = TableName.valueOf(conf.get(Import.TABLE_NAME));
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
                byte[][] startKeys = regionLocator.getStartKeys();
                if (startKeys.length != reduceNum) {
                    throw new IOException("Region split after job initialization");
                }
                // startKeys[0] is skipped: only the boundaries between regions are split points.
                KeyValueWritableComparable[] startKeyWraps = new KeyValueWritableComparable[startKeys.length - 1];
                for (int i = 1; i < startKeys.length; ++i) {
                    startKeyWraps[i - 1] = new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
                }
                // Assign the partitioner's field directly rather than through the
                // compiler-generated access$202 accessor.
                KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
            }
        }
    }

    /**
     * Reducer for the large-result bulk output: writes every incoming {@link KeyValue} keyed
     * by its row.
     */
    public static class KeyValueReducer
    extends Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
        /**
         * Writes the cells of one key group in iteration order and updates the task status
         * after every 100th cell.
         *
         * @param row comparable key of the group (not used for the output key)
         * @param kvs cells of the group
         * @param context task context the cells are written to
         * @throws IOException if writing fails
         * @throws InterruptedException if writing is interrupted
         */
        @Override
        protected void reduce(KeyValueWritableComparable row, Iterable<KeyValue> kvs, Reducer.Context context) throws IOException, InterruptedException {
            int index = 0;
            for (KeyValue kv : kvs) {
                // getRowArray() is the cell's whole backing array; the row is only the slice
                // described by getRowOffset()/getRowLength().
                context.write(new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()), kv);
                if (++index % 100 != 0) {
                    continue;
                }
                context.setStatus("Wrote " + index + " KeyValues, and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
            }
        }
    }

    /**
     * {@link WritableComparable} wrapper around a {@link KeyValue}, ordered by
     * {@code CellComparator.COMPARATOR}.
     */
    public static class KeyValueWritableComparable
    implements WritableComparable<KeyValueWritableComparable> {
        static {
            // Register the raw-bytes comparator for this key type when the class is initialised.
            WritableComparator.define(KeyValueWritableComparable.class, new KeyValueWritableComparator());
        }

        /** Wrapped cell; {@code null} until set by the constructor or {@link #readFields}. */
        private KeyValue kv = null;

        /** Creates an empty wrapper to be filled by {@link #readFields(DataInput)}. */
        public KeyValueWritableComparable() {
        }

        /**
         * Wraps the given cell.
         *
         * @param kv cell to wrap
         */
        public KeyValueWritableComparable(KeyValue kv) {
            this.kv = kv;
        }

        /** Serialises the wrapped cell with {@code KeyValue.write}. */
        @Override
        public void write(DataOutput out) throws IOException {
            KeyValue.write(this.kv, out);
        }

        /** Replaces the wrapped cell with one read through {@code KeyValue.create}. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.kv = KeyValue.create(in);
        }

        /** Orders two wrappers by comparing their cells with {@code CellComparator.COMPARATOR}. */
        @Override
        @SuppressWarnings(value={"EQ_COMPARETO_USE_OBJECT_EQUALS"}, justification="This is wrong, yes, but we should be purging Writables, not fixing them")
        public int compareTo(KeyValueWritableComparable o) {
            return CellComparator.COMPARATOR.compare(this.kv, o.kv);
        }

        /** Comparator over serialised keys: decodes both sides, then compares the wrappers. */
        public static class KeyValueWritableComparator
        extends WritableComparator {
            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                try {
                    KeyValueWritableComparable left = KeyValueWritableComparator.decode(b1, s1, l1);
                    KeyValueWritableComparable right = KeyValueWritableComparator.decode(b2, s2, l2);
                    return this.compare(left, right);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            /** Reads one wrapper from {@code length} bytes of {@code buffer} starting at {@code start}. */
            private static KeyValueWritableComparable decode(byte[] buffer, int start, int length) throws IOException {
                KeyValueWritableComparable decoded = new KeyValueWritableComparable();
                decoded.readFields(new DataInputStream(new ByteArrayInputStream(buffer, start, length)));
                return decoded;
            }
        }
    }

    /**
     * Partitioner that routes each key by comparing it against the split points stored in
     * {@code START_KEYS}.
     */
    public static class KeyValueWritableComparablePartitioner
    extends Partitioner<KeyValueWritableComparable, KeyValue> {
        /** Split points; {@code null} until assigned (done by {@code KeyValueSortImporter.setup} in this file). */
        private static KeyValueWritableComparable[] START_KEYS = null;

        /**
         * Returns the index of the first split point that is not smaller than {@code key}, or
         * the number of split points when the key is greater than all of them.
         *
         * @param key key to route
         * @param value value travelling with the key (unused)
         * @param numPartitions number of partitions (unused)
         * @return partition index in {@code [0, START_KEYS.length]}
         */
        @Override
        public int getPartition(KeyValueWritableComparable key, KeyValue value, int numPartitions) {
            int partition = 0;
            while (partition < START_KEYS.length && key.compareTo(START_KEYS[partition]) > 0) {
                ++partition;
            }
            return partition;
        }

        /** Compiler-generated style setter for {@code START_KEYS}; returns its argument. */
        static /* synthetic */ KeyValueWritableComparable[] access$202(KeyValueWritableComparable[] x0) {
            START_KEYS = x0;
            return x0;
        }
    }
}

