/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VerifyReplication
extends Configured
implements Tool {
    private static final Log LOG = LogFactory.getLog(VerifyReplication.class);
    /** Name of this tool. */
    public static final String NAME = "verifyrep";
    /** Prefix under which the peer's configuration entries are copied into the job configuration. */
    private static final String PEER_CONFIG_PREFIX = "verifyrep.peer.";
    // Command-line state, filled in by doCommandLine() and published to the job
    // configuration by createSubmittableJob().
    /** Lower bound of the scan time range ({@code --starttime=}). */
    long startTime = 0L;
    /** Upper bound of the scan time range ({@code --endtime=}). */
    long endTime = Long.MAX_VALUE;
    /** Scan batch size ({@code --batch=}); only applied to the scan when greater than zero. */
    int batch = -1;
    /** Max versions for the scan ({@code --versions=}); only applied when not negative. */
    int versions = -1;
    /** Table to verify; taken from the last command-line argument. */
    String tableName = null;
    /** Comma-separated column families ({@code --families=}); null means no family restriction is added. */
    String families = null;
    /** Text printed on both sides of a row key in log messages ({@code --delimiter=}). */
    String delimiter = "";
    /** Replication peer id; taken from the second-to-last command-line argument. */
    String peerId = null;
    /** Comma-separated row key prefixes ({@code --row-prefixes=}). */
    String rowPrefixes = null;
    /** Milliseconds to sleep before re-comparing a mismatching row ({@code --recomparesleep=}); 0 disables it. */
    int sleepMsBeforeReCompare = 0;
    /** When true, row keys of matching rows are logged as well ({@code --verbose}). */
    boolean verbose = false;
    /** When true, the scan is switched to raw mode ({@code --raw}). */
    boolean includeDeletedCells = false;
    /** Source snapshot name ({@code --sourceSnapshotName=}); must be given together with the tmp dir. */
    String sourceSnapshotName = null;
    /** Temp directory used with the source snapshot ({@code --sourceSnapshotTmpDir=}). */
    String sourceSnapshotTmpDir = null;
    /** Peer snapshot name ({@code --peerSnapshotName=}); the four peer snapshot options go together. */
    String peerSnapshotName = null;
    /** Temp directory used with the peer snapshot ({@code --peerSnapshotTmpDir=}). */
    String peerSnapshotTmpDir = null;
    /** Peer cluster file system address ({@code --peerFSAddress=}). */
    String peerFSAddress = null;
    /** Peer cluster HBase root location ({@code --peerHBaseRootAddress=}). */
    String peerHBaseRootAddress = null;
    /** Configuration key consulted for a job name that overrides the default "verifyrep_" + table name. */
    private static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";

    /**
     * Looks up the replication peer registered under {@code peerId} and returns its
     * peer configuration together with the cluster configuration derived for it.
     *
     * <p>A ZooKeeper watcher is opened only for the duration of the lookup and is
     * always closed before this method returns.
     *
     * @param conf   configuration of the local (source) cluster
     * @param peerId id of the replication peer to look up
     * @return the peer configuration paired with the peer cluster configuration; never null
     * @throws IOException if the peer configuration cannot be obtained or the
     *                     replication layer reports an error
     */
    private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(Configuration conf, String peerId) throws IOException {
        ZooKeeperWatcher localZKW = null;
        try {
            localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", new Abortable(){

                @Override
                public void abort(String why, Throwable e) {
                    // Intentionally a no-op: this short-lived watcher never aborts the tool.
                }

                @Override
                public boolean isAborted() {
                    return false;
                }
            });
            ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
            rp.init();
            Pair<ReplicationPeerConfig, Configuration> peerConf = rp.getPeerConf(peerId);
            if (peerConf == null) {
                throw new IOException("Couldn't get peer conf!");
            }
            return peerConf;
        }
        catch (ReplicationException e) {
            throw new IOException("An error occurred while trying to connect to the remote peer cluster", e);
        }
        finally {
            if (localZKW != null) {
                localZKW.close();
            }
        }
    }

    /**
     * Parses the command line and builds the map-only verification job.
     *
     * <p>The parsed options are written into {@code conf} under {@code verifyrep.*}
     * keys before the job is created, so the mappers can read them back. The peer's
     * cluster key and configuration are resolved from the replication peer id.
     *
     * @param conf configuration to populate and to create the job from
     * @param args command-line arguments, see {@link #doCommandLine(String[])}
     * @return the configured job, or null if the arguments were rejected
     * @throws IOException if the peer configuration cannot be resolved or job setup fails
     */
    public Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
        boolean parsed = this.doCommandLine(args);
        if (!parsed) {
            return null;
        }

        // Publish the parsed options so that the mappers can read them back.
        conf.set("verifyrep.peerId", this.peerId);
        conf.set("verifyrep.tableName", this.tableName);
        conf.setLong("verifyrep.startTime", this.startTime);
        conf.setLong("verifyrep.endTime", this.endTime);
        conf.setInt("verifyrep.sleepMsBeforeReCompare", this.sleepMsBeforeReCompare);
        conf.set("verifyrep.delimiter", this.delimiter);
        conf.setInt("verifyrep.batch", this.batch);
        conf.setBoolean("verifyrep.verbose", this.verbose);
        conf.setBoolean("verifyrep.includeDeletedCells", this.includeDeletedCells);
        if (this.families != null) {
            conf.set("verifyrep.families", this.families);
        }
        if (this.rowPrefixes != null) {
            conf.set("verifyrep.rowPrefixes", this.rowPrefixes);
        }

        // Resolve the peer and copy its cluster key and configuration into the job conf.
        Pair<ReplicationPeerConfig, Configuration> peerPair = VerifyReplication.getPeerQuorumConfig(conf, this.peerId);
        ReplicationPeerConfig peerConfig = peerPair.getFirst();
        String clusterKey = peerConfig.getClusterKey();
        LOG.info("Peer Quorum Address: " + clusterKey + ", Peer Configuration: " + peerConfig.getConfiguration());
        conf.set("verifyrep.peerQuorumAddress", clusterKey);
        HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, peerConfig.getConfiguration().entrySet());

        conf.setInt("verifyrep.versions", this.versions);
        LOG.info("Number of version: " + this.versions);

        if (this.peerSnapshotName != null) {
            conf.set("verifyrep.peerSnapshotName", this.peerSnapshotName);
            conf.set("verifyrep.peerSnapshotTmpDir", this.peerSnapshotTmpDir);
            conf.set("verifyrep.peerFSAddress", this.peerFSAddress);
            conf.set("verifyrep.peerHBaseRootAddress", this.peerHBaseRootAddress);
            conf.setStrings("mapreduce.job.hdfs-servers", this.peerFSAddress);
        }

        // The job is created only after conf has been fully populated.
        String jobName = conf.get(JOB_NAME_CONF_KEY, "verifyrep_" + this.tableName);
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(VerifyReplication.class);

        // Source-side scan; the mapper builds a matching scan for the peer.
        Scan sourceScan = new Scan();
        sourceScan.setTimeRange(this.startTime, this.endTime);
        sourceScan.setRaw(this.includeDeletedCells);
        sourceScan.setCacheBlocks(false);
        if (this.batch > 0) {
            sourceScan.setBatch(this.batch);
        }
        if (this.versions >= 0) {
            sourceScan.setMaxVersions(this.versions);
            LOG.info("Number of versions set to " + this.versions);
        }
        if (this.families != null) {
            for (String family : this.families.split(",")) {
                sourceScan.addFamily(Bytes.toBytes(family));
            }
        }
        VerifyReplication.setRowPrefixFilter(sourceScan, this.rowPrefixes);

        if (this.sourceSnapshotName == null) {
            TableMapReduceUtil.initTableMapperJob(this.tableName, sourceScan, Verifier.class, null, null, job);
        } else {
            Path snapshotTempPath = new Path(this.sourceSnapshotTmpDir);
            LOG.info("Using source snapshot-" + this.sourceSnapshotName + " with temp dir:" + this.sourceSnapshotTmpDir);
            TableMapReduceUtil.initTableSnapshotMapperJob(this.sourceSnapshotName, sourceScan, Verifier.class, null, null, job, true, snapshotTempPath);
        }

        // Obtain credentials for the peer cluster as well.
        TableMapReduceUtil.initCredentialsForCluster(job, peerPair.getSecond());

        // Map-only job with no output.
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);
        return job;
    }

    /**
     * Restricts {@code scan} to rows whose key starts with one of the given prefixes.
     *
     * <p>One {@link PrefixFilter} per prefix is added to a MUST_PASS_ONE
     * {@link FilterList}, and the scan's start/stop rows are narrowed to the range
     * spanned by the smallest and largest prefix. The scan is left untouched when
     * {@code rowPrefixes} is null, empty, or consists only of commas.
     *
     * @param scan        the scan to restrict
     * @param rowPrefixes comma-separated row key prefixes; may be null
     */
    private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
        if (rowPrefixes == null || rowPrefixes.isEmpty()) {
            return;
        }
        String[] rowPrefixArray = rowPrefixes.split(",");
        if (rowPrefixArray.length == 0) {
            // split() drops trailing empty tokens, so input such as "," yields no
            // prefixes at all; indexing [0] below would otherwise throw.
            return;
        }
        Arrays.sort(rowPrefixArray);
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (String prefix : rowPrefixArray) {
            filterList.addFilter(new PrefixFilter(Bytes.toBytes(prefix)));
        }
        scan.setFilter(filterList);
        byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
        byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
        VerifyReplication.setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }

    /**
     * Sets the scan's start row to {@code startPrefixRow} and its stop row to the
     * first row key that sorts after every key beginning with {@code lastPrefixRow}.
     *
     * @param scan           the scan to bound
     * @param startPrefixRow smallest prefix; used as the start row as is
     * @param lastPrefixRow  largest prefix; the stop row is derived from it
     */
    private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
        scan.setStartRow(startPrefixRow);
        scan.setStopRow(VerifyReplication.closestRowAfterPrefix(lastPrefixRow));
    }

    /**
     * Computes the smallest row key that is greater than every key starting with {@code prefix}.
     *
     * <p>Trailing 0xFF bytes cannot be incremented without wrapping to 0x00, which
     * would produce a key that sorts before the prefix, so they are dropped and the
     * last remaining byte is incremented instead. If nothing remains (the prefix is
     * empty or all 0xFF), an empty array is returned, which as a stop row leaves
     * the scan unbounded at the end.
     */
    private static byte[] closestRowAfterPrefix(byte[] prefix) {
        int length = prefix.length;
        while (length > 0 && prefix[length - 1] == (byte)0xFF) {
            --length;
        }
        if (length == 0) {
            return new byte[0];
        }
        byte[] stopRow = Arrays.copyOf(prefix, length);
        stopRow[length - 1]++;
        return stopRow;
    }

    /**
     * Parses the command-line arguments into this instance's option fields.
     *
     * <p>Options start with {@code --}; the peer id and the table name are taken
     * from the second-to-last and last argument. Usage is printed to stderr and
     * false is returned when help is requested, an unknown option or malformed
     * number is found, the peer id or table name is missing, or the snapshot
     * options are inconsistent.
     *
     * @param args raw command-line arguments
     * @return true if the arguments were accepted and the job may be built
     */
    @VisibleForTesting
    public boolean doCommandLine(String[] args) {
        if (args.length < 2) {
            VerifyReplication.printUsage(null);
            return false;
        }
        final String startTimeArgKey = "--starttime=";
        final String endTimeArgKey = "--endtime=";
        final String includeDeletedCellsArgKey = "--raw";
        final String versionsArgKey = "--versions=";
        final String batchArgKey = "--batch=";
        final String familiesArgKey = "--families=";
        final String rowPrefixesKey = "--row-prefixes=";
        final String delimiterArgKey = "--delimiter=";
        final String sleepToReCompareKey = "--recomparesleep=";
        final String verboseKey = "--verbose";
        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        final String peerFSAddressArgKey = "--peerFSAddress=";
        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        try {
            for (int i = 0; i < args.length; ++i) {
                String cmd = args[i];
                if (cmd.equals("-h") || cmd.startsWith("--h")) {
                    VerifyReplication.printUsage(null);
                    return false;
                }
                if (cmd.startsWith(startTimeArgKey)) {
                    this.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
                    continue;
                }
                if (cmd.startsWith(endTimeArgKey)) {
                    this.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
                    continue;
                }
                if (cmd.equals(includeDeletedCellsArgKey)) {
                    this.includeDeletedCells = true;
                    continue;
                }
                if (cmd.startsWith(versionsArgKey)) {
                    this.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
                    continue;
                }
                if (cmd.startsWith(batchArgKey)) {
                    this.batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
                    continue;
                }
                if (cmd.startsWith(familiesArgKey)) {
                    this.families = cmd.substring(familiesArgKey.length());
                    continue;
                }
                if (cmd.startsWith(rowPrefixesKey)) {
                    this.rowPrefixes = cmd.substring(rowPrefixesKey.length());
                    continue;
                }
                if (cmd.startsWith(delimiterArgKey)) {
                    this.delimiter = cmd.substring(delimiterArgKey.length());
                    continue;
                }
                if (cmd.startsWith(sleepToReCompareKey)) {
                    this.sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
                    continue;
                }
                if (cmd.startsWith(verboseKey)) {
                    this.verbose = true;
                    continue;
                }
                if (cmd.startsWith(sourceSnapshotNameArgKey)) {
                    this.sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
                    continue;
                }
                if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
                    this.sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerSnapshotNameArgKey)) {
                    this.peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
                    this.peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerFSAddressArgKey)) {
                    this.peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
                    continue;
                }
                if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
                    this.peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
                    continue;
                }
                if (cmd.startsWith("--")) {
                    VerifyReplication.printUsage("Invalid argument '" + cmd + "'");
                    return false;
                }
                // Positional arguments: only the last two positions are meaningful.
                if (i == args.length - 2) {
                    this.peerId = cmd;
                }
                if (i == args.length - 1) {
                    this.tableName = cmd;
                }
            }
            // If an option occupies one of the last two positions, the matching
            // positional value is never assigned; reject that here instead of
            // failing later when the null value is written to the configuration.
            if (this.peerId == null || this.tableName == null) {
                VerifyReplication.printUsage("Both <peerid> and <tablename> must be given as the last two arguments");
                return false;
            }
            // Source snapshot name and temp dir must be given together.
            if ((this.sourceSnapshotName == null) != (this.sourceSnapshotTmpDir == null)) {
                VerifyReplication.printUsage("Source snapshot name and snapshot temp location should be provided to use snapshots in source cluster");
                return false;
            }
            // The four peer snapshot options are all-or-nothing.
            boolean noPeerSnapshotArgs = this.peerSnapshotName == null && this.peerSnapshotTmpDir == null && this.peerFSAddress == null && this.peerHBaseRootAddress == null;
            boolean allPeerSnapshotArgs = this.peerSnapshotName != null && this.peerSnapshotTmpDir != null && this.peerFSAddress != null && this.peerHBaseRootAddress != null;
            if (!noPeerSnapshotArgs && !allPeerSnapshotArgs) {
                VerifyReplication.printUsage("Peer snapshot name, peer snapshot temp location, Peer HBase root address and  peer FSAddress should be provided to use snapshots in peer cluster");
                return false;
            }
            if ((this.sourceSnapshotName != null || this.peerSnapshotName != null) && this.sleepMsBeforeReCompare > 0) {
                VerifyReplication.printUsage("Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
                return false;
            }
        }
        catch (Exception e) {
            // Typically a NumberFormatException from one of the numeric options.
            e.printStackTrace();
            VerifyReplication.printUsage("Can't start because " + e.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Prints the tool's usage text to stderr, preceded by an error line when one is given.
     *
     * @param errorMsg error to report before the usage text; null or empty prints usage only
     */
    private static void printUsage(String errorMsg) {
        if (errorMsg != null && !errorMsg.isEmpty()) {
            System.err.println("ERROR: " + errorMsg);
        }
        // The synopsis lists every option accepted by doCommandLine, including --versions and --raw.
        System.err.println("Usage: verifyrep [--starttime=X] [--endtime=Y] [--versions=N] [--raw] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] [--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U]  <peerid> <tablename>");
        System.err.println();
        System.err.println("Options:");
        System.err.println(" starttime    beginning of the time range");
        System.err.println("              without endtime means from starttime to forever");
        System.err.println(" endtime      end of the time range");
        System.err.println(" versions     number of cell versions to verify");
        System.err.println(" batch        batch count for scan, note that result row counts will no longer be actual number of rows when you use this option");
        System.err.println(" raw          includes raw scan if given in options");
        System.err.println(" families     comma-separated list of families to verify");
        System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
        System.err.println(" delimiter    the delimiter used in display around rowkey");
        System.err.println(" recomparesleep   milliseconds to sleep before recompare row, default value is 0 which disables the recompare.");
        System.err.println(" verbose      logs row keys of good rows");
        System.err.println(" sourceSnapshotName  Source Snapshot Name");
        System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
        System.err.println(" peerSnapshotName  Peer Snapshot Name");
        System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
        System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
        System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
        System.err.println();
        System.err.println("Args:");
        System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
        System.err.println(" tablename    Name of the table to verify");
        System.err.println();
        System.err.println("Examples:");
        System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
        System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
    }

    /**
     * Builds the verification job from the tool's configuration and waits for it to finish.
     *
     * @param args command-line arguments
     * @return 0 if the job completed successfully; 1 if it failed or could not be created
     * @throws Exception if job creation or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = this.createSubmittableJob(this.getConf(), args);
        if (job == null) {
            // Arguments were rejected; usage has already been printed.
            return 1;
        }
        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            return 0;
        }
        return 1;
    }

    /**
     * Command-line entry point: runs the tool through {@link ToolRunner} and exits
     * the JVM with the tool's return code.
     *
     * @param args command-line arguments
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        VerifyReplication tool = new VerifyReplication();
        int exitCode = ToolRunner.run(conf, tool, args);
        System.exit(exitCode);
    }

    public static class Verifier
    extends TableMapper<ImmutableBytesWritable, Put> {
        // Everything below is initialised lazily on the first map() call and released in cleanup().
        /** Connection to the source cluster; used for the re-compare lookups. */
        private Connection sourceConnection;
        /** Source-side handle of the verified table. */
        private Table sourceTable;
        /** Connection to the peer cluster. */
        private Connection replicatedConnection;
        /** Peer-side handle of the verified table. */
        private Table replicatedTable;
        /** Scanner over the peer table or peer snapshot; null until the first map() call. */
        private ResultScanner replicatedScanner;
        /** Peer row currently waiting to be compared; null once the peer scanner is exhausted. */
        private Result currentCompareRowInPeerTable;
        /** Milliseconds to sleep before re-comparing a mismatching row; 0 disables the re-compare. */
        private int sleepMsBeforeReCompare;
        /** Text printed on both sides of a row key in log messages. */
        private String delimiter = "";
        /** When true, row keys of matching rows are logged too. */
        private boolean verbose = false;
        /** Scan batch size read from the job configuration; only applied when greater than zero. */
        private int batch = -1;

        /**
         * Compares one source row against the peer, advancing the peer scanner in
         * step with the source rows handed to this mapper.
         *
         * <p>On the first call the mapper reads its options from the job
         * configuration, opens connections to the source and peer clusters, and
         * opens a peer scanner (over the live table, or over a snapshot when
         * {@code verifyrep.peerSnapshotName} is set) that starts at this first
         * source row and stops at the end key of the input split.
         *
         * @param row     the source row key
         * @param value   the source row content
         * @param context mapper context used for configuration, split info and counters
         * @throws IOException if opening the connections or scanning the peer fails
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Mapper.Context context) throws IOException {
            // One-time initialisation, keyed on the peer scanner not existing yet.
            if (this.replicatedScanner == null) {
                Configuration conf = context.getConfiguration();
                this.sleepMsBeforeReCompare = conf.getInt("verifyrep.sleepMsBeforeReCompare", 0);
                this.delimiter = conf.get("verifyrep.delimiter", "");
                this.verbose = conf.getBoolean("verifyrep.verbose", false);
                this.batch = conf.getInt("verifyrep.batch", -1);
                // Build the peer scan from the same options the job used for the source scan.
                Scan scan = new Scan();
                if (this.batch > 0) {
                    scan.setBatch(this.batch);
                }
                scan.setCacheBlocks(false);
                scan.setCaching(conf.getInt("hbase.mapreduce.scan.cachedrows", 1));
                long startTime = conf.getLong("verifyrep.startTime", 0L);
                long endTime = conf.getLong("verifyrep.endTime", Long.MAX_VALUE);
                String families = conf.get("verifyrep.families", null);
                if (families != null) {
                    String[] fams;
                    for (String fam : fams = families.split(",")) {
                        scan.addFamily(Bytes.toBytes(fam));
                    }
                }
                boolean includeDeletedCells = conf.getBoolean("verifyrep.includeDeletedCells", false);
                scan.setRaw(includeDeletedCells);
                String rowPrefixes = conf.get("verifyrep.rowPrefixes", null);
                VerifyReplication.setRowPrefixFilter(scan, rowPrefixes);
                scan.setTimeRange(startTime, endTime);
                int versions = conf.getInt("verifyrep.versions", -1);
                LOG.info((Object)("Setting number of version inside map as: " + versions));
                if (versions >= 0) {
                    scan.setMaxVersions(versions);
                }
                TableName tableName = TableName.valueOf(conf.get("verifyrep.tableName"));
                this.sourceConnection = ConnectionFactory.createConnection(conf);
                this.sourceTable = this.sourceConnection.getTable(tableName);
                InputSplit tableSplit = context.getInputSplit();
                // The peer configuration is rebuilt from the cluster key and the
                // prefixed entries stored in the job configuration.
                String zkClusterKey = conf.get("verifyrep.peerQuorumAddress");
                Configuration peerConf = HBaseConfiguration.createClusterConf(conf, zkClusterKey, VerifyReplication.PEER_CONFIG_PREFIX);
                this.replicatedConnection = ConnectionFactory.createConnection(peerConf);
                this.replicatedTable = this.replicatedConnection.getTable(tableName);
                // Bound the peer scan to this mapper's range: from the first source
                // row seen up to the split's end key. This replaces any start/stop
                // rows set by the row-prefix filter above.
                scan.setStartRow(value.getRow());
                byte[] endRow = null;
                endRow = tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit ? ((TableSnapshotInputFormat.TableSnapshotRegionSplit)tableSplit).getRegionInfo().getEndKey() : ((TableSplit)tableSplit).getEndRow();
                scan.setStopRow(endRow);
                String peerSnapshotName = conf.get("verifyrep.peerSnapshotName", null);
                if (peerSnapshotName != null) {
                    // Read the peer from a snapshot: point peerConf at the peer file system and root dir.
                    String peerSnapshotTmpDir = conf.get("verifyrep.peerSnapshotTmpDir", null);
                    String peerFSAddress = conf.get("verifyrep.peerFSAddress", null);
                    String peerHBaseRootAddress = conf.get("verifyrep.peerHBaseRootAddress", null);
                    FileSystem.setDefaultUri(peerConf, peerFSAddress);
                    FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
                    LOG.info((Object)("Using peer snapshot:" + peerSnapshotName + " with temp dir:" + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf) + " peerFSAddress:" + peerFSAddress));
                    this.replicatedScanner = new TableSnapshotScanner(peerConf, new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
                } else {
                    this.replicatedScanner = this.replicatedTable.getScanner(scan);
                }
                this.currentCompareRowInPeerTable = this.replicatedScanner.next();
            }
            // Advance the peer until the source row has been accounted for.
            while (true) {
                if (this.currentCompareRowInPeerTable == null) {
                    // Peer scanner exhausted: the source row has no counterpart.
                    this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
                    break;
                }
                int rowCmpRet = Bytes.compareTo(value.getRow(), this.currentCompareRowInPeerTable.getRow());
                if (rowCmpRet == 0) {
                    // Same row key on both sides: compare the contents.
                    try {
                        Result.compareResults(value, this.currentCompareRowInPeerTable);
                        context.getCounter(Counters.GOODROWS).increment(1L);
                        if (this.verbose) {
                            LOG.info((Object)("Good row key: " + this.delimiter + Bytes.toStringBinary(value.getRow()) + this.delimiter));
                        }
                    }
                    catch (Exception e) {
                        // Any exception from the comparison is counted as a content difference.
                        this.logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
                    }
                    this.currentCompareRowInPeerTable = this.replicatedScanner.next();
                    break;
                }
                if (rowCmpRet < 0) {
                    // Source key sorts before the current peer key: row missing on the peer.
                    // The peer row is kept for the next source row.
                    this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
                    break;
                }
                // Peer key sorts before the source key: row exists only on the peer;
                // move to the next peer row and compare against the same source row again.
                this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, this.currentCompareRowInPeerTable);
                this.currentCompareRowInPeerTable = this.replicatedScanner.next();
            }
        }

        /**
         * Records a mismatching row, optionally giving replication a chance to catch up first.
         *
         * <p>When a re-compare sleep is configured, the method sleeps, fetches the row
         * again from both tables and compares the fresh results. If they now match,
         * the row is not counted as bad (and is counted as good unless the source
         * result is empty). Otherwise the given counter and BADROWS are incremented
         * and the row key is logged.
         *
         * @param context mapper context used to access the counters
         * @param counter the specific failure counter to increment
         * @param row     the row (from source or peer) that failed verification
         */
        private void logFailRowAndIncreaseCounter(Mapper.Context context, Counters counter, Result row) {
            String rowKeyText = this.delimiter + Bytes.toStringBinary(row.getRow()) + this.delimiter;
            if (this.sleepMsBeforeReCompare > 0) {
                Threads.sleep(this.sleepMsBeforeReCompare);
                try {
                    Result sourceResult = this.sourceTable.get(new Get(row.getRow()));
                    Result replicatedResult = this.replicatedTable.get(new Get(row.getRow()));
                    Result.compareResults(sourceResult, replicatedResult);
                    if (!sourceResult.isEmpty()) {
                        context.getCounter(Counters.GOODROWS).increment(1L);
                        if (this.verbose) {
                            LOG.info("Good row key (with recompare): " + rowKeyText);
                        }
                    }
                    return;
                }
                catch (Exception e) {
                    // Keep the cause in the log so an I/O failure can be told apart
                    // from a genuine content mismatch; the row is then counted as bad below.
                    LOG.error("recompare fail after sleep, rowkey=" + rowKeyText, e);
                }
            }
            context.getCounter(counter).increment(1L);
            context.getCounter(Counters.BADROWS).increment(1L);
            LOG.error(counter.toString() + ", rowkey=" + rowKeyText);
        }

        /**
         * Reports every peer row still left in the scanner as existing only on the
         * peer, then releases the scanner, tables and connections.
         *
         * <p>Each resource is closed independently: a failure while closing one is
         * logged and does not prevent the remaining ones from being closed.
         *
         * @param context mapper context used to access the counters
         */
        @Override
        protected void cleanup(Mapper.Context context) {
            if (this.replicatedScanner != null) {
                try {
                    // Source rows are exhausted; whatever the peer still returns has no counterpart.
                    while (this.currentCompareRowInPeerTable != null) {
                        this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, this.currentCompareRowInPeerTable);
                        this.currentCompareRowInPeerTable = this.replicatedScanner.next();
                    }
                }
                catch (Exception e) {
                    LOG.error("fail to scan peer table in cleanup", e);
                }
                finally {
                    Verifier.closeAndLog(this.replicatedScanner, "fail to close peer scanner in cleanup");
                    this.replicatedScanner = null;
                }
            }
            Verifier.closeAndLog(this.sourceTable, "fail to close source table in cleanup");
            Verifier.closeAndLog(this.sourceConnection, "fail to close source connection in cleanup");
            Verifier.closeAndLog(this.replicatedTable, "fail to close replicated table in cleanup");
            Verifier.closeAndLog(this.replicatedConnection, "fail to close replicated connection in cleanup");
        }

        /** Closes {@code resource} if it is non-null, logging any failure instead of propagating it. */
        private static void closeAndLog(AutoCloseable resource, String failureMessage) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            }
            catch (Exception e) {
                LOG.error(failureMessage, e);
            }
        }

        /** Job counters maintained by the verifier. */
        public static enum Counters {
            /** Rows found on both sides whose contents compared equal. */
            GOODROWS,
            /** Total of failed rows; incremented together with one of the specific counters below. */
            BADROWS,
            /** Rows present in the source table with no matching row on the peer. */
            ONLY_IN_SOURCE_TABLE_ROWS,
            /** Rows present on the peer with no matching row in the source table. */
            ONLY_IN_PEER_TABLE_ROWS,
            /** Rows present on both sides whose contents differ. */
            CONTENT_DIFFERENT_ROWS;

        }
    }
}

