/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerHandleAdapter;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.ReplicationTestUtil;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ReplicationWorker}: each case creates a ledger, kills one or
 * more bookies from its ensemble, marks the ledger as under-replicated and then
 * checks how the worker reacts (re-replicates, retries, or cleans up).
 *
 * <p>The polling helpers in this class have no timeout of their own; a worker
 * that never finishes will keep the test waiting until whatever timeout the
 * surrounding test harness applies (if any).
 */
public class TestReplicationWorker extends BookKeeperClusterTestCase {
    private static final byte[] TESTPASSWD = "testpasswd".getBytes();
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWorker.class);
    /** Payload written to (and expected back from) every ledger entry. */
    private static final byte[] DATA = "TestReplicationWorker".getBytes();
    /** Number of entries written per batch; verification reads entries 0-9. */
    private static final int ENTRIES_PER_BATCH = 10;
    /** Pause between two checks of the under-replication state. */
    private static final long POLL_INTERVAL_MS = 100L;

    private String basePath = "";
    private String baseLockPath = "";
    private MetadataBookieDriver driver;
    private LedgerManagerFactory mFactory;
    private LedgerUnderreplicationManager underReplicationManager;
    private OrderedScheduler scheduler;

    /** Runs the tests against the hierarchical ledger manager. */
    public TestReplicationWorker() {
        this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
    }

    /**
     * Runs the tests on a three-bookie cluster using the given ledger manager.
     *
     * @param ledgerManagerFactory class name of the ledger manager factory to
     *        configure on both the server and the client configuration
     */
    TestReplicationWorker(String ledgerManagerFactory) {
        super(3);
        LOG.info("Running test case using ledger manager : {}", ledgerManagerFactory);
        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
        baseConf.setRereplicationEntryBatchSize(3L);
    }

    /**
     * Starts the cluster, resolves the under-replication paths and creates the
     * scheduler, metadata driver and under-replication manager used by the tests.
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
        basePath = zkLedgersRootPath + '/' + "underreplication" + "/ledgers";
        baseLockPath = zkLedgersRootPath + '/' + "underreplication" + "/locks";
        scheduler = OrderedScheduler.newSchedulerBuilder()
                .name("test-scheduler")
                .numThreads(1)
                .build();
        driver = MetadataDrivers.getBookieDriver(URI.create(baseConf.getMetadataServiceUri()));
        driver.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
        mFactory = driver.getLedgerManagerFactory();
        underReplicationManager = mFactory.newLedgerUnderreplicationManager();
    }

    /**
     * Stops the cluster and releases everything {@link #setUp()} created.
     *
     * <p>Every resource is released in its own {@code finally} so that a
     * failure in one step cannot leak the remaining ones.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                if (null != underReplicationManager) {
                    underReplicationManager.close();
                    underReplicationManager = null;
                }
            } finally {
                try {
                    if (null != driver) {
                        driver.close();
                        driver = null;
                    }
                } finally {
                    // The scheduler was previously never stopped, leaking its
                    // thread after every test.
                    if (null != scheduler) {
                        scheduler.shutdown();
                        scheduler = null;
                    }
                }
            }
        }
    }

    /**
     * Kills one bookie of an open 3/3 ledger, starts a replacement, and expects
     * the worker to re-replicate so that entries 0-9 stay readable once every
     * bookie of the ledger except the replacement has been killed.
     */
    @Test
    public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh, ENTRIES_PER_BATCH);

        BookieSocketAddress replicaToKill = firstBookieOfFirstEnsemble(lh);
        LOG.info("Killing Bookie : {}", replicaToKill);
        killBookie(replicaToKill);

        BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);

        writeEntries(lh, ENTRIES_PER_BATCH);

        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
        rw.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            waitUntilReplicated(lh);
            killAllBookies(lh, newBkAddr);
            verifyRecoveredLedgers(lh, 0L, 9L);
        } finally {
            rw.shutdown();
        }
    }

    /**
     * With the only bookie of a 1/1 ledger dead, replication must stay pending;
     * once the killed bookie is restarted the worker is expected to finish.
     */
    @Test
    public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication() throws Exception {
        LedgerHandle lh = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh, ENTRIES_PER_BATCH);
        lh.close();

        BookieSocketAddress replicaToKill = firstBookieOfFirstEnsemble(lh);
        LOG.info("Killing Bookie : {}", replicaToKill);
        ServerConfiguration killedBookieConfig = killBookie(replicaToKill);

        BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr :{}", newBkAddr);

        killAllBookies(lh, newBkAddr);

        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
        rw.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            assertStaysUnderReplicated(lh, 30);
            restartBookie(killedBookieConfig);
            waitUntilReplicated(lh);
            verifyRecoveredLedgers(lh, 0L, 9L);
        } finally {
            rw.shutdown();
        }
    }

    /**
     * Runs two workers (the second on its own ZooKeeper client) against the same
     * under-replicated 2/2 ledger; replication must stay pending while all of
     * the ledger's bookies are down and complete after one is restarted.
     */
    @Test
    public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication() throws Exception {
        LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh, ENTRIES_PER_BATCH);
        lh.close();

        BookieSocketAddress replicaToKill = firstBookieOfFirstEnsemble(lh);
        LOG.info("Killing Bookie : {}", replicaToKill);
        ServerConfiguration killedBookieConfig = killBookie(replicaToKill);

        // A null exclusion kills every bookie that holds the ledger.
        killAllBookies(lh, null);

        BookieSocketAddress newBkAddr1 = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr1);
        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);

        BookieSocketAddress newBkAddr2 = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr2);

        // try-with-resources guarantees the extra client is closed even when
        // creating or starting the second worker fails.
        try (ZooKeeperClient zkc1 = ZooKeeperClient.newBuilder()
                .connectString(zkUtil.getZooKeeperConnectString())
                .sessionTimeoutMs(10000)
                .build()) {
            ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf);
            rw1.start();
            rw2.start();
            try {
                underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                assertStaysUnderReplicated(lh, 10);
                restartBookie(killedBookieConfig);
                waitUntilReplicated(lh);
                verifyRecoveredLedgers(lh, 0L, 9L);
            } finally {
                try {
                    rw1.shutdown();
                } finally {
                    rw2.shutdown();
                }
            }
        }
    }

    /**
     * Marks an already deleted ledger as under-replicated and waits for the
     * worker to remove it from the under-replication list.
     */
    @Test
    public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted() throws Exception {
        LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh, ENTRIES_PER_BATCH);
        lh.close();

        BookieSocketAddress replicaToKill = firstBookieOfFirstEnsemble(lh);
        LOG.info("Killing Bookie : {}", replicaToKill);
        killBookie(replicaToKill);

        BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);

        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
        rw.start();
        try {
            bkc.deleteLedger(lh.getId());
            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            waitUntilReplicated(lh);
        } finally {
            rw.shutdown();
        }
    }

    /**
     * Marks two ledgers as under-replicated at the same time and expects a
     * single worker to recover both of them.
     */
    @Test
    public void testMultipleLedgerReplicationWithReplicationWorker() throws Exception {
        LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh1, ENTRIES_PER_BATCH);
        BookieSocketAddress replicaToKillFromFirstLedger = firstBookieOfFirstEnsemble(lh1);
        LOG.info("Killing Bookie : {}", replicaToKillFromFirstLedger);

        LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh2, ENTRIES_PER_BATCH);
        BookieSocketAddress replicaToKillFromSecondLedger = firstBookieOfFirstEnsemble(lh2);
        LOG.info("Killing Bookie : {}", replicaToKillFromSecondLedger);

        killBookie(replicaToKillFromFirstLedger);
        lh1.close();
        // NOTE(review): the first ledger's replica is passed again here rather
        // than replicaToKillFromSecondLedger. Kept as-is because changing which
        // bookies die alters the scenario under test -- confirm the intent.
        killBookie(replicaToKillFromFirstLedger);
        lh2.close();

        BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);

        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
        rw.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(
                    lh1.getId(), replicaToKillFromFirstLedger.toString());
            underReplicationManager.markLedgerUnderreplicated(
                    lh2.getId(), replicaToKillFromSecondLedger.toString());
            waitUntilReplicated(lh1);
            waitUntilReplicated(lh2);
            killAllBookies(lh1, newBkAddr);
            verifyRecoveredLedgers(lh1, 0L, 9L);
            verifyRecoveredLedgers(lh2, 0L, 9L);
        } finally {
            rw.shutdown();
        }
    }

    /**
     * Kills a bookie of a still-open ledger without writing anything afterwards
     * and configures an open-ledger re-replication grace period of "3000"; the
     * worker is expected to replicate the ledger and leave it closed.
     */
    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR() throws Exception {
        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh, ENTRIES_PER_BATCH);

        BookieSocketAddress replicaToKill = firstBookieOfFirstEnsemble(lh);
        LOG.info("Killing Bookie : {}", replicaToKill);
        killBookie(replicaToKill);

        BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);

        baseConf.setOpenLedgerRereplicationGracePeriod("3000");
        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);

        try (MetadataClientDriver clientDriver =
                MetadataDrivers.getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()))) {
            clientDriver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager clientUrManager =
                    clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            rw.start();
            try {
                clientUrManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                waitUntilReplicated(lh);
                killAllBookies(lh, newBkAddr);
                verifyRecoveredLedgers(lh, 0L, 9L);
                lh = bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
                Assert.assertFalse("Ledger must have been closed by RW", ClientUtil.isLedgerOpen(lh));
            } finally {
                try {
                    rw.shutdown();
                } finally {
                    clientUrManager.close();
                }
            }
        }
    }

    /**
     * Kills a bookie of a still-open ledger and then writes a second batch of
     * entries (presumably so that the ledger's last fragment no longer involves
     * the killed bookie); the worker is expected to replicate the ledger while
     * leaving it open.
     */
    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR() throws Exception {
        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        writeEntries(lh, ENTRIES_PER_BATCH);

        BookieSocketAddress replicaToKill = firstBookieOfFirstEnsemble(lh);
        LOG.info("Killing Bookie : {}", replicaToKill);
        killBookie(replicaToKill);

        BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);

        writeEntries(lh, ENTRIES_PER_BATCH);

        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
        baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

        try (MetadataClientDriver clientDriver =
                MetadataDrivers.getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()))) {
            clientDriver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager clientUrManager =
                    clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            rw.start();
            try {
                clientUrManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                waitUntilReplicated(lh);
                killAllBookies(lh, newBkAddr);
                verifyRecoveredLedgers(lh, 0L, 9L);
                lh = bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
                // The assertion checks that the ledger is still OPEN; the message
                // used to claim the opposite.
                Assert.assertTrue("Ledger must still be open; RW should not have closed it",
                        ClientUtil.isLedgerOpen(lh));
            } finally {
                try {
                    rw.shutdown();
                } finally {
                    clientUrManager.close();
                }
            }
        }
    }

    /**
     * Stops the ZooKeeper cluster until the worker's client reports it is no
     * longer connected, restarts it, and expects the worker to still be running.
     */
    @Test
    public void testRWZKConnectionLost() throws Exception {
        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                .connectString(zkUtil.getZooKeeperConnectString())
                .sessionTimeoutMs(10000)
                .build()) {
            ReplicationWorker rw = new ReplicationWorker(zk, baseConf);
            rw.start();
            try {
                // Give the worker up to ~10 seconds to report itself as running.
                for (int i = 0; i < 10 && !rw.isRunning(); ++i) {
                    Thread.sleep(1000L);
                }
                Assert.assertTrue("Replication worker should be running", rw.isRunning());

                stopZKCluster();
                // Give the client up to ~10 seconds to notice the lost connection.
                for (int i = 0; i < 10 && zk.getState().isConnected(); ++i) {
                    Thread.sleep(1000L);
                }
                Assert.assertFalse(zk.getState().isConnected());

                startZKCluster();
                Assert.assertTrue("Replication worker should still be running", rw.isRunning());
            } finally {
                // The worker was previously left running after the test.
                rw.shutdown();
            }
        }
    }

    /** Appends {@code count} copies of {@link #DATA} to the ledger. */
    private static void writeEntries(LedgerHandle lh, int count) throws BKException, InterruptedException {
        for (int i = 0; i < count; ++i) {
            lh.addEntry(DATA);
        }
    }

    /** Returns the first bookie of the ensemble registered under key {@code 0} in the ledger metadata. */
    private static BookieSocketAddress firstBookieOfFirstEnsemble(LedgerHandle lh) {
        return LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
    }

    /** Polls until the ledger is no longer reported as under-replicated. No timeout is applied. */
    private void waitUntilReplicated(LedgerHandle lh) throws Exception {
        while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Asserts, {@code checks} times in a row with a pause after each check, that
     * the ledger is still reported as under-replicated.
     */
    private void assertStaysUnderReplicated(LedgerHandle lh, int checks) throws Exception {
        for (int remaining = checks; remaining > 0; --remaining) {
            Assert.assertTrue("Expecting that replication should not complete",
                    ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath));
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /** Starts a bookie from the given configuration and registers it with the cluster bookkeeping lists. */
    private void restartBookie(ServerConfiguration conf) throws Exception {
        bs.add(startBookie(conf));
        bsConfs.add(conf);
    }

    /**
     * Kills every bookie that appears in any ensemble of the ledger, except
     * {@code excludeBK}.
     *
     * @param lh ledger whose ensembles are walked
     * @param excludeBK bookie to keep alive; {@code null} keeps none
     */
    private void killAllBookies(LedgerHandle lh, BookieSocketAddress excludeBK) throws Exception {
        for (ArrayList<BookieSocketAddress> bookies
                : LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().values()) {
            for (BookieSocketAddress bookie : bookies) {
                if (bookie.equals(excludeBK)) {
                    continue;
                }
                killBookie(bookie);
            }
        }
    }

    /**
     * Re-opens the ledger without recovery and checks that every entry in
     * {@code [startEntryId, endEntryId]} carries the expected payload.
     *
     * @param lh ledger to verify (only its id is used)
     * @param startEntryId first entry id to read, inclusive
     * @param endEntryId last entry id to read, inclusive
     */
    private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId, long endEntryId)
            throws BKException, InterruptedException {
        LedgerHandle lhs = bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
        Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId, endEntryId);
        Assert.assertTrue("Should have the elements", entries.hasMoreElements());
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            // Compare raw bytes so the check does not depend on the platform charset.
            Assert.assertArrayEquals(DATA, entry.getEntry());
        }
    }
}

