/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadataUtils;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class TestReadLastConfirmedLongPoll
extends BookKeeperClusterTestCase {
    private static final Logger log = LoggerFactory.getLogger(TestReadLastConfirmedLongPoll.class);
    final BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;

    public TestReadLastConfirmedLongPoll(Class<? extends LedgerStorage> storageClass) {
        super(6);
        this.baseConf.setLedgerStorageClass(storageClass.getName());
    }

    @Parameterized.Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList({InterleavedLedgerStorage.class}, {SortedLedgerStorage.class}, {DbLedgerStorage.class});
    }

    @Test
    public void testReadLACLongPollWhenAllBookiesUp() throws Exception {
        int numEntries = 3;
        LedgerHandle lh = this.bkc.createLedger(3, 3, 1, this.digestType, "".getBytes());
        LedgerHandle readLh = this.bkc.openLedgerNoRecovery(lh.getId(), this.digestType, "".getBytes());
        Assert.assertEquals((long)-1L, (long)readLh.getLastAddConfirmed());
        for (int i = 0; i < 2; ++i) {
            lh.addEntry(("data" + i).getBytes());
        }
        final AtomicBoolean success = new AtomicBoolean(false);
        final AtomicInteger numCallbacks = new AtomicInteger(0);
        final CountDownLatch firstReadComplete = new CountDownLatch(1);
        readLh.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback(){

            public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
                numCallbacks.incrementAndGet();
                if (0 == rc) {
                    success.set(true);
                } else {
                    success.set(false);
                }
                firstReadComplete.countDown();
            }
        }, null);
        firstReadComplete.await();
        Assert.assertTrue((boolean)success.get());
        Assert.assertTrue((numCallbacks.get() == 1 ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)readLh.getLastAddConfirmed());
        success.set(false);
        numCallbacks.set(0);
        long entryId = readLh.getLastAddConfirmed() + 1L;
        final CountDownLatch secondReadComplete = new CountDownLatch(1);
        readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000L, true, new AsyncCallback.ReadLastConfirmedAndEntryCallback(){

            public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
                numCallbacks.incrementAndGet();
                if (0 == rc && lastConfirmed == 1L) {
                    success.set(true);
                } else {
                    success.set(false);
                }
                secondReadComplete.countDown();
            }
        }, null);
        lh.addEntry("data2".getBytes());
        secondReadComplete.await();
        Assert.assertTrue((boolean)success.get());
        Assert.assertTrue((numCallbacks.get() == 1 ? 1 : 0) != 0);
        Assert.assertEquals((long)1L, (long)readLh.getLastAddConfirmed());
        success.set(false);
        numCallbacks.set(0);
        final CountDownLatch thirdReadComplete = new CountDownLatch(1);
        readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000L, false, new AsyncCallback.ReadLastConfirmedAndEntryCallback(){

            public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
                numCallbacks.incrementAndGet();
                if (0 == rc && lastConfirmed == 2L) {
                    success.set(true);
                } else {
                    success.set(false);
                }
                thirdReadComplete.countDown();
            }
        }, null);
        lh.addEntry("data3".getBytes());
        thirdReadComplete.await();
        Assert.assertTrue((boolean)success.get());
        Assert.assertTrue((numCallbacks.get() == 1 ? 1 : 0) != 0);
        Assert.assertEquals((long)2L, (long)readLh.getLastAddConfirmed());
        lh.close();
        readLh.close();
    }

    @Test
    public void testReadLACLongPollWhenSomeBookiesDown() throws Exception {
        int i;
        int numEntries = 3;
        LedgerHandle lh = this.bkc.createLedger(3, 1, 1, this.digestType, "".getBytes());
        LedgerHandle readLh = this.bkc.openLedgerNoRecovery(lh.getId(), this.digestType, "".getBytes());
        Assert.assertEquals((long)-1L, (long)readLh.getLastAddConfirmed());
        for (i = 0; i < 3; ++i) {
            lh.addEntry(("data" + i).getBytes());
        }
        for (i = 0; i < 3; ++i) {
            ServerConfiguration[] confs = new ServerConfiguration[2];
            for (int j = 0; j < 2; ++j) {
                int idx = (i + 1 + j) % 3;
                confs[j] = this.killBookie((BookieId)LedgerMetadataUtils.getLastEnsembleValue((LedgerMetadata)lh.getLedgerMetadata()).get(idx));
            }
            final AtomicBoolean entryAsExpected = new AtomicBoolean(false);
            final AtomicBoolean success = new AtomicBoolean(false);
            final AtomicInteger numCallbacks = new AtomicInteger(0);
            final CountDownLatch readComplete = new CountDownLatch(1);
            final int entryId = i;
            readLh.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback(){

                public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
                    numCallbacks.incrementAndGet();
                    if (0 == rc) {
                        success.set(true);
                        entryAsExpected.set(lastConfirmed == (long)(entryId - 1));
                    } else {
                        System.out.println("Return value" + rc);
                        success.set(false);
                        entryAsExpected.set(false);
                    }
                    readComplete.countDown();
                }
            }, null);
            readComplete.await();
            Assert.assertTrue((boolean)success.get());
            Assert.assertTrue((boolean)entryAsExpected.get());
            Assert.assertTrue((numCallbacks.get() == 1 ? 1 : 0) != 0);
            lh.close();
            readLh.close();
            for (ServerConfiguration conf : confs) {
                this.bs.add(this.startBookie(conf));
                this.bsConfs.add(conf);
            }
        }
    }
}

