/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ReplicationTests.class, LargeTests.class})
public class TestGlobalReplicationThrottler {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestGlobalReplicationThrottler.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestGlobalReplicationThrottler.class);
    private static final int REPLICATION_SOURCE_QUOTA = 200;
    private static int numOfPeer = 0;
    private static Configuration conf1;
    private static Configuration conf2;
    private static HBaseTestingUtility utility1;
    private static HBaseTestingUtility utility2;
    private static final byte[] famName;
    private static final byte[] VALUE;
    private static final byte[] ROW;
    private static final byte[][] ROWS;
    @Rule
    public TestName name = new TestName();
    private volatile boolean testQuotaPass = false;
    private volatile boolean testQuotaNonZero = false;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf1 = HBaseConfiguration.create();
        conf1.set("zookeeper.znode.parent", "/1");
        conf1.setLong("replication.source.sleepforretries", 100L);
        conf1.setInt("replication.total.buffer.quota", 200);
        conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster miniZK = utility1.getZkCluster();
        new ZKWatcher(conf1, "cluster1", null, true);
        conf2 = new Configuration(conf1);
        conf2.set("zookeeper.znode.parent", "/2");
        utility2 = new HBaseTestingUtility(conf2);
        utility2.setZkCluster(miniZK);
        new ZKWatcher(conf2, "cluster2", null, true);
        ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
        ReplicationPeerConfig rpc = new ReplicationPeerConfig();
        rpc.setClusterKey(utility2.getClusterKey());
        utility1.startMiniCluster();
        utility2.startMiniCluster();
        admin1.addPeer("peer1", rpc, null);
        admin1.addPeer("peer2", rpc, null);
        admin1.addPeer("peer3", rpc, null);
        numOfPeer = admin1.getPeersCount();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        utility2.shutdownMiniCluster();
        utility1.shutdownMiniCluster();
    }

    @Test
    public void testQuota() throws IOException {
        TableName tableName = TableName.valueOf((String)this.name.getMethodName());
        HTableDescriptor table = new HTableDescriptor(tableName);
        HColumnDescriptor fam = new HColumnDescriptor(famName);
        fam.setScope(1);
        table.addFamily(fam);
        utility1.getAdmin().createTable((TableDescriptor)table);
        utility2.getAdmin().createTable((TableDescriptor)table);
        Thread watcher = new Thread(() -> {
            Replication replication = (Replication)utility1.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService();
            AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
            this.testQuotaPass = true;
            while (!Thread.interrupted()) {
                long size = bufferUsed.get();
                if (size > 0L) {
                    this.testQuotaNonZero = true;
                }
                if (size > (long)(200 * (numOfPeer + 1))) {
                    this.testQuotaPass = false;
                }
                Threads.sleep((long)50L);
            }
        });
        watcher.start();
        try (Table t1 = utility1.getConnection().getTable(tableName);
             Table t2 = utility2.getConnection().getTable(tableName);){
            for (int i = 0; i < 50; ++i) {
                Put put = new Put(ROWS[i]);
                put.addColumn(famName, VALUE, VALUE);
                t1.put(put);
            }
            long start = EnvironmentEdgeManager.currentTime();
            while (EnvironmentEdgeManager.currentTime() - start < 180000L) {
                Scan scan = new Scan();
                scan.setCaching(50);
                int count = 0;
                try (ResultScanner results = t2.getScanner(scan);){
                    for (Result result : results) {
                        ++count;
                    }
                }
                if (count < 50) {
                    LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count);
                    Threads.sleep((long)200L);
                    continue;
                }
                break;
            }
        }
        watcher.interrupt();
        Assert.assertTrue((boolean)this.testQuotaPass);
        Assert.assertTrue((boolean)this.testQuotaNonZero);
    }

    static {
        famName = Bytes.toBytes((String)"f");
        VALUE = Bytes.toBytes((String)"v");
        ROW = Bytes.toBytes((String)"r");
        ROWS = HTestConst.makeNAscii(ROW, 100);
    }
}

