/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import java.net.URI;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.ReplicationTestUtil;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestReplicationWorker
extends BookKeeperClusterTestCase {
    private static final byte[] TESTPASSWD = "testpasswd".getBytes();
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWorker.class);
    private String basePath = "";
    private String baseLockPath = "";
    private MetadataBookieDriver driver;
    private LedgerManagerFactory mFactory;
    private LedgerUnderreplicationManager underReplicationManager;
    private static byte[] data = "TestReplicationWorker".getBytes();
    private OrderedScheduler scheduler;

    public TestReplicationWorker() {
        this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
    }

    TestReplicationWorker(String ledgerManagerFactory) {
        super(3);
        LOG.info("Running test case using ledger manager : " + ledgerManagerFactory);
        this.baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
        this.baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
        this.baseConf.setRereplicationEntryBatchSize(3L);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath((AbstractConfiguration)this.baseClientConf);
        this.basePath = zkLedgersRootPath + '/' + "underreplication" + "/ledgers";
        this.baseLockPath = zkLedgersRootPath + '/' + "underreplication" + "/locks";
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().name("test-scheduler").numThreads(1).build();
        this.driver = MetadataDrivers.getBookieDriver((URI)URI.create(this.baseConf.getMetadataServiceUri()));
        this.driver.initialize(this.baseConf, () -> {}, (StatsLogger)NullStatsLogger.INSTANCE);
        this.mFactory = this.driver.getLedgerManagerFactory();
        this.underReplicationManager = this.mFactory.newLedgerUnderreplicationManager();
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        if (null != this.underReplicationManager) {
            this.underReplicationManager.close();
            this.underReplicationManager = null;
        }
        if (null != this.driver) {
            this.driver.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        BookieSocketAddress replicaToKill = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKill);
        this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        rw.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            this.killAllBookies(lh, newBkAddr);
            this.verifyRecoveredLedgers(lh, 0L, 9L);
        }
        finally {
            rw.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        lh.close();
        BookieSocketAddress replicaToKill = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKill);
        ServerConfiguration killedBookieConfig = this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr :" + newBkAddr);
        this.killAllBookies(lh, newBkAddr);
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        rw.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            int counter = 30;
            while (counter-- > 0) {
                Assert.assertTrue((String)"Expecting that replication should not complete", (boolean)ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath));
                Thread.sleep(100L);
            }
            this.bs.add(this.startBookie(killedBookieConfig));
            this.bsConfs.add(killedBookieConfig);
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            this.verifyRecoveredLedgers(lh, 0L, 9L);
        }
        finally {
            rw.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        lh.close();
        BookieSocketAddress replicaToKill = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKill);
        ServerConfiguration killedBookieConfig = this.killBookie(replicaToKill);
        this.killAllBookies(lh, null);
        BookieSocketAddress newBkAddr1 = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr1);
        ReplicationWorker rw1 = new ReplicationWorker(this.baseConf);
        BookieSocketAddress newBkAddr2 = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr2);
        ReplicationWorker rw2 = new ReplicationWorker(this.baseConf);
        rw1.start();
        rw2.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            int counter = 10;
            while (counter-- > 0) {
                Assert.assertTrue((String)"Expecting that replication should not complete", (boolean)ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath));
                Thread.sleep(100L);
            }
            this.bs.add(this.startBookie(killedBookieConfig));
            this.bsConfs.add(killedBookieConfig);
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            this.verifyRecoveredLedgers(lh, 0L, 9L);
        }
        finally {
            rw1.shutdown();
            rw2.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        lh.close();
        BookieSocketAddress replicaToKill = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKill);
        this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr);
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        rw.start();
        try {
            this.bkc.deleteLedger(lh.getId());
            this.underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
        }
        finally {
            rw.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleLedgerReplicationWithReplicationWorker() throws Exception {
        LedgerHandle lh1 = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh1.addEntry(data);
        }
        BookieSocketAddress replicaToKillFromFirstLedger = (BookieSocketAddress)((List)lh1.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKillFromFirstLedger);
        LedgerHandle lh2 = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh2.addEntry(data);
        }
        BookieSocketAddress replicaToKillFromSecondLedger = (BookieSocketAddress)((List)lh2.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKillFromSecondLedger);
        this.killBookie(replicaToKillFromFirstLedger);
        lh1.close();
        this.killBookie(replicaToKillFromFirstLedger);
        lh2.close();
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr);
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        rw.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(lh1.getId(), replicaToKillFromFirstLedger.toString());
            this.underReplicationManager.markLedgerUnderreplicated(lh2.getId(), replicaToKillFromSecondLedger.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh1.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh2.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            this.killAllBookies(lh1, newBkAddr);
            this.verifyRecoveredLedgers(lh1, 0L, 9L);
            this.verifyRecoveredLedgers(lh2, 0L, 9L);
        }
        finally {
            rw.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        BookieSocketAddress replicaToKill = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKill);
        this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr);
        this.baseConf.setOpenLedgerRereplicationGracePeriod("3000");
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver((URI)URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            clientDriver.initialize(this.baseClientConf, (ScheduledExecutorService)this.scheduler, (StatsLogger)NullStatsLogger.INSTANCE, Optional.empty());
            LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
            LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
            rw.start();
            try {
                underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                this.killAllBookies(lh, newBkAddr);
                this.verifyRecoveredLedgers(lh, 0L, 9L);
                lh = this.bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
                Assert.assertFalse((String)"Ledger must have been closed by RW", (boolean)ClientUtil.isLedgerOpen(lh));
            }
            finally {
                rw.shutdown();
                underReplicationManager.close();
            }
        }
        finally {
            if (Collections.singletonList(clientDriver).get(0) != null) {
                clientDriver.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBookiesNotAvailableScenarioForReplicationWorker() throws Exception {
        int i;
        int ensembleSize = 3;
        LedgerHandle lh = this.bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i2 = 0; i2 < 10; ++i2) {
            lh.addEntry(data);
        }
        lh.close();
        BookieSocketAddress[] bookiesKilled = new BookieSocketAddress[ensembleSize];
        final ServerConfiguration[] killedBookiesConfig = new ServerConfiguration[ensembleSize];
        for (i = 0; i < ensembleSize; ++i) {
            bookiesKilled[i] = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(i);
            killedBookiesConfig[i] = this.getBkConf(bookiesKilled[i]);
            LOG.info("Killing Bookie", (Object)bookiesKilled[i]);
            this.killBookie(bookiesKilled[i]);
        }
        for (i = 0; i < ensembleSize; ++i) {
            BookieSocketAddress bookieSocketAddress = this.startNewBookieAndReturnAddress();
        }
        this.baseConf.setLockReleaseOfFailedLedgerGracePeriod("500");
        ReplicationWorker rw1 = new ReplicationWorker(this.baseConf);
        ReplicationWorker rw2 = new ReplicationWorker(this.baseConf);
        MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver((URI)URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            clientDriver.initialize(this.baseClientConf, (ScheduledExecutorService)this.scheduler, (StatsLogger)NullStatsLogger.INSTANCE, Optional.empty());
            LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
            LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
            try {
                for (int i3 = 0; i3 < bookiesKilled.length; ++i3) {
                    underReplicationManager.markLedgerUnderreplicated(lh.getId(), bookiesKilled[i3].toString());
                }
                while (!ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                rw1.start();
                rw2.start();
                final AtomicBoolean isBookieRestarted = new AtomicBoolean(false);
                new Thread(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            Thread.sleep(4000L);
                            isBookieRestarted.set(true);
                            TestReplicationWorker.this.startBookie(killedBookiesConfig[0]);
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
                while (!isBookieRestarted.get()) {
                    Assert.assertTrue((String)("Ledger: " + lh.getId() + " should be underreplicated"), (boolean)ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath));
                    int failedAttempts = ((AtomicInteger)rw1.replicationFailedLedgers.get((Object)lh.getId())).get();
                    Assert.assertTrue((String)("The number of failed attempts should be less than ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, but it is " + failedAttempts), (failedAttempts <= 10 ? 1 : 0) != 0);
                    failedAttempts = ((AtomicInteger)rw2.replicationFailedLedgers.get((Object)lh.getId())).get();
                    Assert.assertTrue((String)("The number of failed attempts should be less than ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, but it is " + failedAttempts), (failedAttempts <= 10 ? 1 : 0) != 0);
                    Thread.sleep(50L);
                }
                int timeToWaitForReplicationToComplete = 2000;
                int timeWaited = 0;
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                    if ((timeWaited += 100) != timeToWaitForReplicationToComplete) continue;
                    Assert.fail((String)"Ledger should be replicated by now");
                }
            }
            finally {
                rw1.shutdown();
                rw2.shutdown();
                underReplicationManager.close();
            }
        }
        finally {
            if (Collections.singletonList(clientDriver).get(0) != null) {
                clientDriver.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        BookieSocketAddress replicaToKill = (BookieSocketAddress)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", (Object)replicaToKill);
        this.killBookie(replicaToKill);
        BookieSocketAddress newBkAddr = this.startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", (Object)newBkAddr);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(data);
        }
        ReplicationWorker rw = new ReplicationWorker(this.baseConf);
        this.baseClientConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        MetadataClientDriver driver = MetadataDrivers.getClientDriver((URI)URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            driver.initialize(this.baseClientConf, (ScheduledExecutorService)this.scheduler, (StatsLogger)NullStatsLogger.INSTANCE, Optional.empty());
            LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
            LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
            rw.start();
            try {
                underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, lh.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                this.killAllBookies(lh, newBkAddr);
                this.verifyRecoveredLedgers(lh, 0L, 9L);
                lh = this.bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
                Assert.assertTrue((String)"Ledger must have been closed by RW", (boolean)ClientUtil.isLedgerOpen(lh));
            }
            finally {
                rw.shutdown();
                underReplicationManager.close();
            }
        }
        finally {
            if (Collections.singletonList(driver).get(0) != null) {
                driver.close();
            }
        }
    }

    @Test
    public void testRWZKConnectionLost() throws Exception {
        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder().connectString(this.zkUtil.getZooKeeperConnectString()).sessionTimeoutMs(10000).build();){
            int i;
            ReplicationWorker rw = new ReplicationWorker(this.baseConf);
            rw.start();
            for (i = 0; i < 10 && !rw.isRunning(); ++i) {
                Thread.sleep(1000L);
            }
            Assert.assertTrue((String)"Replication worker should be running", (boolean)rw.isRunning());
            this.stopZKCluster();
            for (i = 0; i < 10 && zk.getState().isConnected(); ++i) {
                Thread.sleep(1000L);
            }
            Assert.assertFalse((boolean)zk.getState().isConnected());
            this.startZKCluster();
            Assert.assertTrue((String)"Replication worker should still be running", (boolean)rw.isRunning());
        }
    }

    private void killAllBookies(LedgerHandle lh, BookieSocketAddress excludeBK) throws Exception {
        for (Map.Entry entry : lh.getLedgerMetadata().getAllEnsembles().entrySet()) {
            List bookies = (List)entry.getValue();
            for (BookieSocketAddress bookie : bookies) {
                if (bookie.equals((Object)excludeBK)) continue;
                this.killBookie(bookie);
            }
        }
    }

    private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId, long endEntryId) throws BKException, InterruptedException {
        LedgerHandle lhs = this.bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
        Enumeration entries = lhs.readEntries(startEntryId, endEntryId);
        Assert.assertTrue((String)"Should have the elements", (boolean)entries.hasMoreElements());
        while (entries.hasMoreElements()) {
            LedgerEntry entry = (LedgerEntry)entries.nextElement();
            Assert.assertEquals((Object)"TestReplicationWorker", (Object)new String(entry.getEntry()));
        }
    }
}

