/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ReplicationTests.class, MediumTests.class})
public class TestReplicationMarker {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationMarker.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);
    private static Configuration conf1;
    private static Configuration conf2;
    private static HBaseTestingUtil utility1;
    private static HBaseTestingUtil utility2;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf1 = HBaseConfiguration.create();
        conf1.set("zookeeper.znode.parent", "/1");
        conf2 = new Configuration(conf1);
        conf1.setBoolean("hbase.regionserver.replication.marker.enabled", true);
        conf1.setLong("hbase.regionserver.replication.marker.chore.duration", 1000L);
        utility1 = new HBaseTestingUtil(conf1);
        conf2.set("zookeeper.znode.parent", "/2");
        conf2.setBoolean("hbase.regionserver.replication.sink.tracker.enabled", true);
        utility2 = new HBaseTestingUtil(conf2);
        utility2.startMiniCluster(1);
        TestReplicationMarker.waitForReplicationTrackerTableCreation();
        utility1.startMiniCluster(1);
        Admin admin1 = utility1.getAdmin();
        ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
        rpcBuilder.setClusterKey(utility2.getClusterKey());
        admin1.addReplicationPeer("1", rpcBuilder.build());
        ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0).getReplicationSourceService().getReplicationManager();
        Waiter.waitFor((Configuration)conf1, (long)10000L, () -> manager.getSources().size() == 1);
    }

    private static void waitForReplicationTrackerTableCreation() {
        Waiter.waitFor((Configuration)conf2, (long)10000L, () -> utility2.getAdmin().tableExists(ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME));
    }

    @AfterClass
    public static void tearDown() throws Exception {
        utility1.shutdownMiniCluster();
        utility2.shutdownMiniCluster();
    }

    @Test
    public void testReplicationMarkerRow() throws Exception {
        Thread.sleep(5000L);
        WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
        String walName1ForCluster1 = ((AbstractFSWAL)wal1).getCurrentFileName().getName();
        String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
        Assert.assertTrue((this.getReplicatedEntries() >= 5L ? 1 : 0) != 0);
        wal1.rollWriter(true);
        String walName2ForCluster1 = ((AbstractFSWAL)wal1).getCurrentFileName().getName();
        Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
        Thread.sleep(5000L);
        utility2.waitFor(5000L, () -> this.getTableCount(connection2) >= 8);
        List<ReplicationSinkTrackerRow> list = this.getRows(connection2);
        for (ReplicationSinkTrackerRow desc : list) {
            Assert.assertEquals((Object)rs1Name, (Object)desc.getRegionServerName());
            Assert.assertTrue((walName1ForCluster1.equals(desc.getWalName()) || walName2ForCluster1.equals(desc.getWalName()) ? 1 : 0) != 0);
        }
        Assert.assertFalse((boolean)utility1.getAdmin().tableExists(ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME));
        Assert.assertTrue((boolean)utility2.getAdmin().tableExists(ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME));
    }

    private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
        Result r;
        ArrayList<ReplicationSinkTrackerRow> list = new ArrayList<ReplicationSinkTrackerRow>();
        Scan scan = new Scan();
        Table table = connection.getTable(ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME);
        ResultScanner scanner = table.getScanner(scan);
        while ((r = scanner.next()) != null) {
            List cells = r.listCells();
            list.add(this.getPayload(cells));
        }
        return list;
    }

    private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
        String rsName = null;
        String walName = null;
        Long offset = null;
        long timestamp = 0L;
        for (Cell cell : cells) {
            byte[] qualifier = CellUtil.cloneQualifier((Cell)cell);
            byte[] value = CellUtil.cloneValue((Cell)cell);
            if (Bytes.equals((byte[])ReplicationSinkTrackerTableCreator.RS_COLUMN, (byte[])qualifier)) {
                rsName = Bytes.toString((byte[])value);
                continue;
            }
            if (Bytes.equals((byte[])ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN, (byte[])qualifier)) {
                walName = Bytes.toString((byte[])value);
                continue;
            }
            if (Bytes.equals((byte[])ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN, (byte[])qualifier)) {
                timestamp = Bytes.toLong((byte[])value);
                continue;
            }
            if (!Bytes.equals((byte[])ReplicationSinkTrackerTableCreator.OFFSET_COLUMN, (byte[])qualifier)) continue;
            offset = Bytes.toLong((byte[])value);
        }
        ReplicationSinkTrackerRow row = new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
        return row;
    }

    private int getTableCount(Connection connection) throws Exception {
        Table table = connection.getTable(ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME);
        ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
        int count = 0;
        while (resultScanner.next() != null) {
            ++count;
        }
        LOG.info("Table count: " + count);
        return count;
    }

    private long getReplicatedEntries() {
        ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0).getReplicationSourceService().getReplicationManager();
        List sources = manager.getSources();
        Assert.assertEquals((long)1L, (long)sources.size());
        ReplicationSource source = (ReplicationSource)sources.get(0);
        return source.getTotalReplicatedEdits();
    }

    static class ReplicationSinkTrackerRow {
        private String region_server_name;
        private String wal_name;
        private long timestamp;
        private long offset;

        public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp, long offset) {
            this.region_server_name = region_server_name;
            this.wal_name = wal_name;
            this.timestamp = timestamp;
            this.offset = offset;
        }

        public String getRegionServerName() {
            return this.region_server_name;
        }

        public String getWalName() {
            return this.wal_name;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public long getOffset() {
            return this.offset;
        }

        public String toString() {
            return "ReplicationSinkTrackerRow{region_server_name='" + this.region_server_name + '\'' + ", wal_name='" + this.wal_name + '\'' + ", timestamp=" + this.timestamp + ", offset=" + this.offset + '}';
        }
    }
}

