/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookKeeperCloseTest
extends BookKeeperClusterTestCase {
    /** Logger used by the test cases in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(BookKeeperCloseTest.class);
    /** Digest type handed to the ledger create/open calls that reference {@code this.digestType}. */
    private BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
    /** Ledger password used by the tests and helpers that reference {@code PASSWORD}. */
    private static final String PASSWORD = "testPasswd";

    /**
     * Passes {@code 3} to the {@code BookKeeperClusterTestCase} constructor
     * (presumably the number of bookies to start; confirm against the base class).
     */
    public BookKeeperCloseTest() {
        super(3);
    }

    /**
     * Calls {@code killBookie(0)} and starts a replacement bookie from the
     * returned configuration. The replacement sleeps for five seconds before
     * delegating each {@code recoveryAddEntry}, {@code addEntry} and
     * {@code readEntry} call to the stock {@code Bookie} implementation.
     * The configuration and the started server are appended to
     * {@code bsConfs} and {@code bs} respectively.
     *
     * @throws Exception propagated from {@code killBookie}, the {@code Bookie}
     *         constructor or {@code startBookie}
     */
    private void restartBookieSlow() throws Exception {
        final ServerConfiguration slowConf = this.killBookie(0);
        final Bookie slowBookie = new Bookie(slowConf){

            public void recoveryAddEntry(ByteBuf entry, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException {
                BookKeeperCloseTest.stallBookieOperation();
                super.recoveryAddEntry(entry, cb, ctx, masterKey);
            }

            public void addEntry(ByteBuf entry, boolean ackBeforeSync, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException {
                BookKeeperCloseTest.stallBookieOperation();
                super.addEntry(entry, ackBeforeSync, cb, ctx, masterKey);
            }

            public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, Bookie.NoLedgerException {
                BookKeeperCloseTest.stallBookieOperation();
                return super.readEntry(ledgerId, entryId);
            }
        };
        this.bsConfs.add(slowConf);
        this.bs.add(this.startBookie(slowConf, slowBookie));
    }

    /**
     * Sleeps for five seconds. If the sleep is interrupted, the interrupt flag
     * is restored and the method returns immediately without throwing.
     */
    private static void stallBookieOperation() {
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Checks ledger creation through a closed client: the blocking
     * {@code createLedger} call must throw {@code BKClientClosedException}, and
     * the asynchronous call must complete its callback with return code -19
     * (presumably the client-closed code inlined by the compiler; confirm).
     */
    @Test
    public void testCreateLedger() throws Exception {
        BookKeeper client = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Closing bookkeeper client");
        client.close();

        // Blocking API: must throw because the client was closed above.
        try {
            client.createLedger(this.digestType, PASSWORD.getBytes());
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }

        // Asynchronous API: the callback must still fire and carry the failure code.
        final CountDownLatch createDone = new CountDownLatch(1);
        final AtomicInteger createRc = new AtomicInteger(0);
        AsyncCallback.CreateCallback onCreate = new AsyncCallback.CreateCallback(){

            public void createComplete(int rc, LedgerHandle handle, Object ctx) {
                createRc.set(rc);
                createDone.countDown();
            }
        };
        client.asyncCreateLedger(3, 2, this.digestType, PASSWORD.getBytes(), onCreate, createDone);
        LOG.info("Waiting to finish the ledger creation");
        boolean completed = createDone.await(20L, TimeUnit.SECONDS);
        Assert.assertTrue("create ledger call should have completed", completed);
        Assert.assertEquals("Succesfully created ledger through closed bkclient!", -19L, createRc.get());
    }

    /**
     * Checks that a ledger cannot be opened through a closed client, with or
     * without recovery. Both blocking open calls must throw
     * {@code BKClientClosedException}; the asynchronous open must complete its
     * callback with return code -19 (presumably the client-closed code inlined
     * by the compiler; confirm).
     */
    @Test
    public void testFenceLedger() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        LedgerHandle lh = this.createLedgerWithEntries(bk, 100);
        LOG.info("Closing bookkeeper client");
        // Replace one bookie with a slow one before closing the client.
        this.restartBookieSlow();
        bk.close();
        try {
            bk.openLedger(lh.getId(), this.digestType, PASSWORD.getBytes());
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }
        try {
            bk.openLedgerNoRecovery(lh.getId(), this.digestType, PASSWORD.getBytes());
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }
        final AtomicInteger returnCode = new AtomicInteger(0);
        final CountDownLatch openLatch = new CountDownLatch(1);
        AsyncCallback.OpenCallback cb = new AsyncCallback.OpenCallback(){

            public void openComplete(int rc, LedgerHandle handle, Object ctx) {
                returnCode.set(rc);
                openLatch.countDown();
            }
        };
        bk.asyncOpenLedger(lh.getId(), this.digestType, PASSWORD.getBytes(), cb, openLatch);
        LOG.info("Waiting to open the ledger asynchronously");
        Assert.assertTrue("Open call should have completed", openLatch.await(20L, TimeUnit.SECONDS));
        // assertEquals (rather than assertTrue on a comparison) reports the actual
        // return code on failure and matches the other tests in this class.
        Assert.assertEquals("Open should not have succeeded through closed bkclient!", -19L, returnCode.get());
    }

    /**
     * Checks ledger deletion through a closed client: the blocking
     * {@code deleteLedger} call must throw {@code BKClientClosedException}, and
     * the asynchronous call must complete its callback with return code -19
     * (presumably the client-closed code inlined by the compiler; confirm).
     */
    @Test
    public void testDeleteLedger() throws Exception {
        BookKeeper client = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        LedgerHandle ledger = this.createLedgerWithEntries(client, 100);
        LOG.info("Closing bookkeeper client");
        client.close();

        // Blocking API: must throw because the client was closed above.
        try {
            client.deleteLedger(ledger.getId());
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }

        // Asynchronous API: the callback must still fire and carry the failure code.
        final CountDownLatch deleteDone = new CountDownLatch(1);
        final AtomicInteger deleteRc = new AtomicInteger(0);
        AsyncCallback.DeleteCallback onDelete = new AsyncCallback.DeleteCallback(){

            public void deleteComplete(int rc, Object ctx) {
                deleteRc.set(rc);
                deleteDone.countDown();
            }
        };
        client.asyncDeleteLedger(ledger.getId(), onDelete, deleteDone);
        LOG.info("Waiting to delete the ledger asynchronously");
        boolean completed = deleteDone.await(20L, TimeUnit.SECONDS);
        Assert.assertTrue("Delete call should have completed", completed);
        Assert.assertEquals("Delete should not have succeeded through closed bkclient!", -19L, deleteRc.get());
    }

    /**
     * Checks adding entries through a ledger handle whose client has been
     * closed: the blocking {@code addEntry} must throw
     * {@code BKClientClosedException}, and {@code asyncAddEntry} must complete
     * its callback with return code -19 (presumably the client-closed code
     * inlined by the compiler; confirm).
     */
    @Test
    public void testAddLedgerEntry() throws Exception {
        BookKeeper client = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        LedgerHandle ledger = this.createLedgerWithEntries(client, 1);
        LOG.info("Closing bookkeeper client");
        // Replace one bookie with a slow one before closing the client.
        this.restartBookieSlow();
        client.close();

        // Blocking API: must throw because the client was closed above.
        try {
            ledger.addEntry("foobar".getBytes());
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }

        // Asynchronous API: the callback must still fire and carry the failure code.
        final AtomicInteger addRc = new AtomicInteger(0);
        final CountDownLatch addDone = new CountDownLatch(1);
        AsyncCallback.AddCallback onAdd = new AsyncCallback.AddCallback(){

            public void addComplete(int rccb, LedgerHandle handle, long entryId, Object ctx) {
                addRc.set(rccb);
                addDone.countDown();
            }
        };
        ledger.asyncAddEntry("foobar".getBytes(), onAdd, null);
        LOG.info("Waiting to finish adding another entry asynchronously");
        boolean completed = addDone.await(20L, TimeUnit.SECONDS);
        Assert.assertTrue("Add entry to ledger call should have completed", completed);
        Assert.assertEquals("Add entry to ledger should not have succeeded through closed bkclient!", -19L, addRc.get());
    }

    /**
     * Checks closing ledger handles after their client has been closed: the
     * blocking {@code close} on the first handle must throw
     * {@code BKClientClosedException}, and {@code asyncClose} on the second
     * handle must complete its callback with return code -19 (presumably the
     * client-closed code inlined by the compiler; confirm).
     */
    @Test
    public void testCloseLedger() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        LedgerHandle lh = this.createLedgerWithEntries(bk, 100);
        LedgerHandle lh2 = this.createLedgerWithEntries(bk, 100);
        LOG.info("Closing bookkeeper client");
        bk.close();
        try {
            lh.close();
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }
        final CountDownLatch completeLatch = new CountDownLatch(1);
        final AtomicInteger rc = new AtomicInteger(0);
        lh2.asyncClose(new AsyncCallback.CloseCallback(){

            public void closeComplete(int rccb, LedgerHandle handle, Object ctx) {
                rc.set(rccb);
                completeLatch.countDown();
            }
        }, null);
        // The log line used to describe adding an entry (copy-paste from the add test).
        LOG.info("Waiting to finish closing the ledger asynchronously");
        Assert.assertTrue("Close ledger call should have completed", completeLatch.await(20L, TimeUnit.SECONDS));
        // The message now matches the assertion: the close is expected to fail with -19.
        Assert.assertEquals("Close ledger should not have succeeded through closed bkclient!", -19L, rc.get());
    }

    /**
     * Checks reading entries through a ledger handle whose client has been
     * closed: the blocking {@code readEntries} must throw
     * {@code BKClientClosedException}, and {@code asyncReadEntries} must
     * complete its callback with return code -19 (presumably the client-closed
     * code inlined by the compiler; confirm).
     */
    @Test
    public void testReadLedgerEntry() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        int numOfEntries = 100;
        LedgerHandle lh = this.createLedgerWithEntries(bk, numOfEntries);
        LOG.info("Closing bookkeeper client");
        // Replace one bookie with a slow one before closing the client.
        this.restartBookieSlow();
        bk.close();
        try {
            lh.readEntries(0L, numOfEntries - 1);
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }
        final CountDownLatch readLatch = new CountDownLatch(1);
        final AtomicInteger rc = new AtomicInteger(0);
        AsyncCallback.ReadCallback cb = new AsyncCallback.ReadCallback(){

            public void readComplete(int rccb, LedgerHandle handle, Enumeration<LedgerEntry> seq, Object ctx) {
                rc.set(rccb);
                readLatch.countDown();
            }
        };
        lh.asyncReadEntries(0L, numOfEntries - 1, cb, readLatch);
        LOG.info("Waiting to finish reading the entries asynchronously");
        Assert.assertTrue("Read entry ledger call should have completed", readLatch.await(20L, TimeUnit.SECONDS));
        // The message now matches the assertion: the read is expected to fail with -19.
        Assert.assertEquals("Read entry ledger should not have succeeded through closed bkclient!", -19L, rc.get());
    }

    /**
     * Checks reading the last confirmed entry through a ledger handle whose
     * client has been closed: {@code asyncReadLastConfirmed} must complete its
     * callback with return code -19 (presumably the client-closed code inlined
     * by the compiler; confirm), and the blocking {@code readLastConfirmed}
     * must throw {@code BKClientClosedException}.
     */
    @Test
    public void testReadLastConfirmed() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        LedgerHandle lh = this.createLedgerWithEntries(bk, 100);
        LOG.info("Closing bookkeeper client");
        // Called three times, presumably so each of the three bookies ends up
        // replaced by a slow one (depends on killBookie(0) semantics; confirm).
        this.restartBookieSlow();
        this.restartBookieSlow();
        this.restartBookieSlow();
        bk.close();
        final CountDownLatch readLatch = new CountDownLatch(1);
        final AtomicInteger rc = new AtomicInteger(0);
        AsyncCallback.ReadLastConfirmedCallback cb = new AsyncCallback.ReadLastConfirmedCallback(){

            public void readLastConfirmedComplete(int rccb, long lastConfirmed, Object ctx) {
                rc.set(rccb);
                readLatch.countDown();
            }
        };
        lh.asyncReadLastConfirmed(cb, readLatch);
        LOG.info("Waiting to finish reading last confirmed entry asynchronously");
        Assert.assertTrue("ReadLastConfirmed call should have completed", readLatch.await(20L, TimeUnit.SECONDS));
        // The message now matches the assertion: the call is expected to fail with -19.
        Assert.assertEquals("ReadLastConfirmed should not have succeeded through closed bkclient!", -19L, rc.get());
        try {
            lh.readLastConfirmed();
            Assert.fail("should have failed, client is closed");
        }
        catch (BKException.BKClientClosedException expected) {
            // expected: the client is closed
        }
    }

    /**
     * Checks that {@code LedgerChecker.checkLedger}, run against a client that
     * was closed after the checker was created, completes its callback with
     * return code -19 (presumably the client-closed code inlined by the
     * compiler; confirm).
     */
    @Test
    public void testLedgerCheck() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf, this.zkc);
        LOG.info("Create ledger and add entries to it");
        LedgerHandle lh = this.createLedgerWithEntries(bk, 100);
        LOG.info("Closing bookkeeper client");
        // The checker is built while the client is still open; the client is closed afterwards.
        LedgerChecker lc = new LedgerChecker(bk);
        this.restartBookieSlow();
        bk.close();
        final CountDownLatch postLatch = new CountDownLatch(1);
        final AtomicInteger postRc = new AtomicInteger(0);
        lc.checkLedger(lh, new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>(){

            public void operationComplete(int rc, Set<LedgerFragment> result) {
                postRc.set(rc);
                postLatch.countDown();
            }
        });
        Assert.assertTrue("checkLedger should have finished", postLatch.await(30L, TimeUnit.SECONDS));
        // Expected value first, actual second, so a failure reports "expected:<-19> but was:<rc>".
        Assert.assertEquals("Should have client closed exception", -19L, postRc.get());
    }

    /**
     * Checks that {@code BookKeeperAdmin} operations fail once the underlying
     * client has been closed. While the client is open, the test kills a
     * bookie, starts a new one and confirms that the ledger checker reports
     * exactly one fragment for the closed ledger. After closing the client,
     * the admin's open, open-without-recovery, bookie-recovery and
     * fragment-replication calls must each throw
     * {@code BKClientClosedException}.
     */
    @Test
    public void testBookKeeperAdmin() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf, this.zkc);
        try (BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk)) {
            LOG.info("Create ledger and add entries to it");
            LedgerHandle lh1 = this.createLedgerWithEntries(bk, 100);
            LedgerHandle lh2 = this.createLedgerWithEntries(bk, 100);
            LedgerHandle lh3 = this.createLedgerWithEntries(bk, 100);
            lh3.close();
            BookieSocketAddress bookieToKill = this.getBookie(0);
            this.killBookie(bookieToKill);
            this.startNewBookie();
            CheckerCb checkercb = new CheckerCb();
            LedgerChecker lc = new LedgerChecker(bk);
            lc.checkLedger(lh3, checkercb);
            // Expected value first, actual second, so a failure reports "expected:<0> but was:<rc>".
            Assert.assertEquals("Should have completed", 0L, checkercb.getRc(30, TimeUnit.SECONDS));
            Assert.assertEquals("Should have a missing fragment", 1L, checkercb.getResult(30, TimeUnit.SECONDS).size());
            this.restartBookieSlow();
            this.restartBookieSlow();
            bk.close();
            try {
                bkadmin.openLedger(lh1.getId());
                Assert.fail("Shouldn't be able to open with a closed client");
            }
            catch (BKException.BKClientClosedException expected) {
                // expected: the client is closed
            }
            try {
                bkadmin.openLedgerNoRecovery(lh1.getId());
                Assert.fail("Shouldn't be able to open with a closed client");
            }
            catch (BKException.BKClientClosedException expected) {
                // expected: the client is closed
            }
            try {
                bkadmin.recoverBookieData(bookieToKill);
                Assert.fail("Shouldn't be able to recover with a closed client");
            }
            catch (BKException.BKClientClosedException expected) {
                // expected: the client is closed
            }
            try {
                // The checker callback already completed above, so this returns the cached result.
                bkadmin.replicateLedgerFragment(lh3, checkercb.getResult(10, TimeUnit.SECONDS).iterator().next());
                Assert.fail("Shouldn't be able to replicate with a closed client");
            }
            catch (BKException.BKClientClosedException expected) {
                // expected: the client is closed
            }
        }
    }

    /**
     * Checks that closing a client leaves no threads behind. A worker thread
     * in a dedicated {@code ThreadGroup} creates a client, writes and reads one
     * entry, then closes the client. After the worker finishes, the test polls
     * the group for up to ten seconds, logging any threads still alive, and
     * asserts that the group's active count is zero.
     */
    @Test
    public void testBookKeeperCloseThreads() throws Exception {
        ThreadGroup group = new ThreadGroup("test-group");
        final SettableFuture<Void> future = SettableFuture.create();
        Thread t = new Thread(group, "TestThread"){

            @Override
            public void run() {
                try {
                    BookKeeper bk = new BookKeeper(BookKeeperCloseTest.this.baseClientConf);
                    LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "passwd".getBytes());
                    lh.addEntry("foobar".getBytes());
                    lh.close();
                    long id = lh.getId();
                    lh = bk.openLedgerNoRecovery(id, BookKeeper.DigestType.CRC32, "passwd".getBytes());
                    // The read is issued for its side effects; the entries themselves are not inspected.
                    lh.readEntries(0L, 0L);
                    lh.close();
                    bk.close();
                    future.set(null);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    future.setException(ie);
                }
                catch (Exception e) {
                    future.setException(e);
                }
            }
        };
        t.start();
        // Rethrows (wrapped) any exception the worker recorded on the future.
        future.get();
        t.join();
        for (int i = 0; i < 10 && group.activeCount() > 0; ++i) {
            Thread[] threads = new Thread[group.activeCount()];
            // enumerate() may fill fewer slots than activeCount() estimated; log only the filled ones.
            int found = group.enumerate(threads);
            for (int j = 0; j < found; ++j) {
                LOG.error("Leftover thread after {} secs: {}", i, threads[j]);
            }
            Thread.sleep(1000L);
        }
        Assert.assertEquals("Should be no threads left in group", 0L, group.activeCount());
    }

    /**
     * Creates a ledger via {@code bk.createLedger(3, 3, ...)} and asynchronously
     * adds {@code numOfEntries} copies of the payload {@code "foobar"} to it,
     * waiting up to 30 seconds for all add callbacks.
     *
     * @param bk client used to create the ledger
     * @param numOfEntries number of entries to add
     * @return the handle of the newly created ledger
     * @throws Exception if the adds do not all complete within 30 seconds, or a
     *         {@code BKException} built from the first non-zero return code
     *         reported by an add callback
     */
    private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws Exception {
        final LedgerHandle ledger = bk.createLedger(3, 3, this.digestType, PASSWORD.getBytes());
        final AtomicInteger firstFailure = new AtomicInteger(0);
        final CountDownLatch pendingAdds = new CountDownLatch(numOfEntries);
        AsyncCallback.AddCallback onAdd = new AsyncCallback.AddCallback(){

            public void addComplete(int rccb, LedgerHandle handle, long entryId, Object ctx) {
                // Only replaces the value while it is still 0, so the first non-zero code is kept.
                firstFailure.compareAndSet(0, rccb);
                pendingAdds.countDown();
            }
        };
        int submitted = 0;
        while (submitted < numOfEntries) {
            ledger.asyncAddEntry("foobar".getBytes(), onAdd, null);
            ++submitted;
        }
        boolean allCompleted = pendingAdds.await(30L, TimeUnit.SECONDS);
        if (!allCompleted) {
            throw new Exception("Entries took too long to add");
        }
        int failureRc = firstFailure.get();
        if (failureRc != 0) {
            throw BKException.create(failureRc);
        }
        return ledger;
    }

    /**
     * Callback that stores the return code and fragment set passed to
     * {@code operationComplete} and releases a one-shot latch, so a test can
     * block until the operation has finished and then read both values.
     */
    private static class CheckerCb
    implements BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>> {
        CountDownLatch latch = new CountDownLatch(1);
        int rc = 0;
        Set<LedgerFragment> result = null;

        private CheckerCb() {
        }

        /** Records the outcome, then counts the latch down to release waiters. */
        public void operationComplete(int returnCode, Set<LedgerFragment> fragments) {
            this.rc = returnCode;
            this.result = fragments;
            this.latch.countDown();
        }

        /**
         * Waits up to the given time for the callback, then returns the stored return code.
         *
         * @throws Exception with message "Didn't complete" if the wait times out
         */
        int getRc(int time, TimeUnit unit) throws Exception {
            this.awaitCompletion(time, unit);
            return this.rc;
        }

        /**
         * Waits up to the given time for the callback, then returns the stored fragment set.
         *
         * @throws Exception with message "Didn't complete" if the wait times out
         */
        Set<LedgerFragment> getResult(int time, TimeUnit unit) throws Exception {
            this.awaitCompletion(time, unit);
            return this.result;
        }

        /** Blocks on the latch for at most the given time; throws if it was not released. */
        private void awaitCompletion(int time, TimeUnit unit) throws Exception {
            if (!this.latch.await(time, unit)) {
                throw new Exception("Didn't complete");
            }
        }
    }
}

