/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestReplicationBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
    /** Connection built from {@link #CONF1} in {@code startClusters()}; closed in {@code tearDownAfterClass()}. */
    private static Connection connection1;
    /** Connection built from {@link #CONF2} in {@code startClusters()}; closed in {@code tearDownAfterClass()}. */
    private static Connection connection2;
    /** Configuration derived from {@link #CONF1} via {@code HBaseConfiguration.create} before the mini clusters are started. */
    protected static Configuration CONF_WITH_LOCALFS;
    /** Replication admin created from {@link #CONF1} in {@code startClusters()}. */
    protected static ReplicationAdmin admin;
    /** Admin handle for the first cluster; used here to list, add and remove replication peers. */
    protected static Admin hbaseAdmin;
    /** Handle to {@link #tableName} on the first cluster (the one data is written to). */
    protected static Table htable1;
    /** Handle to {@link #tableName} on the second cluster (the one polled for replicated data). */
    protected static Table htable2;
    /** Testing utility that owns the first mini cluster. */
    protected static final HBaseTestingUtility UTIL1;
    /** Testing utility that owns the second mini cluster; it shares the first utility's ZooKeeper cluster. */
    protected static final HBaseTestingUtility UTIL2;
    /** Configuration object of {@link #UTIL1}. */
    protected static Configuration CONF1;
    /** Configuration object of {@link #UTIL2}. */
    protected static Configuration CONF2;
    /** Value passed to {@code UTIL1.startMiniCluster(...)}; not final, initialised to 1 in the static initializer. */
    protected static int NUM_SLAVES1;
    protected static final int NUM_SLAVES2 = 1;
    /** Number of rows in a regular batch. */
    protected static final int NB_ROWS_IN_BATCH = 100;
    /** Number of rows in a big batch. */
    protected static final int NB_ROWS_IN_BIG_BATCH = 1000;
    /** Pause between polling attempts, in milliseconds. */
    protected static final long SLEEP_TIME = 500L;
    /** Number of polling attempts before a wait is considered failed. */
    protected static final int NB_RETRIES = 50;
    /** Incremented on every {@code ReplicationEndpointTest.replicate} call; reset to 0 by that endpoint's constructor. */
    protected static AtomicInteger replicateCount;
    /** Accumulates the entries of every context passed to {@code ReplicationEndpointTest.replicate}. */
    protected static volatile List<WAL.Entry> replicatedEntries;
    /** Name of the table created on both clusters. */
    protected static final TableName tableName;
    /** Column family created with max versions 100 and scope 1. */
    protected static final byte[] famName;
    /** Row key (also reused as qualifier and value) used by the simple put/delete helpers. */
    protected static final byte[] row;
    /** Second column family, created with default settings. */
    protected static final byte[] noRepfamName;
    /** Id of the replication peer added before and removed after each test. */
    protected static final String PEER_ID2 = "2";

    /**
     * Tells {@link #setUpBase()} whether the peer it registers should be a serial peer.
     *
     * @return {@code false} in this base class; subclasses can override to change it
     */
    protected boolean isSerialPeer() {
        final boolean serial = false;
        return serial;
    }

    /**
     * Deletes all data of {@link #tableName} on the first cluster and waits until a scan of the
     * table on the second cluster comes back empty.
     *
     * <p>The wait polls up to {@link #NB_RETRIES} times, sleeping {@link #SLEEP_TIME} ms between
     * attempts. An attempt that sees fewer rows than the previous one does not count against the
     * retry budget.
     *
     * @throws IOException if an HBase call fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected final void cleanUp() throws IOException, InterruptedException {
        // Roll the WAL writer on every region server of the first cluster before deleting.
        for (JVMClusterUtil.RegionServerThread rsThread : UTIL1.getHBaseCluster().getRegionServerThreads()) {
            UTIL1.getAdmin().rollWALWriter(rsThread.getRegionServer().getServerName());
        }
        int rowCount = UTIL1.countRows(tableName);
        UTIL1.deleteTableData(tableName);
        Scan scan = new Scan();
        int lastCount = 0;
        for (int i = 0; i < NB_RETRIES; ++i) {
            if (i == NB_RETRIES - 1) {
                Assert.fail("Waited too much time for truncate");
            }
            Result[] res;
            // try-with-resources so the scanner is released even when next() throws.
            try (ResultScanner scanner = htable2.getScanner(scan)) {
                res = scanner.next(rowCount);
            }
            if (res.length == 0) {
                break;
            }
            if (res.length < lastCount) {
                // Rows are still disappearing: do not charge this attempt to the retry budget.
                --i;
            }
            lastCount = res.length;
            LOG.info("Still got {} rows", res.length);
            Thread.sleep(SLEEP_TIME);
        }
    }

    /**
     * Waits for {@code expectedRows} rows to show up in {@link #htable2}.
     *
     * @param expectedRows number of rows a scan must return for the wait to end
     * @param retries retry budget handed to the three-argument overload
     * @throws IOException if scanning fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected static void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException {
        waitForReplication(htable2, expectedRows, retries);
    }

    /**
     * Polls the given table until a scan returns exactly {@code expectedRows} rows.
     *
     * <p>Sleeps {@link #SLEEP_TIME} ms between attempts. The test is failed as soon as the loop
     * reaches its last iteration ({@code retries - 1}), before that iteration scans.
     *
     * @param htable2 table to scan
     * @param expectedRows number of rows the scan must return
     * @param retries upper bound on loop iterations
     * @throws IOException if scanning fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected static void waitForReplication(Table htable2, int expectedRows, int retries) throws IOException, InterruptedException {
        for (int i = 0; i < retries; ++i) {
            // A fresh Scan per attempt, as before, so no state carries over between scans.
            Scan scan = new Scan();
            if (i == retries - 1) {
                Assert.fail("Waited too much time for normal batch replication");
            }
            Result[] res;
            // try-with-resources so the scanner is released even when next() throws.
            try (ResultScanner scanner = htable2.getScanner(scan)) {
                res = scanner.next(expectedRows);
            }
            if (res.length == expectedRows) {
                break;
            }
            LOG.info("Only got {} rows", res.length);
            Thread.sleep(SLEEP_TIME);
        }
    }

    /**
     * Loads one batch of rows into {@link #htable1} using the {@link #famName} column family.
     *
     * @param prefix prefix of every generated row key
     * @param row bytes used as both qualifier and value
     * @throws IOException if the put fails
     */
    protected static void loadData(String prefix, byte[] row) throws IOException {
        loadData(prefix, row, famName);
    }

    /**
     * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in a single batched put.
     *
     * <p>Row keys are {@code prefix + i} for {@code i} in {@code [0, NB_ROWS_IN_BATCH)}; each row
     * gets one cell whose qualifier and value are both {@code row}.
     *
     * @param prefix prefix of every generated row key
     * @param row bytes used as both qualifier and value
     * @param familyName column family to write to
     * @throws IOException if the put fails
     */
    protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
        List<Put> batch = new ArrayList<>(NB_ROWS_IN_BATCH);
        for (int index = 0; index < NB_ROWS_IN_BATCH; index++) {
            byte[] rowKey = Bytes.toBytes(prefix + index);
            batch.add(new Put(rowKey).addColumn(familyName, row, row));
        }
        htable1.put(batch);
    }

    /**
     * Applies the settings shared by both mini clusters to the utility's configuration.
     *
     * @param util testing utility whose configuration is modified in place
     * @param znodeParent value stored under {@code zookeeper.znode.parent}
     */
    protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
        Configuration c = util.getConfiguration();

        // ZooKeeper settings.
        c.set("zookeeper.znode.parent", znodeParent);
        c.setInt("zookeeper.recovery.retry", 1);
        c.setInt("zookeeper.recovery.retry.intervalmill", 10);

        // Replication settings.
        c.setInt("replication.source.size.capacity", 102400);
        c.setLong("replication.source.sleepforretries", 100L);
        c.setInt("replication.source.maxretriesmultiplier", 10);
        c.setFloat("replication.source.ratio", 1.0f);
        c.setBoolean("replication.source.eof.autorecovery", true);
        c.setInt("replication.stats.thread.period.seconds", 5);
        c.setLong("replication.sleep.before.failover", 2000L);
        c.setLong("hbase.serial.replication.waiting.ms", 100L);

        // General server and test settings.
        c.setInt("hbase.regionserver.maxlogs", 10);
        c.setLong("hbase.master.logcleaner.ttl", 10L);
        c.setLong("hbase.server.thread.wakefrequency", 100L);
        c.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    }

    /**
     * Configures two testing utilities as a replication pair: the first under znode parent
     * {@code /1}, the second under {@code /2}.
     *
     * @param util1 utility for the first cluster
     * @param util2 utility for the second cluster
     */
    static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
        setupConfig(util1, "/1");
        setupConfig(util2, "/2");

        // Extra settings for the second cluster. The znode parent and short-circuit values
        // repeat what setupConfig already stored.
        Configuration secondConf = util2.getConfiguration();
        secondConf.set("zookeeper.znode.parent", "/2");
        secondConf.setInt("hbase.client.retries.number", 6);
        secondConf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    }

    /**
     * Restarts the HBase cluster of {@link #UTIL1} and refreshes the handles that point at it.
     *
     * <p>{@link #hbaseAdmin} and {@link #htable1} are closed quietly, then re-obtained from
     * {@link #UTIL1} after the restart. The private {@code connection1} field is left untouched.
     *
     * @param numSlaves argument forwarded to {@code UTIL1.restartHBaseCluster}
     * @throws Exception if shutting down or restarting the cluster fails
     */
    static void restartSourceCluster(int numSlaves) throws Exception {
        Closeable[] staleHandles = new Closeable[] {hbaseAdmin, htable1};
        IOUtils.closeQuietly(staleHandles);

        UTIL1.shutdownMiniHBaseCluster();
        UTIL1.restartHBaseCluster(numSlaves);

        CONF1 = UTIL1.getConfiguration();
        hbaseAdmin = UTIL1.getAdmin();
        htable1 = UTIL1.getConnection().getTable(tableName);
    }

    /**
     * Restarts the HBase cluster of {@link #UTIL2} and re-opens {@link #htable2} on it.
     *
     * @param numSlaves argument forwarded to {@code UTIL2.restartHBaseCluster}
     * @throws Exception if restarting the cluster fails
     */
    static void restartTargetHBaseCluster(int numSlaves) throws Exception {
        Closeable staleTable = htable2;
        IOUtils.closeQuietly(staleTable);

        UTIL2.restartHBaseCluster(numSlaves);
        CONF2 = UTIL2.getConfiguration();

        Connection targetConnection = UTIL2.getConnection();
        htable2 = targetConnection.getTable(tableName);
    }

    /**
     * Starts a shared ZooKeeper cluster and both mini clusters, opens the connections and admin
     * handles, and creates {@link #tableName} on both clusters.
     *
     * <p>The table has two families: {@link #famName} (max versions 100, scope 1) and
     * {@link #noRepfamName} (defaults).
     *
     * @throws Exception if any cluster, connection or table operation fails
     */
    private static void startClusters() throws Exception {
        // One ZooKeeper cluster, started by UTIL1 and handed to UTIL2.
        UTIL1.startMiniZKCluster();
        MiniZooKeeperCluster sharedZk = UTIL1.getZkCluster();
        LOG.info("Setup first Zk");
        UTIL2.setZkCluster(sharedZk);
        LOG.info("Setup second Zk");

        CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
        UTIL1.startMiniCluster(NUM_SLAVES1);
        UTIL2.startMiniCluster(NUM_SLAVES2);

        connection1 = ConnectionFactory.createConnection(CONF1);
        connection2 = ConnectionFactory.createConnection(CONF2);
        admin = new ReplicationAdmin(CONF1);
        hbaseAdmin = connection1.getAdmin();

        ColumnFamilyDescriptorBuilder scopedFamily =
            ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100).setScope(1);
        TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
        tableBuilder.setColumnFamily(scopedFamily.build());
        tableBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName));
        TableDescriptor descriptor = tableBuilder.build();

        try (Admin firstAdmin = connection1.getAdmin();
             Admin secondAdmin = connection2.getAdmin()) {
            firstAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
            secondAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
            UTIL1.waitUntilAllRegionsAssigned(tableName);
            htable1 = connection1.getTable(tableName);
            UTIL2.waitUntilAllRegionsAssigned(tableName);
            htable2 = connection2.getTable(tableName);
        }
    }

    /**
     * Configures both testing utilities and then starts the clusters, once per test class.
     *
     * @throws Exception if cluster start-up fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        configureClusters(UTIL1, UTIL2);
        startClusters();
    }

    /**
     * Checks whether a replication peer with the given id is listed by {@link #hbaseAdmin}.
     *
     * @param peerId peer id to look for
     * @return {@code true} if any listed peer has that id
     * @throws IOException if listing the peers fails
     */
    private boolean peerExist(String peerId) throws IOException {
        return hbaseAdmin.listReplicationPeers()
            .stream()
            .map(peer -> peer.getPeerId())
            .anyMatch(listedId -> peerId.equals(listedId));
    }

    /**
     * Registers replication peer {@link #PEER_ID2} before each test unless it already exists.
     *
     * <p>The peer uses the second cluster's cluster key, the serial flag from
     * {@link #isSerialPeer()}, and {@link ReplicationEndpointTest} as its endpoint.
     *
     * @throws Exception if listing or adding the peer fails
     */
    @Before
    public void setUpBase() throws Exception {
        if (this.peerExist(PEER_ID2)) {
            return;
        }
        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
            .setClusterKey(UTIL2.getClusterKey())
            .setSerial(this.isSerialPeer())
            .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName())
            .build();
        hbaseAdmin.addReplicationPeer(PEER_ID2, peerConfig);
    }

    /**
     * Removes replication peer {@link #PEER_ID2} after each test if it is still registered.
     *
     * @throws Exception if listing or removing the peer fails
     */
    @After
    public void tearDownBase() throws Exception {
        if (!this.peerExist(PEER_ID2)) {
            return;
        }
        hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }

    /**
     * Writes one cell to the first cluster and waits for it to appear on the second, then deletes
     * the row and waits for it to disappear from the second.
     *
     * <p>Each wait polls up to {@link #NB_RETRIES} times with {@link #SLEEP_TIME} ms pauses and
     * fails the test on reaching the last iteration.
     *
     * @throws IOException if an HBase call fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
        Put put = new Put(row);
        put.addColumn(famName, row, row);
        // NOTE(review): the previous htable1 handle is replaced here without being closed.
        htable1 = UTIL1.getConnection().getTable(tableName);
        htable1.put(put);

        // Phase 1: wait for the put to reach the second cluster.
        Get get = new Get(row);
        for (int attempt = 0; attempt < NB_RETRIES; ++attempt) {
            if (attempt == NB_RETRIES - 1) {
                Assert.fail("Waited too much time for put replication");
            }
            Result replicated = htable2.get(get);
            if (replicated.isEmpty()) {
                LOG.info("Row not available");
                Thread.sleep(SLEEP_TIME);
                continue;
            }
            Assert.assertArrayEquals(row, replicated.value());
            break;
        }

        // Phase 2: delete on the first cluster and wait for the row to vanish from the second.
        Delete del = new Delete(row);
        htable1.delete(del);
        get = new Get(row);
        for (int attempt = 0; attempt < NB_RETRIES; ++attempt) {
            if (attempt == NB_RETRIES - 1) {
                Assert.fail("Waited too much time for del replication");
            }
            Result remaining = htable2.get(get);
            if (remaining.size() < 1) {
                break;
            }
            LOG.info("Row not deleted");
            Thread.sleep(SLEEP_TIME);
        }
    }

    /**
     * Loads one batch of {@link #NB_ROWS_IN_BATCH} rows into the first cluster, verifies they are
     * readable there, and waits for the same number of rows on the second cluster.
     *
     * @throws IOException if an HBase call fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected static void runSmallBatchTest() throws IOException, InterruptedException {
        loadData("", row);

        Result[] sourceRows;
        // try-with-resources so the scanner is released even when next() throws.
        try (ResultScanner scanner = htable1.getScanner(new Scan())) {
            sourceRows = scanner.next(NB_ROWS_IN_BATCH);
        }
        Assert.assertEquals(NB_ROWS_IN_BATCH, sourceRows.length);

        waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
    }

    /**
     * Closes the table, admin and connection handles and shuts down both mini clusters.
     *
     * <p>The cluster shutdowns run in {@code finally} blocks, so a failing {@code close()} no
     * longer leaves the mini clusters running. If both a close and a shutdown throw, the
     * shutdown's exception is the one that propagates.
     *
     * @throws Exception if closing a handle or shutting down a cluster fails
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        try {
            if (htable2 != null) {
                htable2.close();
            }
            if (htable1 != null) {
                htable1.close();
            }
            if (admin != null) {
                admin.close();
            }
            if (hbaseAdmin != null) {
                hbaseAdmin.close();
            }
            if (connection2 != null) {
                connection2.close();
            }
            if (connection1 != null) {
                connection1.close();
            }
        } finally {
            try {
                UTIL2.shutdownMiniCluster();
            } finally {
                // Always attempted, even when shutting down the second cluster fails.
                UTIL1.shutdownMiniCluster();
            }
        }
    }

    // Static state shared by every test in the class hierarchy.
    static {
        // Table layout constants.
        tableName = TableName.valueOf("test");
        famName = Bytes.toBytes("f");
        noRepfamName = Bytes.toBytes("norep");
        row = Bytes.toBytes("row");

        // Counters filled in by ReplicationEndpointTest.
        replicateCount = new AtomicInteger();
        replicatedEntries = Lists.newArrayList();

        // One testing utility (and its configuration) per cluster.
        UTIL1 = new HBaseTestingUtility();
        CONF1 = UTIL1.getConfiguration();
        UTIL2 = new HBaseTestingUtility();
        CONF2 = UTIL2.getConfiguration();
        NUM_SLAVES1 = 1;
    }

    public static class ReplicationEndpointTest
    extends HBaseInterClusterReplicationEndpoint {
        public ReplicationEndpointTest() {
            replicateCount.set(0);
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            replicateCount.incrementAndGet();
            replicatedEntries.addAll(replicateContext.getEntries());
            return super.replicate(replicateContext);
        }
    }
}

