/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationTestUtil;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestReplicationWorker
extends BookKeeperClusterTestCase {
    /** Password used for every ledger the tests create or reopen. */
    private static final byte[] TESTPASSWD = "testpasswd".getBytes();
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWorker.class);
    /** ZooKeeper path of the under-replicated ledger nodes; assigned in {@link #setUp()}. */
    private String basePath = "";
    /** ZooKeeper path of the under-replication lock nodes; assigned in {@link #setUp()}. */
    private String baseLockPath = "";
    /** Bookie-side metadata driver, initialized in {@link #setUp()} and closed in {@link #tearDown()}. */
    private MetadataBookieDriver driver;
    /** Ledger manager factory obtained from {@link #driver} in {@link #setUp()}. */
    private LedgerManagerFactory mFactory;
    /** Manager used by the tests to mark ledgers as under-replicated. */
    private LedgerUnderreplicationManager underReplicationManager;
    /** Ledger manager created from {@link #mFactory} in {@link #setUp()}. */
    private LedgerManager ledgerManager;
    /** Payload written as the content of every ledger entry. */
    private static byte[] data = "TestReplicationWorker".getBytes();
    /** Scheduler handed to the client metadata drivers that individual tests initialize. */
    private OrderedScheduler scheduler;
    /** Ledgers root path resolved from the client configuration in {@link #setUp()}. */
    private String zkLedgersRootPath;

    /**
     * Creates the test case with the hierarchical ledger manager factory.
     */
    public TestReplicationWorker() {
        this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
    }

    TestReplicationWorker(String ledgerManagerFactory) {
        super(3);
        LOG.info("Running test case using ledger manager : " + ledgerManagerFactory);
        this.baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
        this.baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
        this.baseConf.setRereplicationEntryBatchSize(3L);
    }

    /**
     * Runs the parent set-up, derives the under-replication ZooKeeper paths and creates the
     * scheduler, the bookie metadata driver and the ledger managers used by the tests.
     *
     * @throws Exception if the parent set-up or any of the metadata components fails to start
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();

        zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
        final String underreplicationRoot = zkLedgersRootPath + '/' + "underreplication";
        basePath = underreplicationRoot + "/ledgers";
        baseLockPath = underreplicationRoot + "/locks";

        scheduler = (OrderedScheduler) OrderedScheduler.newSchedulerBuilder()
                .name("test-scheduler")
                .numThreads(1)
                .build();

        final URI metadataServiceUri = URI.create(baseConf.getMetadataServiceUri());
        driver = MetadataDrivers.getBookieDriver(metadataServiceUri);
        // The registration listener is a no-op for these tests.
        driver.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);

        mFactory = driver.getLedgerManagerFactory();
        ledgerManager = mFactory.newLedgerManager();
        underReplicationManager = mFactory.newLedgerUnderreplicationManager();
    }

    /**
     * Runs the parent tear-down and then releases everything {@link #setUp()} created.
     *
     * <p>Each release step sits in its own {@code finally} so that a failure in the parent
     * tear-down or in one close call does not prevent the remaining resources (including the
     * scheduler's thread) from being released.
     *
     * @throws Exception if the parent tear-down or one of the close calls fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                if (null != this.underReplicationManager) {
                    this.underReplicationManager.close();
                    this.underReplicationManager = null;
                }
            } finally {
                try {
                    if (null != this.driver) {
                        this.driver.close();
                    }
                } finally {
                    // The scheduler is created in setUp() and would otherwise outlive the test.
                    if (null != this.scheduler) {
                        this.scheduler.shutdown();
                        this.scheduler = null;
                    }
                }
            }
        }
    }

    /**
     * Kills the first bookie of a 3/3 ledger's initial ensemble, starts a new bookie and lets
     * a replication worker process the ledger. Once the ledger is no longer listed as
     * under-replicated, every ensemble bookie except the new one is killed and entries 0-9
     * must still be readable.
     */
    @Test
    public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
        final LedgerHandle ledger = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int n = 0; n < 10; n++) {
            ledger.addEntry(data);
        }

        final BookieSocketAddress victim = ledger.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", victim);
        killBookie(victim);

        final BookieSocketAddress replacement = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", replacement);

        // Ten more entries are written after the bookie was killed.
        for (int n = 0; n < 10; n++) {
            ledger.addEntry(data);
        }

        final ReplicationWorker worker = new ReplicationWorker(baseConf);
        worker.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(ledger.getId(), victim.toString());
            // Poll until the ledger is no longer reported as under-replicated.
            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, ledger.getId(), basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(ledger, replacement);
            verifyRecoveredLedgers(ledger, 0L, 9L);
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Kills the only bookie of a closed 1/1 ledger, starts a new bookie and a replication
     * worker, and asserts over 30 polls (100 ms apart) that the ledger stays under-replicated.
     * The killed bookie is then restarted; the test waits for the under-replication marker to
     * disappear and reads back entries 0-9.
     */
    @Test
    public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication() throws Exception {
        final LedgerHandle ledger = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int n = 0; n < 10; n++) {
            ledger.addEntry(data);
        }
        ledger.close();

        final BookieSocketAddress victim = ledger.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", victim);
        final ServerConfiguration victimConf = killBookie(victim);

        final BookieSocketAddress replacement = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr :" + replacement);
        killAllBookies(ledger, replacement);

        final ReplicationWorker worker = new ReplicationWorker(baseConf);
        worker.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(ledger.getId(), victim.toString());

            // While the killed bookie is down the ledger must remain under-replicated.
            for (int remaining = 30; remaining > 0; remaining--) {
                final boolean stillUnderReplicated =
                        ReplicationTestUtil.isLedgerInUnderReplication(zkc, ledger.getId(), basePath);
                Assert.assertTrue("Expecting that replication should not complete", stillUnderReplicated);
                Thread.sleep(100L);
            }

            // Bring the killed bookie back and register it with the cluster bookkeeping lists.
            bs.add(startBookie(victimConf));
            bsConfs.add(victimConf);

            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, ledger.getId(), basePath)) {
                Thread.sleep(100L);
            }
            verifyRecoveredLedgers(ledger, 0L, 9L);
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Kills every bookie of a closed 2/2 ledger, starts two new bookies and two replication
     * workers, and asserts over 10 polls (100 ms apart) that the ledger stays under-replicated.
     * After the first killed bookie is restarted, the test waits for the under-replication
     * marker to disappear and reads back entries 0-9.
     */
    @Test
    public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication() throws Exception {
        final LedgerHandle ledger = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int n = 0; n < 10; n++) {
            ledger.addEntry(data);
        }
        ledger.close();

        final BookieSocketAddress victim = ledger.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", victim);
        final ServerConfiguration victimConf = killBookie(victim);
        // No bookie is excluded: the rest of the ensemble goes down as well.
        killAllBookies(ledger, null);

        final BookieSocketAddress firstReplacement = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", firstReplacement);
        final ReplicationWorker firstWorker = new ReplicationWorker(baseConf);

        final BookieSocketAddress secondReplacement = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", secondReplacement);
        final ReplicationWorker secondWorker = new ReplicationWorker(baseConf);

        firstWorker.start();
        secondWorker.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(ledger.getId(), victim.toString());

            for (int remaining = 10; remaining > 0; remaining--) {
                final boolean stillUnderReplicated =
                        ReplicationTestUtil.isLedgerInUnderReplication(zkc, ledger.getId(), basePath);
                Assert.assertTrue("Expecting that replication should not complete", stillUnderReplicated);
                Thread.sleep(100L);
            }

            // Bring the first killed bookie back and register it with the cluster bookkeeping lists.
            bs.add(startBookie(victimConf));
            bsConfs.add(victimConf);

            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, ledger.getId(), basePath)) {
                Thread.sleep(100L);
            }
            verifyRecoveredLedgers(ledger, 0L, 9L);
        } finally {
            firstWorker.shutdown();
            secondWorker.shutdown();
        }
    }

    /**
     * Deletes a ledger, marks it as under-replicated afterwards and waits until the running
     * replication worker has removed it from the under-replication list.
     */
    @Test
    public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted() throws Exception {
        final LedgerHandle ledger = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int n = 0; n < 10; n++) {
            ledger.addEntry(data);
        }
        ledger.close();

        final BookieSocketAddress victim = ledger.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", victim);
        killBookie(victim);

        final BookieSocketAddress replacement = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", replacement);

        final ReplicationWorker worker = new ReplicationWorker(baseConf);
        worker.start();
        try {
            final long ledgerId = ledger.getId();
            // The ledger is deleted before it is reported as under-replicated.
            bkc.deleteLedger(ledgerId);
            underReplicationManager.markLedgerUnderreplicated(ledgerId, victim.toString());

            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, ledgerId, basePath)) {
                Thread.sleep(100L);
            }
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Creates two 3/3 ledgers, kills a bookie, starts a new one and marks both ledgers as
     * under-replicated. Once a single replication worker has cleared both markers, every
     * bookie of the first ledger's ensembles except the new one is killed and entries 0-9 of
     * both ledgers must still be readable.
     */
    @Test
    public void testMultipleLedgerReplicationWithReplicationWorker() throws Exception {
        final LedgerHandle first = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int n = 0; n < 10; n++) {
            first.addEntry(data);
        }
        final BookieSocketAddress firstVictim = first.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", firstVictim);

        final LedgerHandle second = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int n = 0; n < 10; n++) {
            second.addEntry(data);
        }
        final BookieSocketAddress secondVictim = second.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", secondVictim);

        killBookie(firstVictim);
        first.close();
        // NOTE(review): this kills the first ledger's replica a second time; the second
        // ledger's replica (secondVictim) is logged and marked below but never killed here.
        // Looks like a copy-paste slip - confirm before changing, since killing a second
        // distinct bookie would leave fewer live bookies for re-replication.
        killBookie(firstVictim);
        second.close();

        final BookieSocketAddress replacement = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", replacement);

        final ReplicationWorker worker = new ReplicationWorker(baseConf);
        worker.start();
        try {
            underReplicationManager.markLedgerUnderreplicated(first.getId(), firstVictim.toString());
            underReplicationManager.markLedgerUnderreplicated(second.getId(), secondVictim.toString());

            // Wait for the first ledger, then for the second one, to leave the list.
            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, first.getId(), basePath)) {
                Thread.sleep(100L);
            }
            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, second.getId(), basePath)) {
                Thread.sleep(100L);
            }

            killAllBookies(first, replacement);
            verifyRecoveredLedgers(first, 0L, 9L);
            verifyRecoveredLedgers(second, 0L, 9L);
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Leaves a 3/3 ledger open after killing the first bookie of its only ensemble, sets the
     * open-ledger re-replication grace period to "3000" and lets a replication worker process
     * the ledger. After the under-replication marker is gone, entries 0-9 must be readable
     * from the new bookie alone and the ledger must no longer be open.
     *
     * <p>The client metadata driver is managed by try-with-resources, and the worker and the
     * under-replication manager are released in nested {@code finally} blocks so that one
     * failing release does not skip the other.
     */
    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        BookieSocketAddress replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", replicaToKill);
        this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);

        this.baseConf.setOpenLedgerRereplicationGracePeriod("3000");
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);

        try (MetadataClientDriver clientDriver =
                MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()))) {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            // Distinct local name: do not shadow the underReplicationManager field.
            LedgerUnderreplicationManager urManager =
                    clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            rw.start();
            try {
                urManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                this.killAllBookies(lh, newBkAddr);
                this.verifyRecoveredLedgers(lh, 0L, 9L);
                lh = this.bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
                Assert.assertFalse("Ledger must have been closed by RW", ClientUtil.isLedgerOpen(lh));
            } finally {
                try {
                    rw.shutdown();
                } finally {
                    urManager.close();
                }
            }
        }
    }

    /**
     * Kills all three bookies of a closed 3/3 ledger, starts three new ones and runs two
     * replication workers configured with a lock-release grace period of "64".
     *
     * <p>Until a background thread restarts one of the killed bookies (after 3 seconds), the
     * ledger must stay under-replicated and each worker's failed-attempt count for it must
     * never decrease. After the restart the ledger must leave the under-replication list
     * within 20 seconds, the failed-attempt counts must stay unchanged for a further
     * 2 seconds, and at least one worker must have no unreadable entries recorded.
     */
    @Test
    public void testBookiesNotAvailableScenarioForReplicationWorker() throws Exception {
        int ensembleSize = 3;
        LedgerHandle lh = this.bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
        int numOfEntries = 7;
        for (int i = 0; i < numOfEntries; ++i) {
            lh.addEntry(data);
        }
        lh.close();

        BookieSocketAddress[] bookiesKilled = new BookieSocketAddress[ensembleSize];
        final ServerConfiguration[] killedBookiesConfig = new ServerConfiguration[ensembleSize];
        for (int i = 0; i < ensembleSize; ++i) {
            bookiesKilled[i] = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(i);
            killedBookiesConfig[i] = this.getBkConf(bookiesKilled[i]);
            LOG.info("Killing Bookie : {}", bookiesKilled[i]);
            this.killBookie(bookiesKilled[i]);
        }
        // Start as many new bookies as were killed; their addresses are not needed.
        for (int i = 0; i < ensembleSize; ++i) {
            this.startNewBookieAndReturnAddress();
        }

        ServerConfiguration newRWConf = new ServerConfiguration(this.baseConf);
        newRWConf.setLockReleaseOfFailedLedgerGracePeriod("64");
        ReplicationWorker rw1 = new ReplicationWorker(newRWConf);
        ReplicationWorker rw2 = new ReplicationWorker(newRWConf);

        try (MetadataClientDriver clientDriver =
                MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()))) {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            // Distinct local name: do not shadow the underReplicationManager field.
            LedgerUnderreplicationManager urManager =
                    clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            try {
                for (BookieSocketAddress killed : bookiesKilled) {
                    urManager.markLedgerUnderreplicated(lh.getId(), killed.toString());
                }
                while (!ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                rw1.start();
                rw2.start();

                final AtomicBoolean isBookieRestarted = new AtomicBoolean(false);
                new Thread(() -> {
                    try {
                        Thread.sleep(3000L);
                        // The flag is raised before the restart so the main thread stops
                        // asserting under-replication before replication can succeed.
                        isBookieRestarted.set(true);
                        startBookie(killedBookiesConfig[0]);
                    } catch (Exception e) {
                        LOG.error("Failed to restart the killed bookie", e);
                    }
                }).start();

                int rw1PrevFailedAttemptsCount = 0;
                int rw2PrevFailedAttemptsCount = 0;
                while (!isBookieRestarted.get()) {
                    Assert.assertTrue("Ledger: " + lh.getId() + " should be underreplicated",
                            ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath));

                    int rw1CurFailedAttemptsCount = rw1.replicationFailedLedgers.get(lh.getId()).get();
                    Assert.assertTrue("The current number of failed attempts: " + rw1CurFailedAttemptsCount
                            + " should be greater than or equal to previous value: " + rw1PrevFailedAttemptsCount,
                            rw1CurFailedAttemptsCount >= rw1PrevFailedAttemptsCount);
                    rw1PrevFailedAttemptsCount = rw1CurFailedAttemptsCount;

                    int rw2CurFailedAttemptsCount = rw2.replicationFailedLedgers.get(lh.getId()).get();
                    Assert.assertTrue("The current number of failed attempts: " + rw2CurFailedAttemptsCount
                            + " should be greater than or equal to previous value: " + rw2PrevFailedAttemptsCount,
                            rw2CurFailedAttemptsCount >= rw2PrevFailedAttemptsCount);
                    rw2PrevFailedAttemptsCount = rw2CurFailedAttemptsCount;

                    Thread.sleep(50L);
                }

                int timeToWaitForReplicationToComplete = 20000;
                int timeWaited = 0;
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                    timeWaited += 100;
                    // >= rather than == so the limit still triggers if the step size changes.
                    if (timeWaited >= timeToWaitForReplicationToComplete) {
                        Assert.fail("Ledger should be replicated by now");
                    }
                }

                // Once replicated, the failed-attempt counters must stop moving.
                rw1PrevFailedAttemptsCount = rw1.replicationFailedLedgers.get(lh.getId()).get();
                rw2PrevFailedAttemptsCount = rw2.replicationFailedLedgers.get(lh.getId()).get();
                Thread.sleep(2000L);
                Assert.assertEquals("rw1 failedattempts", rw1PrevFailedAttemptsCount,
                        rw1.replicationFailedLedgers.get(lh.getId()).get());
                Assert.assertEquals("rw2 failed attempts ", rw2PrevFailedAttemptsCount,
                        rw2.replicationFailedLedgers.get(lh.getId()).get());

                int rw1UnableToReadEntriesForReplication =
                        rw1.unableToReadEntriesForReplication.get(lh.getId()).size();
                int rw2UnableToReadEntriesForReplication =
                        rw2.unableToReadEntriesForReplication.get(lh.getId()).size();
                Assert.assertTrue("unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication
                        + " in RW2: " + rw2UnableToReadEntriesForReplication,
                        rw1UnableToReadEntriesForReplication == 0 || rw2UnableToReadEntriesForReplication == 0);
            } finally {
                try {
                    rw1.shutdown();
                } finally {
                    try {
                        rw2.shutdown();
                    } finally {
                        urManager.close();
                    }
                }
            }
        }
    }

    /**
     * Kills all three bookies of a closed 3/3 ledger so that replication keeps failing, then
     * checks the delays recorded by two {@code InjectedReplicationWorker}s.
     *
     * <p>With a grace period of 64 and a base backoff of {@code 64 / 2^5 = 2}, the i-th
     * recorded delay is expected to be {@code min(64, 2 * 2^i)}. Both workers must also report
     * all 7 entries as unreadable, through the stats counter and through their
     * {@code unableToReadEntriesForReplication} set for the ledger.
     *
     * <p>The counter values are compared as primitive {@code long}: comparing a boxed
     * {@code Integer} with a boxed {@code Long} through {@code assertEquals(Object, Object)}
     * never succeeds, even when the numeric values match.
     */
    @Test
    public void testDeferLedgerLockReleaseForReplicationWorker() throws Exception {
        int ensembleSize = 3;
        LedgerHandle lh = this.bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
        int numOfEntries = 7;
        for (int i = 0; i < numOfEntries; ++i) {
            lh.addEntry(data);
        }
        lh.close();

        BookieSocketAddress[] bookiesKilled = new BookieSocketAddress[ensembleSize];
        ServerConfiguration[] killedBookiesConfig = new ServerConfiguration[ensembleSize];
        for (int i = 0; i < ensembleSize; ++i) {
            bookiesKilled[i] = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(i);
            killedBookiesConfig[i] = this.getBkConf(bookiesKilled[i]);
            LOG.info("Killing Bookie : {}", bookiesKilled[i]);
            this.killBookie(bookiesKilled[i]);
        }
        for (int i = 0; i < ensembleSize; ++i) {
            this.startNewBookieAndReturnAddress();
        }

        long lockReleaseOfFailedLedgerGracePeriod = 64L;
        long baseBackoffForLockReleaseOfFailedLedger =
                lockReleaseOfFailedLedgerGracePeriod / (long) ((int) Math.pow(2.0, 5.0));
        ServerConfiguration newRWConf = new ServerConfiguration(this.baseConf);
        newRWConf.setLockReleaseOfFailedLedgerGracePeriod(Long.toString(lockReleaseOfFailedLedgerGracePeriod));
        newRWConf.setRereplicationEntryBatchSize(1000L);

        CopyOnWriteArrayList<Long> rw1DelayReplicationPeriods = new CopyOnWriteArrayList<Long>();
        CopyOnWriteArrayList<Long> rw2DelayReplicationPeriods = new CopyOnWriteArrayList<Long>();
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger1 = statsProvider.getStatsLogger("rw1");
        TestStatsProvider.TestStatsLogger statsLogger2 = statsProvider.getStatsLogger("rw2");
        InjectedReplicationWorker rw1 = new InjectedReplicationWorker(newRWConf, statsLogger1, rw1DelayReplicationPeriods);
        InjectedReplicationWorker rw2 = new InjectedReplicationWorker(newRWConf, statsLogger2, rw2DelayReplicationPeriods);
        Counter numEntriesUnableToReadForReplication1 = statsLogger1.getCounter("NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION");
        Counter numEntriesUnableToReadForReplication2 = statsLogger2.getCounter("NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION");

        try (MetadataClientDriver clientDriver =
                MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()))) {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            // Distinct local name: do not shadow the underReplicationManager field.
            LedgerUnderreplicationManager urManager =
                    clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            try {
                for (BookieSocketAddress killed : bookiesKilled) {
                    urManager.markLedgerUnderreplicated(lh.getId(), killed.toString());
                }
                while (!ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                rw1.start();
                rw2.start();

                // Wait until both workers have failed at least this many times.
                int numOfAttemptsToWaitFor = 10;
                while (rw1.replicationFailedLedgers.get(lh.getId()).get() < numOfAttemptsToWaitFor
                        || rw2.replicationFailedLedgers.get(lh.getId()).get() < numOfAttemptsToWaitFor) {
                    Thread.sleep(500L);
                }
                Assert.assertTrue("Ledger: " + lh.getId() + " should be underreplicated",
                        ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath));

                for (int i = 0; i < numOfAttemptsToWaitFor - 1; ++i) {
                    // Boxed explicitly so the (String, Object, Object) overload is selected.
                    Long expectedDelayValue = Long.valueOf(Math.min(lockReleaseOfFailedLedgerGracePeriod,
                            baseBackoffForLockReleaseOfFailedLedger * (long) (1 << i)));
                    Assert.assertEquals("RW1 delayperiod", expectedDelayValue, rw1DelayReplicationPeriods.get(i));
                    Assert.assertEquals("RW2 delayperiod", expectedDelayValue, rw2DelayReplicationPeriods.get(i));
                }

                // Compare numerically: a boxed Integer never equals a boxed Long.
                Assert.assertEquals("numEntriesUnableToReadForReplication for RW1",
                        (long) numOfEntries, (long) numEntriesUnableToReadForReplication1.get());
                Assert.assertEquals("numEntriesUnableToReadForReplication for RW2",
                        (long) numOfEntries, (long) numEntriesUnableToReadForReplication2.get());
                Assert.assertEquals("RW1 unabletoreadentries", numOfEntries,
                        rw1.unableToReadEntriesForReplication.get(lh.getId()).size());
                Assert.assertEquals("RW2 unabletoreadentries", numOfEntries,
                        rw2.unableToReadEntriesForReplication.get(lh.getId()).size());
            } finally {
                try {
                    rw1.shutdown();
                } finally {
                    try {
                        rw2.shutdown();
                    } finally {
                        urManager.close();
                    }
                }
            }
        }
    }

    /**
     * Kills the first bookie of a 3/3 ledger's initial ensemble, writes ten more entries after
     * a new bookie is started and lets a replication worker process the ledger. After the
     * under-replication marker is gone, entries 0-9 must be readable and the ledger must
     * still be open.
     *
     * <p>The failure message of the final assertion states what is actually checked (the
     * ledger is still open); it previously claimed the opposite.
     */
    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        BookieSocketAddress replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
        LOG.info("Killing Bookie : {}", replicaToKill);
        this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", newBkAddr);
        // Ten more entries are written after the bookie was killed.
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }

        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        this.baseClientConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());

        // Distinct local names: do not shadow the driver / underReplicationManager fields.
        try (MetadataClientDriver clientDriver =
                MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()))) {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager urManager =
                    clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            rw.start();
            try {
                urManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                this.killAllBookies(lh, newBkAddr);
                this.verifyRecoveredLedgers(lh, 0L, 9L);
                lh = this.bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
                Assert.assertTrue("Ledger should still be open after replication", ClientUtil.isLedgerOpen(lh));
            } finally {
                try {
                    rw.shutdown();
                } finally {
                    urManager.close();
                }
            }
        }
    }

    /**
     * Checks that a replication worker keeps running across a ZooKeeper outage: the worker is
     * started, the ZooKeeper cluster is stopped until a separate client observes the
     * disconnect, and the cluster is started again.
     *
     * <p>The worker is shut down in a {@code finally} block, as in the other tests, so it does
     * not outlive this test.
     */
    @Test
    public void testRWZKConnectionLost() throws Exception {
        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder().connectString(this.zkUtil.getZooKeeperConnectString()).sessionTimeoutMs(10000).build();) {
            ReplicationWorker rw = new ReplicationWorker(this.baseConf);
            rw.start();
            try {
                // Give the worker up to ~10 seconds to report itself as running.
                for (int i = 0; i < 10 && !rw.isRunning(); ++i) {
                    Thread.sleep(1000L);
                }
                Assert.assertTrue("Replication worker should be running", rw.isRunning());

                this.stopZKCluster();
                // Give the client up to ~10 seconds to notice the lost connection.
                for (int i = 0; i < 10 && zk.getState().isConnected(); ++i) {
                    Thread.sleep(1000L);
                }
                Assert.assertFalse(zk.getState().isConnected());

                this.startZKCluster();
                Assert.assertTrue("Replication worker should still be running", rw.isRunning());
            } finally {
                rw.shutdown();
            }
        }
    }

    /**
     * Kills every bookie that appears in any ensemble of the given ledger, except one.
     *
     * @param lh ledger whose ensembles are walked
     * @param excludeBK bookie to keep alive; may be {@code null}, in which case no bookie
     *        compares equal to it and all ensemble bookies are killed
     * @throws Exception if killing a bookie fails
     */
    private void killAllBookies(LedgerHandle lh, BookieSocketAddress excludeBK) throws Exception {
        // Only the ensemble members matter here, so iterate the map values directly.
        for (List<BookieSocketAddress> bookies : lh.getLedgerMetadata().getAllEnsembles().values()) {
            for (BookieSocketAddress bookie : bookies) {
                if (bookie.equals(excludeBK)) {
                    continue;
                }
                this.killBookie(bookie);
            }
        }
    }

    /**
     * Reopens the ledger without recovery and asserts that every entry in the given range
     * carries the test payload.
     *
     * @param lh handle identifying the ledger to verify (only its id is used)
     * @param startEntryId first entry id to read, inclusive
     * @param endEntryId last entry id to read, inclusive
     * @throws BKException if opening, reading or closing the ledger fails
     * @throws InterruptedException if the calling thread is interrupted during a ledger operation
     */
    private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId, long endEntryId) throws BKException, InterruptedException {
        LedgerHandle lhs = this.bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
        try {
            Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId, endEntryId);
            Assert.assertTrue("Should have the elements", entries.hasMoreElements());
            while (entries.hasMoreElements()) {
                LedgerEntry entry = entries.nextElement();
                Assert.assertEquals("TestReplicationWorker", new String(entry.getEntry()));
            }
        } finally {
            // Release the handle opened above; it is local to this check.
            lhs.close();
        }
    }

    /**
     * Checks that the replication worker stops running when the ZooKeeper
     * operations injected to fail by {@link MockZooKeeperClient} are hit while
     * it handles an under-replicated ledger.
     *
     * <p>Outline: connect a fault-injecting ZooKeeper client, write a ledger
     * through a BookKeeper client that uses it, force a session expiry and wait
     * for reconnection, kill one bookie of the ledger, start the worker, arm
     * the {@code setData}/{@code delete} failures, mark the ledger
     * under-replicated, then wait for the worker to stop and check the failure
     * counters.
     *
     * @throws Exception if any cluster, client or worker operation fails
     */
    @Test
    public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
        int zkSessionTimeOut = 10000;
        ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, NullStatsLogger.INSTANCE);
        MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(
                this.zkUtil.getZooKeeperConnectString(), zkSessionTimeOut, zooKeeperWatcherBase);
        BookKeeper bkWithMockZK = null;
        ReplicationWorker rw = null;
        // Everything after the ZK client is created runs under one try so that
        // the clients and the worker are released even if an early step fails.
        try {
            zkFaultInjectionWrapper.waitForConnection();
            Assert.assertEquals("zkFaultInjectionWrapper should be in connected state",
                    ZooKeeper.States.CONNECTED, zkFaultInjectionWrapper.getState());
            long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();

            // Create a ledger with a fixed id and write ten entries to it.
            bkWithMockZK = new BookKeeper(this.baseClientConf, zkFaultInjectionWrapper);
            long ledgerId = 567L;
            LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD, null);
            for (int entryId = 0; entryId < 10; ++entryId) {
                lh.addEntry(entryId, data);
            }
            lh.close();

            // Deliver a session-expired event to the watcher and wait for the
            // client to be connected again; the assertions below require a new
            // session id. NOTE(review): presumably the reconnect goes through
            // createZooKeeper() and so yields a MockZooKeeper handle - confirm
            // against ZooKeeperClient.
            zooKeeperWatcherBase.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, ""));
            zkFaultInjectionWrapper.waitForConnection();
            for (int attempt = 0; attempt < 10 && zkFaultInjectionWrapper.getState() != ZooKeeper.States.CONNECTED; ++attempt) {
                Thread.sleep(200L);
            }
            Assert.assertEquals("zkFaultInjectionWrapper should be in connected state",
                    ZooKeeper.States.CONNECTED, zkFaultInjectionWrapper.getState());
            Assert.assertNotEquals("Session Id of old and new ZK instance should be different",
                    oldZkInstanceSessionId, zkFaultInjectionWrapper.getSessionId());

            // Kill the first bookie of the ensemble registered under key 0.
            BookieSocketAddress replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
            LOG.info("Killing Bookie {}", replicaToKill);
            this.killBookie(replicaToKill);

            rw = new ReplicationWorker(this.baseConf, bkWithMockZK, false, NullStatsLogger.INSTANCE);
            rw.start();
            for (int attempt = 0; attempt < 40 && !rw.isRunning(); ++attempt) {
                LOG.info("Waiting for the RW to start...");
                Thread.sleep(500L);
            }
            Assert.assertTrue("RW should be running", rw.isRunning());

            // Arm the fault injection: setData on the ledger's metadata path and
            // delete on the ledger's under-replication lock path will now fail.
            AbstractZkLedgerManager absZKLedgerManager = (AbstractZkLedgerManager) this.ledgerManager;
            String ledgerPath = absZKLedgerManager.getLedgerPath(ledgerId);
            String urLockPath = ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(
                    ZkLedgerUnderreplicationManager.getUrLockPath(this.zkLedgersRootPath), ledgerId);
            zkFaultInjectionWrapper.setPathOfSetDataToFail(ledgerPath);
            zkFaultInjectionWrapper.setPathOfDeleteToFail(urLockPath);
            this.underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());

            // Give the worker up to ~20s to stop after hitting the failures.
            for (int attempt = 0; attempt < 40 && rw.isRunning(); ++attempt) {
                LOG.info("Waiting for the RW to shutdown...");
                Thread.sleep(500L);
            }
            Assert.assertEquals("NumOfTimesSetDataFailed", 1L, zkFaultInjectionWrapper.getNumOfTimesSetDataFailed());
            Assert.assertEquals("NumOfTimesDeleteFailed", 2L, zkFaultInjectionWrapper.getNumOfTimesDeleteFailed());
            Assert.assertFalse("RW should be shutdown", rw.isRunning());
        } finally {
            if (rw != null) {
                rw.shutdown();
            }
            zkFaultInjectionWrapper.close();
            if (bkWithMockZK != null) {
                bkWithMockZK.close();
            }
        }
    }

    class MockZooKeeperClient
    extends ZooKeeperClient {
        private final String connectString;
        private final int sessionTimeoutMs;
        private final ZooKeeperWatcherBase watcherManager;
        private volatile String pathOfSetDataToFail;
        private volatile String pathOfDeleteToFail;
        private AtomicInteger numOfTimesSetDataFailed;
        private AtomicInteger numOfTimesDeleteFailed;

        MockZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcher) throws IOException {
            super(connectString, sessionTimeoutMs, watcher, (RetryPolicy)new BoundExponentialBackoffRetryPolicy((long)sessionTimeoutMs, (long)sessionTimeoutMs, Integer.MAX_VALUE), (RetryPolicy)new BoundExponentialBackoffRetryPolicy((long)sessionTimeoutMs, (long)sessionTimeoutMs, 0), (StatsLogger)NullStatsLogger.INSTANCE, 1, 0.0, false);
            this.numOfTimesSetDataFailed = new AtomicInteger();
            this.numOfTimesDeleteFailed = new AtomicInteger();
            this.connectString = connectString;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.watcherManager = watcher;
        }

        protected ZooKeeper createZooKeeper() throws IOException {
            return new MockZooKeeper(this.connectString, this.sessionTimeoutMs, (Watcher)this.watcherManager, false);
        }

        private void setPathOfSetDataToFail(String pathOfSetDataToFail) {
            this.pathOfSetDataToFail = pathOfSetDataToFail;
        }

        private void setPathOfDeleteToFail(String pathOfDeleteToFail) {
            this.pathOfDeleteToFail = pathOfDeleteToFail;
        }

        private int getNumOfTimesSetDataFailed() {
            return this.numOfTimesSetDataFailed.get();
        }

        private int getNumOfTimesDeleteFailed() {
            return this.numOfTimesDeleteFailed.get();
        }

        class MockZooKeeper
        extends ZooKeeper {
            public MockZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
                super(connectString, sessionTimeout, watcher, canBeReadOnly);
            }

            public void setData(String path, byte[] data, int version, AsyncCallback.StatCallback cb, Object context) {
                if (MockZooKeeperClient.this.pathOfSetDataToFail != null && MockZooKeeperClient.this.pathOfSetDataToFail.equals(path)) {
                    LOG.error("setData of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", (Object)path);
                    MockZooKeeperClient.this.numOfTimesSetDataFailed.incrementAndGet();
                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, context, null);
                } else {
                    super.setData(path, data, version, cb, context);
                }
            }

            public void delete(String path, int version) throws KeeperException, InterruptedException {
                if (MockZooKeeperClient.this.pathOfDeleteToFail != null && MockZooKeeperClient.this.pathOfDeleteToFail.equals(path)) {
                    LOG.error("delete of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", (Object)path);
                    MockZooKeeperClient.this.numOfTimesDeleteFailed.incrementAndGet();
                    throw new KeeperException.ConnectionLossException();
                }
                super.delete(path, version);
            }
        }
    }

    class InjectedReplicationWorker
    extends ReplicationWorker {
        CopyOnWriteArrayList<Long> delayReplicationPeriods;

        public InjectedReplicationWorker(ServerConfiguration conf, StatsLogger statsLogger, CopyOnWriteArrayList<Long> delayReplicationPeriods) throws ReplicationException.CompatibilityException, KeeperException, InterruptedException, IOException {
            super(conf, statsLogger);
            this.delayReplicationPeriods = delayReplicationPeriods;
        }

        void scheduleTaskWithDelay(TimerTask timerTask, long delayPeriod) {
            this.delayReplicationPeriods.add(delayPeriod);
            super.scheduleTaskWithDelay(timerTask, delayPeriod);
        }
    }
}

