/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={ReplicationTests.class, LargeTests.class})
public class TestReplicationEmptyWALRecovery
extends TestReplicationBase {
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    static final RegionInfo info = RegionInfoBuilder.newBuilder((TableName)tableName).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);

    @Before
    public void setUp() throws IOException, InterruptedException {
        this.cleanUp();
        this.scopes.put(famName, 1);
        replicateCount.set(0);
        replicatedEntries.clear();
    }

    private void waitForLogAdvance(final int numRs) {
        Waiter.waitFor((Configuration)CONF1, (long)100000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                for (int i = 0; i < numRs; ++i) {
                    HRegionServer hrs = TestReplicationBase.UTIL1.getHBaseCluster().getRegionServer(i);
                    RegionInfo regionInfo = TestReplicationBase.UTIL1.getHBaseCluster().getRegions(TestReplicationBase.htable1.getName()).get(0).getRegionInfo();
                    WAL wal = hrs.getWAL(regionInfo);
                    Path currentFile = ((AbstractFSWAL)wal).getCurrentFileName();
                    Replication replicationService = (Replication)TestReplicationBase.UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
                    for (ReplicationSourceInterface rsi : replicationService.getReplicationManager().getSources()) {
                        ReplicationSource source = (ReplicationSource)rsi;
                        String logPrefix = (String)source.getQueues().keySet().stream().findFirst().get();
                        if (currentFile.equals((Object)source.getCurrentPath()) && source.getQueues().keySet().size() == 1 && ((PriorityBlockingQueue)source.getQueues().get(logPrefix)).size() == 1) continue;
                        return false;
                    }
                }
                return true;
            }
        });
    }

    private void verifyNumberOfLogsInQueue(final int numQueues, final int numRs) {
        Waiter.waitFor((Configuration)CONF1, (long)10000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() {
                for (int i = 0; i < numRs; ++i) {
                    Replication replicationService = (Replication)TestReplicationBase.UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
                    for (ReplicationSourceInterface rsi : replicationService.getReplicationManager().getSources()) {
                        ReplicationSource source = (ReplicationSource)rsi;
                        String logPrefix = (String)source.getQueues().keySet().stream().findFirst().get();
                        if (((PriorityBlockingQueue)source.getQueues().get(logPrefix)).size() == numQueues) continue;
                        return false;
                    }
                }
                return true;
            }
        });
    }

    @Test
    public void testEmptyWALRecovery() throws Exception {
        int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
        ArrayList<Path> emptyWalPaths = new ArrayList<Path>();
        long ts = System.currentTimeMillis();
        for (int i = 0; i < numRs; ++i) {
            RegionInfo regionInfo = UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
            WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
            Path currentWalPath = AbstractFSWALProvider.getCurrentFileName((WAL)wal);
            String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName((String)currentWalPath.getName());
            Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
            UTIL1.getTestFileSystem().create(emptyWalPath).close();
            emptyWalPaths.add(emptyWalPath);
        }
        this.injectEmptyWAL(numRs, emptyWalPaths);
        this.waitForLogAdvance(numRs);
        this.verifyNumberOfLogsInQueue(1, numRs);
        TestReplicationEmptyWALRecovery.runSimplePutDeleteTest();
        this.rollWalsAndWaitForDeque(numRs);
    }

    @Test
    public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
        hbaseAdmin.disableReplicationPeer("2");
        int numOfEntriesToReplicate = 20;
        int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
        ArrayList<Path> emptyWalPaths = new ArrayList<Path>();
        long ts = System.currentTimeMillis();
        for (int i = 0; i < numRs; ++i) {
            RegionInfo regionInfo = UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
            WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
            Path currentWalPath = AbstractFSWALProvider.getCurrentFileName((WAL)wal);
            this.appendEntriesToWal(numOfEntriesToReplicate, wal);
            wal.rollWriter();
            String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName((String)currentWalPath.getName());
            Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
            UTIL1.getTestFileSystem().create(emptyWalPath).close();
            emptyWalPaths.add(emptyWalPath);
        }
        this.injectEmptyWAL(numRs, emptyWalPaths);
        hbaseAdmin.enableReplicationPeer("2");
        this.waitForLogAdvance(numRs);
        Assert.assertEquals((String)"Replicated entries are not correct", (long)(numOfEntriesToReplicate * numRs), (long)replicatedEntries.size());
        Assert.assertEquals((String)"Replicated batches are not correct", (long)1L, (long)replicateCount.intValue());
        this.verifyNumberOfLogsInQueue(1, numRs);
        TestReplicationEmptyWALRecovery.runSimplePutDeleteTest();
        this.rollWalsAndWaitForDeque(numRs);
    }

    @Test
    public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
        int i;
        hbaseAdmin.disableReplicationPeer("2");
        int numOfEntriesToReplicate = 20;
        int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
        ArrayList<Path> emptyWalPaths = new ArrayList<Path>();
        long ts = System.currentTimeMillis();
        WAL wal = null;
        for (i = 0; i < numRs; ++i) {
            RegionInfo regionInfo = UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
            wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
            Path currentWalPath = AbstractFSWALProvider.getCurrentFileName((WAL)wal);
            this.appendEntriesToWal(numOfEntriesToReplicate, wal);
            String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName((String)currentWalPath.getName());
            Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
            UTIL1.getTestFileSystem().create(emptyWalPath).close();
            emptyWalPaths.add(emptyWalPath);
        }
        this.injectEmptyWAL(numRs, emptyWalPaths);
        for (i = 0; i < numRs; ++i) {
            wal.rollWriter();
        }
        hbaseAdmin.enableReplicationPeer("2");
        this.waitForLogAdvance(numRs);
        Assert.assertEquals((String)"Replicated entries are not correct", (long)(numOfEntriesToReplicate * numRs), (long)replicatedEntries.size());
        Assert.assertEquals((String)"Replicated batches are not correct", (long)1L, (long)replicateCount.get());
        this.verifyNumberOfLogsInQueue(1, numRs);
        TestReplicationEmptyWALRecovery.runSimplePutDeleteTest();
        this.rollWalsAndWaitForDeque(numRs);
    }

    @Test
    public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
        int i;
        hbaseAdmin.disableReplicationPeer("2");
        int numOfEntriesToReplicate = 20;
        int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
        ArrayList<Path> emptyWalPaths = new ArrayList<Path>();
        long ts = System.currentTimeMillis();
        WAL wal = null;
        for (i = 0; i < numRs; ++i) {
            RegionInfo regionInfo = UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
            wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
            Path currentWalPath = AbstractFSWALProvider.getCurrentFileName((WAL)wal);
            this.appendEntriesToWal(numOfEntriesToReplicate, wal);
            wal.rollWriter();
            String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName((String)currentWalPath.getName());
            Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
            UTIL1.getTestFileSystem().create(emptyWalPath).close();
            emptyWalPaths.add(emptyWalPath);
        }
        this.injectEmptyWAL(numRs, emptyWalPaths);
        for (i = 0; i < numRs; ++i) {
            this.appendEntriesToWal(numOfEntriesToReplicate, wal);
            wal.rollWriter();
        }
        hbaseAdmin.enableReplicationPeer("2");
        this.waitForLogAdvance(numRs);
        Assert.assertEquals((String)"Replicated entries are not correct", (long)(numOfEntriesToReplicate * numRs * 2), (long)replicatedEntries.size());
        Assert.assertEquals((String)"Replicated batches are not correct", (long)2L, (long)replicateCount.get());
        this.verifyNumberOfLogsInQueue(1, numRs);
        TestReplicationEmptyWALRecovery.runSimplePutDeleteTest();
        this.rollWalsAndWaitForDeque(numRs);
    }

    private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
        for (int i = 0; i < numRs; ++i) {
            HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
            Replication replicationService = (Replication)hrs.getReplicationSourceService();
            replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
            replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
            RegionInfo regionInfo = UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
            WAL wal = hrs.getWAL(regionInfo);
            wal.rollWriter(true);
        }
    }

    protected WALKeyImpl getWalKeyImpl() {
        return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0L, this.mvcc, this.scopes);
    }

    private void rollWalsAndWaitForDeque(int numRs) throws IOException {
        RegionInfo regionInfo = UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
        for (int i = 0; i < numRs; ++i) {
            WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
            wal.rollWriter();
        }
        this.waitForLogAdvance(numRs);
    }

    private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
        long txId = -1L;
        for (int i = 0; i < numEntries; ++i) {
            byte[] b = Bytes.toBytes((String)Integer.toString(i));
            KeyValue kv = new KeyValue(b, famName, b);
            WALEdit edit = new WALEdit();
            edit.add((Cell)kv);
            txId = wal.appendData(info, this.getWalKeyImpl(), edit);
        }
        wal.sync(txId);
    }
}

