/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieAccessor;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerHandleAdapter;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditorPeriodicCheckTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(AuditorPeriodicCheckTest.class);
    private MetadataBookieDriver driver;
    private HashMap<String, AuditorElector> auditorElectors = new HashMap();
    private List<ZooKeeper> zkClients = new LinkedList<ZooKeeper>();
    private static final int CHECK_INTERVAL = 1;

    public AuditorPeriodicCheckTest() {
        super(3);
        this.baseConf.setPageLimit(1);
    }

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        for (int i = 0; i < this.numBookies; ++i) {
            ServerConfiguration conf = new ServerConfiguration((AbstractConfiguration)this.bsConfs.get(i));
            conf.setAuditorPeriodicCheckInterval(1L);
            String addr = ((BookieServer)this.bs.get(i)).getLocalAddress().toString();
            ZooKeeperClient zk = ZooKeeperClient.newBuilder().connectString(this.zkUtil.getZooKeeperConnectString()).sessionTimeoutMs(10000).build();
            this.zkClients.add((ZooKeeper)zk);
            AuditorElector auditorElector = new AuditorElector(addr, conf, (ZooKeeper)zk);
            this.auditorElectors.put(addr, auditorElector);
            auditorElector.start();
            LOG.debug("Starting Auditor Elector");
        }
        this.driver = MetadataDrivers.getBookieDriver((URI)URI.create(((ServerConfiguration)this.bsConfs.get(0)).getMetadataServiceUri()));
        this.driver.initialize((ServerConfiguration)this.bsConfs.get(0), () -> {}, (StatsLogger)NullStatsLogger.INSTANCE);
    }

    @Override
    @After
    public void tearDown() throws Exception {
        if (null != this.driver) {
            this.driver.close();
        }
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        for (ZooKeeper zk : this.zkClients) {
            zk.close();
        }
        this.zkClients.clear();
        super.tearDown();
    }

    @Test
    public void testEntryLogCorruption() throws Exception {
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        underReplicationManager.disableLedgerReplication();
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        long ledgerId = lh.getId();
        for (int i = 0; i < 100; ++i) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        BookieAccessor.forceFlush(((BookieServer)this.bs.get(0)).getBookie());
        File ledgerDir = ((ServerConfiguration)this.bsConfs.get(0)).getLedgerDirs()[0];
        ledgerDir = Bookie.getCurrentDirectory((File)ledgerDir);
        File[] entryLogs = ledgerDir.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith(".log");
            }
        });
        ByteBuffer junk = ByteBuffer.allocate(0x100000);
        for (File f : entryLogs) {
            FileOutputStream out = new FileOutputStream(f);
            out.getChannel().write(junk);
            out.close();
        }
        this.restartBookies();
        underReplicationManager.enableLedgerReplication();
        long underReplicatedLedger = -1L;
        for (int i = 0; i < 10 && (underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) == -1L; ++i) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((String)"Ledger should be under replicated", (long)ledgerId, (long)underReplicatedLedger);
        underReplicationManager.close();
    }

    @Test
    public void testIndexCorruption() throws Exception {
        int i;
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        long ledgerToCorrupt = lh.getId();
        for (i = 0; i < 100; ++i) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        for (i = 0; i < 100; ++i) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        BookieAccessor.forceFlush(((BookieServer)this.bs.get(0)).getBookie());
        File ledgerDir = ((ServerConfiguration)this.bsConfs.get(0)).getLedgerDirs()[0];
        ledgerDir = Bookie.getCurrentDirectory((File)ledgerDir);
        File index = new File(ledgerDir, IndexPersistenceMgr.getLedgerName((long)ledgerToCorrupt));
        LOG.info("file to corrupt{}", (Object)index);
        ByteBuffer junk = ByteBuffer.allocate(0x100000);
        FileOutputStream out = new FileOutputStream(index);
        out.getChannel().write(junk);
        out.close();
        long underReplicatedLedger = -1L;
        for (int i2 = 0; i2 < 10 && (underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) == -1L; ++i2) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((String)"Ledger should be under replicated", (long)ledgerToCorrupt, (long)underReplicatedLedger);
        underReplicationManager.close();
    }

    @Test
    public void testPeriodicCheckWhenDisabled() throws Exception {
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        int numLedgers = 10;
        int numMsgs = 2;
        final CountDownLatch completeLatch = new CountDownLatch(20);
        final AtomicInteger rc = new AtomicInteger(0);
        ArrayList<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            lhs.add(lh);
            for (int j = 0; j < 2; ++j) {
                lh.asyncAddEntry("testdata".getBytes(), new AsyncCallback.AddCallback(){

                    public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
                        if (rc.compareAndSet(0, rc2)) {
                            LOG.info("Failed to add entry : {}", (Object)BKException.getMessage((int)rc2));
                        }
                        completeLatch.countDown();
                    }
                }, null);
            }
        }
        completeLatch.await();
        if (rc.get() != 0) {
            throw BKException.create((int)rc.get());
        }
        for (LedgerHandle lh : lhs) {
            lh.close();
        }
        underReplicationManager.disableLedgerReplication();
        final AtomicInteger numReads = new AtomicInteger(0);
        ServerConfiguration conf = this.killBookie(0);
        Bookie deadBookie = new Bookie(conf){

            public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, Bookie.NoLedgerException {
                numReads.incrementAndGet();
                throw new IOException("Fake I/O exception");
            }
        };
        this.bsConfs.add(conf);
        this.bs.add(this.startBookie(conf, deadBookie));
        Thread.sleep(2000L);
        Assert.assertEquals((String)"Nothing should have tried to read", (long)0L, (long)numReads.get());
        underReplicationManager.enableLedgerReplication();
        Thread.sleep(2000L);
        underReplicationManager.disableLedgerReplication();
        Thread.sleep(2000L);
        int numUnderreplicated = 0;
        long underReplicatedLedger = -1L;
        while ((underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) != -1L) {
            ++numUnderreplicated;
            underReplicationManager.markLedgerReplicated(underReplicatedLedger);
            if (underReplicatedLedger != -1L) continue;
        }
        Thread.sleep(2000L);
        underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
        Assert.assertEquals((String)"There should be no underreplicated ledgers", (long)-1L, (long)underReplicatedLedger);
        LOG.info("{} of {} ledgers underreplicated", (Object)numUnderreplicated, (Object)numUnderreplicated);
        Assert.assertTrue((String)"All should be underreplicated", (numUnderreplicated <= 10 && numUnderreplicated > 0 ? 1 : 0) != 0);
    }

    @Test
    public void testPeriodicCheckWhenLedgerDeleted() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        final Auditor auditor = new Auditor(Bookie.getBookieAddress((ServerConfiguration)((ServerConfiguration)this.bsConfs.get(0))).toString(), (ServerConfiguration)this.bsConfs.get(0), this.zkc, (StatsLogger)NullStatsLogger.INSTANCE);
        final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    latch.countDown();
                    for (int i = 0; i < 10; ++i) {
                        auditor.checkAllLedgers();
                    }
                }
                catch (Exception e) {
                    LOG.error("Caught exception while checking all ledgers", (Throwable)e);
                    exceptionCaught.set(true);
                }
            }
        };
        t.start();
        latch.await();
        for (Long id : ids) {
            this.bkc.deleteLedger(id);
        }
        t.join();
        Assert.assertFalse((String)"Shouldn't have thrown exception", (boolean)exceptionCaught.get());
    }

    private BookieSocketAddress replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
        int bookieIdx = -1;
        Long entryId = (Long)LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().firstKey();
        ArrayList curEnsemble = (ArrayList)LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(entryId);
        BookieSocketAddress replacedBookie = null;
        for (int i = 0; i < this.numBookies; ++i) {
            if (!curEnsemble.contains(((BookieServer)this.bs.get(i)).getLocalAddress())) continue;
            bookieIdx = i;
            replacedBookie = ((BookieServer)this.bs.get(i)).getLocalAddress();
            break;
        }
        Assert.assertNotEquals((String)"Couldn't find ensemble bookie in bookie list", (long)-1L, (long)bookieIdx);
        LOG.info("Killing bookie " + ((BookieServer)this.bs.get(bookieIdx)).getLocalAddress());
        ServerConfiguration conf = this.killBookie(bookieIdx);
        Bookie writeFailingBookie = new Bookie(conf){

            public void addEntry(ByteBuf entry, boolean ackBeforeSync, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException {
                try {
                    LOG.info("Failing write to entry ");
                    Thread.sleep(100L);
                    throw new IOException();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        this.bsConfs.add(conf);
        this.bs.add(this.startBookie(conf, writeFailingBookie));
        return replacedBookie;
    }

    @Test
    public void testFailedWriteRecovery() throws Exception {
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        underReplicationManager.disableLedgerReplication();
        LedgerHandle lh = this.bkc.createLedger(2, 2, 1, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        BookieSocketAddress replacedBookie = this.replaceBookieWithWriteFailingBookie(lh);
        byte[] data = "foobar".getBytes();
        data = "foobar".getBytes();
        lh.addEntry(data);
        lh.addEntry(data);
        lh.addEntry(data);
        lh.close();
        underReplicationManager.enableLedgerReplication();
        long underReplicatedLedger = -1L;
        for (int i = 0; i < 5 && (underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) == -1L; ++i) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((String)"Ledger should be under replicated", (long)lh.getId(), (long)underReplicatedLedger);
        ArrayList<ReplicationWorker> l = new ArrayList<ReplicationWorker>();
        for (int i = 0; i < this.numBookies; ++i) {
            ReplicationWorker rw = new ReplicationWorker(this.zkc, (ServerConfiguration)this.bsConfs.get(i), (StatsLogger)NullStatsLogger.INSTANCE);
            rw.start();
            l.add(rw);
        }
        underReplicationManager.close();
        Thread.sleep(3000L);
        for (ReplicationWorker rw : l) {
            rw.shutdown();
        }
        LedgerHandle newLh = this.bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, "passwd".getBytes());
        for (Map.Entry e : LedgerHandleAdapter.getLedgerMetadata(newLh).getEnsembles().entrySet()) {
            ArrayList ensemble = (ArrayList)e.getValue();
            Assert.assertFalse((String)"Ensemble hasn't been updated", (boolean)ensemble.contains(replacedBookie));
        }
        newLh.close();
    }
}

