/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.commons.configuration.Configuration;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class TestReadLastConfirmedAndEntry
extends BookKeeperClusterTestCase {
    private static final Logger logger = LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class);
    final BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;

    public TestReadLastConfirmedAndEntry(Class<? extends LedgerStorage> storageClass) {
        super(3);
        this.baseConf.setAllowEphemeralPorts(false);
        this.baseConf.setLedgerStorageClass(storageClass.getName());
    }

    @Parameterized.Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList({InterleavedLedgerStorage.class}, {SortedLedgerStorage.class}, {DbLedgerStorage.class});
    }

    /**
     * Verifies that {@code asyncReadLastConfirmedAndEntry} completes with an advanced last add
     * confirmed and no entry when the requested entry cannot be served.
     *
     * <p>All running bookies are stopped and replaced by {@link FakeBookie} instances: the first
     * one throws {@code NoEntryException} for entry {@code expectedEntryIdToFail}, the remaining
     * ones stall when asked for it. After writing entries {@code 0..expectedEntryIdToFail} the
     * test reads everything up to the LAC through a non-recovery handle, issues the combined
     * LAC-and-entry read for the entry right after the LAC, writes one more entry and checks the
     * callback result.
     *
     * <p>The client and both ledger handles are closed in {@code finally} blocks so that they are
     * released whether the test passes or fails.
     *
     * @throws Exception if cluster setup, a ledger operation or cleanup fails
     */
    @Test
    public void testAdvancedLacWithEmptyResponse() throws Exception {
        final byte[] passwd = "advanced-lac-with-empty-response".getBytes(Charsets.UTF_8);
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration((Configuration)this.baseClientConf);
        newConf.setAddEntryTimeout(9999999);
        newConf.setReadEntryTimeout(9999999);

        // Swap the regular bookies for fake ones that misbehave on the chosen entry id.
        this.stopAllBookies();
        final long expectedEntryIdToFail = 2L;
        for (int i = 0; i < this.numBookies; ++i) {
            ServerConfiguration conf = this.newServerConfiguration();
            // Only the first bookie answers with NoEntryException; the others stall.
            FakeBookie b = new FakeBookie(conf, expectedEntryIdToFail, i != 0);
            this.bs.add(this.startBookie(conf, b));
            this.bsConfs.add(conf);
        }

        BookKeeper newBk = new BookKeeper(newConf);
        try {
            LedgerHandle lh = newBk.createLedger(3, 3, 2, this.digestType, passwd);
            try {
                for (int i = 0; i <= expectedEntryIdToFail; ++i) {
                    lh.addEntry("test".getBytes(Charsets.UTF_8));
                }

                LedgerHandle newLh = newBk.openLedgerNoRecovery(lh.getId(), this.digestType, passwd);
                try {
                    long lac = newLh.readLastConfirmed();
                    Assert.assertEquals(expectedEntryIdToFail - 1L, lac);

                    // Every entry up to the LAC must be readable and arrive in order.
                    Enumeration<LedgerEntry> entries = newLh.readEntries(0L, lac);
                    int numReads = 0;
                    long expectedEntryId = 0L;
                    while (entries.hasMoreElements()) {
                        LedgerEntry entry = entries.nextElement();
                        Assert.assertEquals(expectedEntryId++, entry.getEntryId());
                        ++numReads;
                    }
                    Assert.assertEquals(lac + 1L, (long)numReads);

                    // Holders start with sentinel values so an unfired callback is detectable.
                    final AtomicInteger rcHolder = new AtomicInteger(-12345);
                    final AtomicLong lacHolder = new AtomicLong(lac);
                    final AtomicReference<LedgerEntry> entryHolder = new AtomicReference<LedgerEntry>(null);
                    final CountDownLatch latch = new CountDownLatch(1);
                    newLh.asyncReadLastConfirmedAndEntry(newLh.getLastAddConfirmed() + 1L, 99999L, false, new AsyncCallback.ReadLastConfirmedAndEntryCallback(){

                        @Override
                        public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
                            rcHolder.set(rc);
                            lacHolder.set(lastConfirmed);
                            entryHolder.set(entry);
                            latch.countDown();
                        }
                    }, null);

                    // Writing another entry moves the LAC forward while the read is outstanding.
                    lh.addEntry("another test".getBytes(Charsets.UTF_8));
                    latch.await();

                    Assert.assertEquals(expectedEntryIdToFail, lacHolder.get());
                    Assert.assertNull(entryHolder.get());
                    // NOTE(review): 0 is presumably the client's "OK" return code - confirm.
                    Assert.assertEquals(0L, (long)rcHolder.get());
                } finally {
                    newLh.close();
                }
            } finally {
                lh.close();
            }
        } finally {
            newBk.close();
        }
    }

    /**
     * Exercises {@code asyncReadLastConfirmedAndEntry} while one bookie delays its answer to a
     * last-add-confirmed read.
     *
     * <p>Bookie 0 is killed and restarted as a {@link SlowReadLacBookie} that blocks on
     * {@code readLatch} whenever it observes a LAC equal to {@code lacToSlowRead}. The test reads
     * entry 0 while that bookie is suspended, releases the latch, waits until the reader handle
     * has seen a LAC of at least 1 and then reads entry 1, checking return code, LAC, entry id and
     * payload of both reads.
     *
     * @throws Exception if cluster setup or any ledger operation fails
     */
    @Test
    public void testRaceOnLastAddConfirmed() throws Exception {
        final byte[] passwd = "race-on-last-add-confirmed".getBytes(Charsets.UTF_8);

        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.addConfiguration((Configuration)this.baseClientConf);
        clientConf.setAddEntryTimeout(9999999);
        clientConf.setReadEntryTimeout(9999999);

        // Replace the first bookie with one that suspends LAC reads until the latch opens.
        final long lacToSlowRead = 0L;
        final CountDownLatch readLatch = new CountDownLatch(1);
        ServerConfiguration slowConf = this.killBookie(0);
        SlowReadLacBookie slowBookie = new SlowReadLacBookie(slowConf, lacToSlowRead, readLatch);
        this.bs.add(this.startBookie(slowConf, slowBookie));
        this.bsConfs.add(slowConf);

        BookKeeper newBk = new BookKeeper(clientConf);
        LedgerHandle lh = newBk.createLedger(3, 3, 3, this.digestType, passwd);
        lh.addEntry("entry-0".getBytes(Charsets.UTF_8));
        LedgerHandle readLh = newBk.openLedgerNoRecovery(lh.getId(), this.digestType, passwd);

        // First read: ask for entry 0, then write entry 1 before waiting for the callback.
        ReadLastConfirmedAndEntryResult firstRead = new ReadLastConfirmedAndEntryResult();
        readLh.asyncReadLastConfirmedAndEntry(0L, 9999999L, true, firstRead, null);
        lh.addEntry("entry-1".getBytes(Charsets.UTF_8));
        firstRead.await();
        Assert.assertEquals(0L, (long)firstRead.rc);
        Assert.assertEquals(0L, firstRead.lac);
        Assert.assertEquals(0L, firstRead.entry.getEntryId());
        String firstPayload = new String(firstRead.entry.getEntry(), Charsets.UTF_8);
        Assert.assertEquals((Object)"entry-0", (Object)firstPayload);

        // Release the suspended bookie and wait for the reader handle's LAC to reach 1.
        lh.addEntry("entry-2".getBytes(Charsets.UTF_8));
        readLatch.countDown();
        while (readLh.getLastAddConfirmed() < 1L) {
            Thread.sleep(100L);
        }

        // Second read: entry 1 must come back together with a LAC of 2.
        lh.addEntry("entry-3".getBytes(Charsets.UTF_8));
        ReadLastConfirmedAndEntryResult secondRead = new ReadLastConfirmedAndEntryResult();
        readLh.asyncReadLastConfirmedAndEntry(1L, 9999999L, true, secondRead, null);
        secondRead.await();
        Assert.assertEquals(0L, (long)secondRead.rc);
        Assert.assertEquals(2L, secondRead.lac);
        Assert.assertEquals(1L, secondRead.entry.getEntryId());
        String secondPayload = new String(secondRead.entry.getEntry(), Charsets.UTF_8);
        Assert.assertEquals((Object)"entry-1", (Object)secondPayload);

        lh.close();
        readLh.close();
        newBk.close();
    }

    static class ReadLastConfirmedAndEntryResult
    implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
        int rc = -1234;
        long lac = -1234L;
        LedgerEntry entry = null;
        final CountDownLatch doneLatch = new CountDownLatch(1);

        ReadLastConfirmedAndEntryResult() {
        }

        public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
            this.rc = rc;
            this.lac = lastConfirmed;
            this.entry = entry;
            this.doneLatch.countDown();
        }

        void await() throws InterruptedException {
            this.doneLatch.await();
        }
    }

    static class SlowReadLacBookie
    extends Bookie {
        private final long lacToSlowRead;
        private final CountDownLatch readLatch;

        public SlowReadLacBookie(ServerConfiguration conf, long lacToSlowRead, CountDownLatch readLatch) throws IOException, KeeperException, InterruptedException, BookieException {
            super(conf);
            this.lacToSlowRead = lacToSlowRead;
            this.readLatch = readLatch;
        }

        public long readLastAddConfirmed(long ledgerId) throws IOException {
            long lac = super.readLastAddConfirmed(ledgerId);
            logger.info("Last Add Confirmed for ledger {} is {}", (Object)ledgerId, (Object)lac);
            if (this.lacToSlowRead == lac) {
                logger.info("Suspend returning lac {} for ledger {}", (Object)lac, (Object)ledgerId);
                try {
                    this.readLatch.await();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return super.readLastAddConfirmed(ledgerId);
        }
    }

    static class FakeBookie
    extends Bookie {
        final long expectedEntryToFail;
        final boolean stallOrRespondNull;

        public FakeBookie(ServerConfiguration conf, long expectedEntryToFail, boolean stallOrRespondNull) throws InterruptedException, BookieException, KeeperException, IOException {
            super(conf);
            this.expectedEntryToFail = expectedEntryToFail;
            this.stallOrRespondNull = stallOrRespondNull;
        }

        public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, Bookie.NoLedgerException {
            if (entryId == this.expectedEntryToFail) {
                if (this.stallOrRespondNull) {
                    try {
                        Thread.sleep(600000L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    throw new Bookie.NoEntryException(ledgerId, entryId);
                }
            }
            return super.readEntry(ledgerId, entryId);
        }
    }
}

