/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class VerifyReplication
extends Configured
implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class);
    // Short name of this tool.
    public static final String NAME = "verifyrep";
    // Prefix for configuration entries that apply to the peer cluster only.
    private static final String PEER_CONFIG_PREFIX = "verifyrep.peer.";
    // Executor for asynchronous re-comparisons; assigned by the Verifier mapper, null when recompare threads are 0.
    private static ThreadPoolExecutor reCompareExecutor = null;
    // --recompareTries
    int reCompareTries = 0;
    // --recompareBackoffExponent
    int reCompareBackoffExponent = 0;
    // --recompareThreads
    int reCompareThreads = 0;
    // --recompareSleep (or the deprecated --recomparesleep), in milliseconds
    int sleepMsBeforeReCompare = 0;
    // --starttime / --endtime: time range applied to the scans
    long startTime = 0L;
    long endTime = Long.MAX_VALUE;
    // --batch: only applied to the scan when greater than 0
    int batch = -1;
    // --versions: only applied to the scan when greater than or equal to 0
    int versions = -1;
    // Last positional argument: name of the table to verify
    String tableName = null;
    // --families: comma-separated column families
    String families = null;
    // --delimiter: printed around row keys in log output
    String delimiter = "";
    // Second-to-last positional argument; exactly one of peerId / peerQuorumAddress is set by doCommandLine,
    // depending on whether the argument validates as a cluster key.
    String peerId = null;
    String peerQuorumAddress = null;
    // --row-prefixes: comma-separated row key prefixes
    String rowPrefixes = null;
    // --verbose
    boolean verbose = false;
    // --raw: run the scans in raw mode
    boolean includeDeletedCells = false;
    // --sourceSnapshotName / --sourceSnapshotTmpDir: must be given together
    String sourceSnapshotName = null;
    String sourceSnapshotTmpDir = null;
    // --peerSnapshotName / --peerSnapshotTmpDir / --peerFSAddress / --peerHBaseRootAddress: must be given together
    String peerSnapshotName = null;
    String peerSnapshotTmpDir = null;
    String peerFSAddress = null;
    String peerHBaseRootAddress = null;
    // --peerTableName: table name on the peer cluster when it differs from the source table
    String peerTableName = null;
    // Configuration key consulted for the job name; falls back to "verifyrep_<tableName>".
    private static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";

    /**
     * Looks up the replication peer registered under {@code peerId} and derives the configuration
     * needed to talk to that peer cluster.
     *
     * @param conf   configuration of the local (source) cluster
     * @param peerId id of the replication peer to look up
     * @return the peer's {@link ReplicationPeerConfig} paired with the peer cluster configuration
     * @throws IOException if the ZooKeeper watcher or the peer storage cannot be accessed
     */
    private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(Configuration conf, String peerId) throws IOException {
        // The watcher is only used for a short-lived lookup, so abort requests are deliberately ignored.
        Abortable ignoreAborts = new Abortable(){

            @Override
            public void abort(String why, Throwable e) {
            }

            @Override
            public boolean isAborted() {
                return false;
            }
        };
        // The resource must be initialised in the try header: a resource variable is implicitly final
        // and cannot be assigned afterwards, and a null resource would never be closed.
        try (ZKWatcher localZKW = new ZKWatcher(conf, "VerifyReplication", ignoreAborts)) {
            ReplicationPeerStorage storage = ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
            // NOTE(review): if getPeerConfig declares a checked non-IO exception it has to be wrapped
            // into an IOException here -- confirm against ReplicationPeerStorage.
            ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
            return Pair.newPair(peerConfig, ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
        }
    }

    /**
     * Copies the peer snapshot into the peer temporary directory so that it can be scanned.
     * Reads {@code peerFSAddress}, {@code peerHBaseRootAddress}, {@code peerSnapshotTmpDir} and
     * {@code peerSnapshotName} from this instance.
     *
     * @param conf              base configuration the peer configuration is derived from
     * @param peerQuorumAddress cluster key of the peer cluster
     * @throws IOException if the peer file system cannot be reached or the copy fails
     */
    private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress) throws IOException {
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
        // Point the derived configuration at the peer's file system and HBase root directory.
        FileSystem.setDefaultUri(peerConf, this.peerFSAddress);
        Path peerRoot = new Path(this.peerFSAddress, this.peerHBaseRootAddress);
        CommonFSUtils.setRootDir(peerConf, peerRoot);
        FileSystem peerFs = FileSystem.get(peerConf);
        Path resolvedRoot = CommonFSUtils.getRootDir(peerConf);
        Path restoreDir = new Path(this.peerFSAddress, this.peerSnapshotTmpDir);
        RestoreSnapshotHelper.copySnapshotForScanner(peerConf, peerFs, resolvedRoot, restoreDir, this.peerSnapshotName);
    }

    /**
     * Parses the command line and builds the verification job.
     *
     * <p>All parsed options are published into {@code conf} under {@code verifyrep.*} keys so that
     * the mapper can read them back, the source scan is configured, and credentials for the peer
     * cluster are attached to the job.
     *
     * @param conf the configuration to use; it is modified in place
     * @param args the command line arguments
     * @return the configured job, or {@code null} if the arguments were rejected
     * @throws IOException              if the peer configuration, snapshot restore or job setup fails
     * @throws IllegalArgumentException if neither a peer id nor a peer quorum address is available
     */
    public Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
        if (!this.doCommandLine(args)) {
            return null;
        }
        conf.set("verifyrep.tableName", this.tableName);
        conf.setLong("verifyrep.startTime", this.startTime);
        conf.setLong("verifyrep.endTime", this.endTime);
        conf.setInt("verifyrep.sleepMsBeforeReCompare", this.sleepMsBeforeReCompare);
        conf.set("verifyrep.delimiter", this.delimiter);
        conf.setInt("verifyrep.batch", this.batch);
        conf.setBoolean("verifyrep.verbose", this.verbose);
        conf.setBoolean("verifyrep.includeDeletedCells", this.includeDeletedCells);
        if (this.families != null) {
            conf.set("verifyrep.families", this.families);
        }
        if (this.rowPrefixes != null) {
            conf.set("verifyrep.rowPrefixes", this.rowPrefixes);
        }

        // Resolve the peer cluster key either from the registered peer or from the command line.
        Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
        String peerQuorumAddress;
        if (this.peerId != null) {
            peerConfigPair = VerifyReplication.getPeerQuorumConfig(conf, this.peerId);
            ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
            peerQuorumAddress = peerConfig.getClusterKey();
            LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + peerConfig.getConfiguration());
            conf.set("verifyrep.peerQuorumAddress", peerQuorumAddress);
            HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, peerConfig.getConfiguration().entrySet());
        } else {
            // An assert is a no-op in production; fail with a clear message instead.
            if (this.peerQuorumAddress == null) {
                throw new IllegalArgumentException("Neither a peer id nor a peer quorum address was provided");
            }
            peerQuorumAddress = this.peerQuorumAddress;
            LOG.info("Peer Quorum Address: " + peerQuorumAddress);
            conf.set("verifyrep.peerQuorumAddress", peerQuorumAddress);
        }
        if (this.peerTableName != null) {
            LOG.info("Peer Table Name: " + this.peerTableName);
            conf.set("verifyrep.peerTableName", this.peerTableName);
        }
        conf.setInt("verifyrep.versions", this.versions);
        LOG.info("Number of version: " + this.versions);
        conf.setInt("verifyrep.recompareTries", this.reCompareTries);
        conf.setInt("verifyrep.recompareBackoffExponent", this.reCompareBackoffExponent);
        conf.setInt("verifyrep.recompareThreads", this.reCompareThreads);
        if (this.peerSnapshotName != null) {
            conf.set("verifyrep.peerSnapshotName", this.peerSnapshotName);
            // Restore into a unique sub-directory so that runs do not share a restore directory.
            Path restoreDir = new Path(this.peerSnapshotTmpDir, UUID.randomUUID().toString());
            this.peerSnapshotTmpDir = restoreDir.toString();
            conf.set("verifyrep.peerSnapshotTmpDir", this.peerSnapshotTmpDir);
            conf.set("verifyrep.peerFSAddress", this.peerFSAddress);
            conf.set("verifyrep.peerHBaseRootAddress", this.peerHBaseRootAddress);
            conf.setStrings("mapreduce.job.hdfs-servers", new String[]{this.peerFSAddress, conf.get("hbase.rootdir")});
        }

        Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, "verifyrep_" + this.tableName));
        job.setJarByClass(VerifyReplication.class);

        Scan scan = new Scan();
        scan.setTimeRange(this.startTime, this.endTime);
        scan.setRaw(this.includeDeletedCells);
        scan.setCacheBlocks(false);
        if (this.batch > 0) {
            scan.setBatch(this.batch);
        }
        if (this.versions >= 0) {
            scan.readVersions(this.versions);
            LOG.info("Number of versions set to " + this.versions);
        }
        if (this.families != null) {
            for (String fam : this.families.split(",")) {
                scan.addFamily(Bytes.toBytes(fam));
            }
        }
        VerifyReplication.setRowPrefixFilter(scan, this.rowPrefixes);

        if (this.sourceSnapshotName != null) {
            Path snapshotTempPath = new Path(this.sourceSnapshotTmpDir);
            LOG.info("Using source snapshot-" + this.sourceSnapshotName + " with temp dir:" + this.sourceSnapshotTmpDir);
            TableMapReduceUtil.initTableSnapshotMapperJob(this.sourceSnapshotName, scan, Verifier.class, null, null, job, true, snapshotTempPath);
        } else {
            TableMapReduceUtil.initTableMapperJob(this.tableName, scan, Verifier.class, null, null, job);
        }
        // The peer snapshot is independent of the source snapshot: restoring it only when a source
        // snapshot is used fails on null peer settings when just the source snapshot is given, and
        // skips the restore the mapper relies on when just the peer snapshot is given.
        if (this.peerSnapshotName != null) {
            this.restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
        }

        Configuration peerClusterConf = peerConfigPair != null
            ? peerConfigPair.getSecond()
            : HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
        TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);
        return job;
    }

    /**
     * Returns the row key of whichever result is available, preferring the source result.
     *
     * @param sourceResult     result read from the source table, may be {@code null}
     * @param replicatedResult result read from the peer table, may be {@code null}
     * @return the row key of the first non-null result
     * @throws RuntimeException if both results are {@code null}
     */
    protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
        Result available = sourceResult != null ? sourceResult : replicatedResult;
        if (available == null) {
            throw new RuntimeException("Both sourceResult and replicatedResult are null!");
        }
        return available.getRow();
    }

    /**
     * Restricts {@code scan} to rows starting with one of the given prefixes.
     *
     * <p>A {@link PrefixFilter} is added per prefix (any one may match) and the scan range is
     * narrowed to span from the smallest prefix to just past the largest one.
     *
     * @param scan        the scan to restrict
     * @param rowPrefixes comma-separated row key prefixes; {@code null}, empty, or a value that
     *                    contains only separators leaves the scan untouched
     */
    private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
        if (rowPrefixes == null || rowPrefixes.isEmpty()) {
            return;
        }
        String[] rowPrefixArray = rowPrefixes.split(",");
        if (rowPrefixArray.length == 0) {
            // Input such as "," splits into nothing; indexing the array below would throw.
            return;
        }
        // Sorted so that the first and last elements bound the scan range.
        Arrays.sort(rowPrefixArray);
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (String prefix : rowPrefixArray) {
            filterList.addFilter(new PrefixFilter(Bytes.toBytes(prefix)));
        }
        scan.setFilter(filterList);
        byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
        byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
        VerifyReplication.setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }

    /**
     * Sets the scan range so that it starts at {@code startPrefixRow} and stops right after the
     * last row that can begin with {@code lastPrefixRow}.
     *
     * @param scan           the scan to configure
     * @param startPrefixRow smallest prefix; used as the start row
     * @param lastPrefixRow  largest prefix; the stop row is the closest key greater than every key
     *                       carrying this prefix
     */
    private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
        scan.withStartRow(startPrefixRow);
        // Incrementing a trailing 0xFF byte would wrap around to 0x00 and yield a stop row that
        // sorts before the prefix, so trailing 0xFF bytes are dropped before incrementing.
        int end = lastPrefixRow.length;
        while (end > 0 && lastPrefixRow[end - 1] == (byte)0xFF) {
            --end;
        }
        if (end == 0) {
            // Empty or all-0xFF prefix: no finite upper bound exists, scan to the end of the table.
            scan.withStopRow(new byte[0]);
            return;
        }
        byte[] stopRow = Arrays.copyOf(lastPrefixRow, end);
        stopRow[end - 1] = (byte)(stopRow[end - 1] + 1);
        scan.withStopRow(stopRow);
    }

    /**
     * Parses the command line into the fields of this instance.
     *
     * <p>Options ({@code --key=value} or flags) may appear anywhere before the two trailing
     * positional arguments {@code <peerid|peerQuorumAddress> <tablename>}. On any problem the
     * usage text is printed and {@code false} is returned.
     *
     * @param args the raw command line arguments
     * @return {@code true} if the arguments were parsed and are consistent, {@code false} otherwise
     */
    public boolean doCommandLine(String[] args) {
        if (args.length < 2) {
            VerifyReplication.printUsage(null);
            return false;
        }
        final String startTimeArgKey = "--starttime=";
        final String endTimeArgKey = "--endtime=";
        final String includeDeletedCellsArgKey = "--raw";
        final String versionsArgKey = "--versions=";
        final String batchArgKey = "--batch=";
        final String familiesArgKey = "--families=";
        final String rowPrefixesKey = "--row-prefixes=";
        final String delimiterArgKey = "--delimiter=";
        final String deprecatedSleepToReCompareKey = "--recomparesleep=";
        final String sleepToReCompareKey = "--recompareSleep=";
        final String verboseKey = "--verbose";
        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        final String peerFSAddressArgKey = "--peerFSAddress=";
        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        final String peerTableNameArgKey = "--peerTableName=";
        final String reCompareThreadArgs = "--recompareThreads=";
        final String reCompareTriesKey = "--recompareTries=";
        final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
        try {
            for (int i = 0; i < args.length; ++i) {
                String cmd = args[i];
                if (cmd.equals("-h") || cmd.startsWith("--h")) {
                    VerifyReplication.printUsage(null);
                    return false;
                }
                if (cmd.startsWith(startTimeArgKey)) {
                    this.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
                    continue;
                }
                if (cmd.startsWith(endTimeArgKey)) {
                    this.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
                    continue;
                }
                if (cmd.equals(includeDeletedCellsArgKey)) {
                    this.includeDeletedCells = true;
                    continue;
                }
                if (cmd.startsWith(versionsArgKey)) {
                    this.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
                    continue;
                }
                if (cmd.startsWith(batchArgKey)) {
                    this.batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
                    continue;
                }
                if (cmd.startsWith(familiesArgKey)) {
                    this.families = cmd.substring(familiesArgKey.length());
                    continue;
                }
                if (cmd.startsWith(rowPrefixesKey)) {
                    this.rowPrefixes = cmd.substring(rowPrefixesKey.length());
                    continue;
                }
                if (cmd.startsWith(delimiterArgKey)) {
                    this.delimiter = cmd.substring(delimiterArgKey.length());
                    continue;
                }
                if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
                    LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0. Use --recompareSleep instead.");
                    this.sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
                    continue;
                }
                if (cmd.startsWith(sleepToReCompareKey)) {
                    this.sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
                    continue;
                }
                if (cmd.startsWith(verboseKey)) {
                    this.verbose = true;
                    continue;
                }
                if (cmd.startsWith(sourceSnapshotNameArgKey)) {
                    this.sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
                    continue;
                }
                if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
                    this.sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerSnapshotNameArgKey)) {
                    this.peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
                    this.peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerFSAddressArgKey)) {
                    this.peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
                    this.peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerTableNameArgKey)) {
                    this.peerTableName = cmd.substring(peerTableNameArgKey.length());
                    continue;
                }
                if (cmd.startsWith(reCompareThreadArgs)) {
                    this.reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
                    continue;
                }
                if (cmd.startsWith(reCompareTriesKey)) {
                    this.reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
                    continue;
                }
                if (cmd.startsWith(reCompareBackoffExponentKey)) {
                    this.reCompareBackoffExponent = Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
                    continue;
                }
                if (cmd.startsWith("--")) {
                    VerifyReplication.printUsage("Invalid argument '" + cmd + "'");
                    return false;
                }
                // Positional arguments: the second-to-last is the peer, the last is the table.
                if (i == args.length - 2) {
                    if (this.isPeerQuorumAddress(cmd)) {
                        this.peerQuorumAddress = cmd;
                    } else {
                        this.peerId = cmd;
                    }
                }
                if (i == args.length - 1) {
                    this.tableName = cmd;
                }
            }
            // If an option occupies one of the two trailing positions, the peer or the table was
            // never set; reject here instead of failing later with an opaque error.
            if (this.tableName == null || (this.peerId == null && this.peerQuorumAddress == null)) {
                VerifyReplication.printUsage("<peerid|peerQuorumAddress> and <tablename> must be given as the last two arguments");
                return false;
            }
            // Source snapshot name and temp dir must be given together.
            if ((this.sourceSnapshotName == null) != (this.sourceSnapshotTmpDir == null)) {
                VerifyReplication.printUsage("Source snapshot name and snapshot temp location should be provided to use snapshots in source cluster");
                return false;
            }
            // The four peer snapshot settings must be given all together or not at all.
            boolean noPeerSnapshotArgs = this.peerSnapshotName == null && this.peerSnapshotTmpDir == null && this.peerFSAddress == null && this.peerHBaseRootAddress == null;
            boolean allPeerSnapshotArgs = this.peerSnapshotName != null && this.peerSnapshotTmpDir != null && this.peerFSAddress != null && this.peerHBaseRootAddress != null;
            if (!noPeerSnapshotArgs && !allPeerSnapshotArgs) {
                VerifyReplication.printUsage("Peer snapshot name, peer snapshot temp location, Peer HBase root address and  peer FSAddress should be provided to use snapshots in peer cluster");
                return false;
            }
            if ((this.sourceSnapshotName != null || this.peerSnapshotName != null) && this.sleepMsBeforeReCompare > 0) {
                VerifyReplication.printUsage("Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
                return false;
            }
        }
        catch (Exception e) {
            // Typically a NumberFormatException from one of the numeric options.
            LOG.error("Failed to parse commandLine arguments", e);
            VerifyReplication.printUsage("Can't start because " + e.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Tells whether {@code cmd} is accepted by {@link ZKConfig#validateClusterKey(String)}.
     *
     * @param cmd the positional argument to test
     * @return {@code true} if validation succeeds, {@code false} if it throws an {@link IOException}
     */
    private boolean isPeerQuorumAddress(String cmd) {
        boolean valid;
        try {
            ZKConfig.validateClusterKey(cmd);
            valid = true;
        }
        catch (IOException rejected) {
            // Rejected as a cluster key; the caller then treats the argument as a peer id.
            valid = false;
        }
        return valid;
    }

    /**
     * Prints the usage text to standard error, preceded by an error line when one is given.
     *
     * @param errorMsg optional error message; {@code null} or empty prints the usage only
     */
    private static void printUsage(String errorMsg) {
        if (errorMsg != null && errorMsg.length() > 0) {
            System.err.println("ERROR: " + errorMsg);
        }
        System.err.println("Usage: verifyrep [--starttime=X] [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] [--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=][--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
        System.err.println();
        System.err.println("Options:");
        System.err.println(" starttime    beginning of the time range");
        System.err.println("              without endtime means from starttime to forever");
        System.err.println(" endtime      end of the time range");
        System.err.println(" versions     number of cell versions to verify");
        System.err.println(" batch        batch count for scan, note that result row counts will no longer be actual number of rows when you use this option");
        System.err.println(" raw          includes raw scan if given in options");
        System.err.println(" families     comma-separated list of families to copy");
        System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
        System.err.println(" delimiter    the delimiter used in display around rowkey");
        System.err.println(" recompareSleep   milliseconds to sleep before recompare row, default value is 0 which disables the recompare.");
        System.err.println(" recompareThreads number of threads to run recompares in");
        System.err.println(" recompareTries   number of recompare attempts before incrementing the BADROWS counter. Defaults to 1 recompare");
        // Goes to stderr like every other line so the help text is not split across two streams.
        System.err.println(" recompareBackoffExponent exponential multiplier to increase recompareSleep after each recompare attempt, default value is 0 which results in a constant sleep time");
        System.err.println(" verbose      logs row keys of good rows");
        System.err.println(" peerTableName  Peer Table Name");
        System.err.println(" sourceSnapshotName  Source Snapshot Name");
        System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
        System.err.println(" peerSnapshotName  Peer Snapshot Name");
        System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
        System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
        System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
        System.err.println();
        System.err.println("Args:");
        System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
        System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The format is zk_quorum:zk_port:zk_hbase_path");
        System.err.println(" tablename    Name of the table to verify");
        System.err.println();
        System.err.println("Examples:");
        System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
        System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
        System.err.println();
        System.err.println(" To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b");
        System.err.println(" Assume quorum address for cluster-b is cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
        System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b \\\n     TestTable");
        System.err.println();
        System.err.println(" To verify the data in TestTable between the secured cluster runs VerifyReplication and insecure cluster-b");
        System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n     -D verifyrep.peer.hbase.security.authentication=simple \\\n     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b \\\n     TestTable");
        System.err.println();
        System.err.println(" To verify the data in TestTable between the secured cluster runs VerifyReplication and secured cluster-b");
        System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E, for master and regionserver kerberos principal from another cluster");
        System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n     -D verifyrep.peer.hbase.regionserver.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b \\\n     TestTable");
        System.err.println();
        System.err.println(" To verify the data in TestTable between the insecure cluster runs VerifyReplication and secured cluster-b");
        System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n     -D verifyrep.peer.hbase.security.authentication=kerberos \\\n     -D verifyrep.peer.hbase.regionserver.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b \\\n     TestTable");
    }

    /**
     * Creates the executor that runs re-comparisons off the calling thread.
     *
     * @param maxThreads maximum pool size; {@code 0} means no executor is created
     * @param context    mapper context handed to the rejection policy
     * @return a pool with no core threads, a one second keep-alive and a direct hand-off queue,
     *         or {@code null} when {@code maxThreads} is {@code 0}
     */
    private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
        if (maxThreads != 0) {
            SynchronousQueue<Runnable> handOffQueue = new SynchronousQueue<Runnable>();
            ThreadPoolExecutor.CallerRunsPolicy rejectionPolicy = VerifyReplication.buildRejectedReComparePolicy(context);
            return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, handOffQueue, rejectionPolicy);
        }
        return null;
    }

    /**
     * Builds a caller-runs rejection policy that also records each rejection.
     *
     * @param context mapper context whose {@code MAIN_THREAD_RECOMPARES} counter is incremented
     * @return the policy to install on the re-compare executor
     */
    private static ThreadPoolExecutor.CallerRunsPolicy buildRejectedReComparePolicy(final Mapper.Context context) {
        ThreadPoolExecutor.CallerRunsPolicy countingPolicy = new ThreadPoolExecutor.CallerRunsPolicy(){

            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                LOG.debug("Re-comparison execution rejected. Running in main thread.");
                // Count the fallback before delegating to the caller-runs behaviour.
                context.getCounter((Enum)Verifier.Counters.MAIN_THREAD_RECOMPARES).increment(1L);
                super.rejectedExecution(task, executor);
            }
        };
        return countingPolicy;
    }

    /**
     * Builds the verification job from {@code args} and waits for it to finish.
     *
     * @param args the command line arguments
     * @return {@code 0} if the job completed successfully, {@code 1} if it failed or could not
     *         be created
     * @throws Exception if job creation or execution fails
     */
    public int run(String[] args) throws Exception {
        Job job = this.createSubmittableJob(this.getConf(), args);
        if (job == null) {
            return 1;
        }
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Command line entry point: runs the tool through {@link ToolRunner} and exits the JVM with
     * the tool's return code.
     *
     * @param args the command line arguments
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Tool tool = new VerifyReplication();
        int exitCode = ToolRunner.run(conf, tool, args);
        System.exit(exitCode);
    }

    public static class Verifier
    extends TableMapper<ImmutableBytesWritable, Put> {
        private Connection sourceConnection;
        private Table sourceTable;
        private Connection replicatedConnection;
        private Table replicatedTable;
        private ResultScanner replicatedScanner;
        private Result currentCompareRowInPeerTable;
        private Scan tableScan;
        private int reCompareTries;
        private int reCompareBackoffExponent;
        private int sleepMsBeforeReCompare;
        private String delimiter = "";
        private boolean verbose = false;
        private int batch = -1;

        public void map(ImmutableBytesWritable row, Result value, Mapper.Context context) throws IOException {
            if (this.replicatedScanner == null) {
                Configuration conf = context.getConfiguration();
                this.reCompareTries = conf.getInt("verifyrep.recompareTries", 0);
                this.reCompareBackoffExponent = conf.getInt("verifyrep.recompareBackoffExponent", 1);
                this.sleepMsBeforeReCompare = conf.getInt("verifyrep.sleepMsBeforeReCompare", 0);
                if (this.sleepMsBeforeReCompare > 0) {
                    this.reCompareTries = Math.max(this.reCompareTries, 1);
                }
                this.delimiter = conf.get("verifyrep.delimiter", "");
                this.verbose = conf.getBoolean("verifyrep.verbose", false);
                this.batch = conf.getInt("verifyrep.batch", -1);
                Scan scan = new Scan();
                if (this.batch > 0) {
                    scan.setBatch(this.batch);
                }
                scan.setCacheBlocks(false);
                scan.setCaching(conf.getInt("hbase.mapreduce.scan.cachedrows", 1));
                long startTime = conf.getLong("verifyrep.startTime", 0L);
                long endTime = conf.getLong("verifyrep.endTime", Long.MAX_VALUE);
                String families = conf.get("verifyrep.families", null);
                if (families != null) {
                    String[] fams;
                    for (String fam : fams = families.split(",")) {
                        scan.addFamily(Bytes.toBytes(fam));
                    }
                }
                boolean includeDeletedCells = conf.getBoolean("verifyrep.includeDeletedCells", false);
                scan.setRaw(includeDeletedCells);
                String rowPrefixes = conf.get("verifyrep.rowPrefixes", null);
                VerifyReplication.setRowPrefixFilter(scan, rowPrefixes);
                scan.setTimeRange(startTime, endTime);
                int versions = conf.getInt("verifyrep.versions", -1);
                LOG.info("Setting number of version inside map as: " + versions);
                if (versions >= 0) {
                    scan.readVersions(versions);
                }
                int reCompareThreads = conf.getInt("verifyrep.recompareThreads", 0);
                reCompareExecutor = VerifyReplication.buildReCompareExecutor(reCompareThreads, context);
                TableName tableName = TableName.valueOf(conf.get("verifyrep.tableName"));
                this.sourceConnection = ConnectionFactory.createConnection(conf);
                this.sourceTable = this.sourceConnection.getTable(tableName);
                this.tableScan = scan;
                InputSplit tableSplit = context.getInputSplit();
                String zkClusterKey = conf.get("verifyrep.peerQuorumAddress");
                Configuration peerConf = HBaseConfiguration.createClusterConf(conf, zkClusterKey, VerifyReplication.PEER_CONFIG_PREFIX);
                String peerName = peerConf.get("verifyrep.peerTableName", tableName.getNameAsString());
                TableName peerTableName = TableName.valueOf(peerName);
                this.replicatedConnection = ConnectionFactory.createConnection(peerConf);
                this.replicatedTable = this.replicatedConnection.getTable(peerTableName);
                scan.withStartRow(value.getRow());
                byte[] endRow = null;
                endRow = tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit ? ((TableSnapshotInputFormat.TableSnapshotRegionSplit)tableSplit).getRegion().getEndKey() : ((TableSplit)tableSplit).getEndRow();
                scan.withStopRow(endRow);
                String peerSnapshotName = conf.get("verifyrep.peerSnapshotName", null);
                if (peerSnapshotName != null) {
                    String peerSnapshotTmpDir = conf.get("verifyrep.peerSnapshotTmpDir", null);
                    String peerFSAddress = conf.get("verifyrep.peerFSAddress", null);
                    String peerHBaseRootAddress = conf.get("verifyrep.peerHBaseRootAddress", null);
                    FileSystem.setDefaultUri((Configuration)peerConf, (String)peerFSAddress);
                    CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
                    LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" + peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf) + " peerFSAddress:" + peerFSAddress);
                    this.replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf), new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
                } else {
                    this.replicatedScanner = this.replicatedTable.getScanner(scan);
                }
                this.currentCompareRowInPeerTable = this.replicatedScanner.next();
            }
            while (true) {
                if (this.currentCompareRowInPeerTable == null) {
                    this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
                    break;
                }
                int rowCmpRet = Bytes.compareTo(value.getRow(), this.currentCompareRowInPeerTable.getRow());
                if (rowCmpRet == 0) {
                    try {
                        Result.compareResults(value, this.currentCompareRowInPeerTable, false);
                        context.getCounter((Enum)Counters.GOODROWS).increment(1L);
                        if (this.verbose) {
                            LOG.info("Good row key: " + this.delimiter + Bytes.toStringBinary(value.getRow()) + this.delimiter);
                        }
                    }
                    catch (Exception e) {
                        this.logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value, this.currentCompareRowInPeerTable);
                    }
                    this.currentCompareRowInPeerTable = this.replicatedScanner.next();
                    break;
                }
                if (rowCmpRet < 0) {
                    this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
                    break;
                }
                this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, this.currentCompareRowInPeerTable);
                this.currentCompareRowInPeerTable = this.replicatedScanner.next();
            }
        }

        /**
         * Reports a row that failed verification, either immediately or after a re-comparison.
         *
         * <p>When {@code reCompareTries} is zero, the given counter and {@code BADROWS} are
         * incremented right away and the row key is logged. Otherwise a
         * {@link VerifyReplicationRecompareRunnable} is built for the row and either handed to the
         * shared re-compare executor or run on the calling thread.
         *
         * @param context       mapper context that owns the job counters
         * @param counter       the specific mismatch counter for this row
         * @param row           the source-side result, or {@code null} when only the peer has the row
         * @param replicatedRow the peer-side result, or {@code null} when only the source has the row
         */
        private void logFailRowAndIncreaseCounter(Mapper.Context context, Counters counter, Result row, Result replicatedRow) {
            // Resolved up front, exactly as before, so any validation done by getRow still
            // happens on the recompare path as well.
            byte[] rowKey = VerifyReplication.getRow(row, replicatedRow);
            if (this.reCompareTries == 0) {
                context.getCounter((Enum)counter).increment(1L);
                context.getCounter((Enum)Counters.BADROWS).increment(1L);
                LOG.error("{}, rowkey={}{}{}", new Object[]{counter, this.delimiter, Bytes.toStringBinary(rowKey), this.delimiter});
                return;
            }
            VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, row, replicatedRow, counter, this.delimiter, this.tableScan, this.sourceTable, this.replicatedTable, this.reCompareTries, this.sleepMsBeforeReCompare, this.reCompareBackoffExponent, this.verbose);
            // cleanup() shuts the executor down BEFORE it drains the leftover peer rows through
            // this method. A shut-down pool rejects new work, so submitting there would lose the
            // row (or throw, depending on the rejection handler). Run the recompare on the
            // calling thread whenever no usable executor exists.
            boolean executorUsable = reCompareExecutor != null && !reCompareExecutor.isShutdown();
            if (!executorUsable) {
                runnable.run();
                return;
            }
            reCompareExecutor.submit(runnable);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Finishes the mapper: waits for outstanding re-comparisons, reports peer rows that were
         * never matched, and releases the scanner, tables and connections.
         *
         * <p>The re-compare executor is given one minute to drain after {@code shutdown()}, then
         * forced with {@code shutdownNow()} and given one more minute. Anything still running
         * after that is counted in {@code BADROWS} and {@code FAILED_RECOMPARE}.
         *
         * <p>All closeable resources are released in a {@code finally} block, so they are closed
         * even when waiting for the executor is interrupted.
         *
         * @param context mapper context that owns the job counters
         * @throws RuntimeException if the thread is interrupted while waiting for the executor;
         *         the interrupt flag is restored before the exception is thrown
         */
        protected void cleanup(Mapper.Context context) {
            try {
                if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
                    reCompareExecutor.shutdown();
                    try {
                        boolean terminated = reCompareExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                        if (!terminated) {
                            List<Runnable> queue = reCompareExecutor.shutdownNow();
                            for (Runnable pending : queue) {
                                if (pending instanceof VerifyReplicationRecompareRunnable) {
                                    ((VerifyReplicationRecompareRunnable)pending).fail();
                                    continue;
                                }
                                // Tasks enter the pool through submit(), which wraps them in a
                                // Future, so the drained entry may not be the runnable itself.
                                // Count it the same way as a recompare that never finished
                                // instead of failing cleanup with a ClassCastException.
                                LOG.warn("Recompare task {} never started, incrementing BADROWS and FAILED_RECOMPARE", (Object)pending);
                                context.getCounter((Enum)Counters.BADROWS).increment(1L);
                                context.getCounter((Enum)Counters.FAILED_RECOMPARE).increment(1L);
                            }
                            terminated = reCompareExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                            if (!terminated) {
                                // At least one task is assumed stuck even if the pool reports zero.
                                int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
                                LOG.warn("Found {} possible recompares still running in the executable incrementing BADROWS and FAILED_RECOMPARE", (Object)activeCount);
                                context.getCounter((Enum)Counters.BADROWS).increment((long)activeCount);
                                context.getCounter((Enum)Counters.FAILED_RECOMPARE).increment((long)activeCount);
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        // Preserve the interrupt for callers further up the stack.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Failed to await executor termination in cleanup", e);
                    }
                }
                if (this.replicatedScanner != null) {
                    try {
                        // Every row left in the peer scanner has no counterpart in the source.
                        while (this.currentCompareRowInPeerTable != null) {
                            this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, this.currentCompareRowInPeerTable);
                            this.currentCompareRowInPeerTable = this.replicatedScanner.next();
                        }
                    }
                    catch (Exception e) {
                        LOG.error("fail to scan peer table in cleanup", (Throwable)e);
                    }
                }
            }
            finally {
                // Release everything regardless of how the block above exited.
                if (this.replicatedScanner != null) {
                    this.replicatedScanner.close();
                    this.replicatedScanner = null;
                }
                if (this.sourceTable != null) {
                    try {
                        this.sourceTable.close();
                    }
                    catch (IOException e) {
                        LOG.error("fail to close source table in cleanup", (Throwable)e);
                    }
                }
                if (this.sourceConnection != null) {
                    try {
                        this.sourceConnection.close();
                    }
                    catch (Exception e) {
                        LOG.error("fail to close source connection in cleanup", (Throwable)e);
                    }
                }
                if (this.replicatedTable != null) {
                    try {
                        this.replicatedTable.close();
                    }
                    catch (Exception e) {
                        LOG.error("fail to close replicated table in cleanup", (Throwable)e);
                    }
                }
                if (this.replicatedConnection != null) {
                    try {
                        this.replicatedConnection.close();
                    }
                    catch (Exception e) {
                        LOG.error("fail to close replicated connection in cleanup", (Throwable)e);
                    }
                }
            }
        }

        /**
         * Job counters maintained while verifying source rows against the peer table.
         *
         * <p>The per-constant notes describe only the uses visible in this mapper; constants
         * without a visible use carry a review note instead of a definition.
         */
        public static enum Counters {
            /** Incremented once for each row whose source and peer results compare without error. */
            GOODROWS,
            /**
             * Incremented together with the specific mismatch counter when a row is reported
             * without re-comparison, and in cleanup for recompares that are still running after
             * the executor was forced to stop.
             */
            BADROWS,
            /**
             * Used when the peer scanner is exhausted, or when the source row key sorts before
             * the current peer row key.
             */
            ONLY_IN_SOURCE_TABLE_ROWS,
            /**
             * Used when the current peer row key sorts before the source row key, and in cleanup
             * for every row still left in the peer scanner.
             */
            ONLY_IN_PEER_TABLE_ROWS,
            /** Used when both sides have the row key but comparing the two results throws. */
            CONTENT_DIFFERENT_ROWS,
            /** NOTE(review): not updated in the visible mapper code; presumably maintained by the recompare runnable. Confirm. */
            RECOMPARES,
            /** NOTE(review): not updated in the visible mapper code; presumably counts recompares run on the mapper thread. Confirm. */
            MAIN_THREAD_RECOMPARES,
            /** NOTE(review): not updated in the visible mapper code; presumably set when a re-fetched source row differs. Confirm. */
            SOURCE_ROW_CHANGED,
            /** NOTE(review): not updated in the visible mapper code; presumably set when a re-fetched peer row differs. Confirm. */
            PEER_ROW_CHANGED,
            /**
             * Incremented in cleanup, alongside {@code BADROWS}, for recompares still running
             * after the executor was forced to stop.
             */
            FAILED_RECOMPARE;

        }
    }
}

