/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.SlowBufferedChannel;
import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieRequestProcessor;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookieBackpressureTest
extends BookKeeperClusterTestCase
implements AsyncCallback.AddCallback,
AsyncCallback.ReadCallback,
AsyncCallback.ReadLastConfirmedCallback {
    // Logger shared by every test and helper in this class.
    private static final Logger LOG = LoggerFactory.getLogger(BookieBackpressureTest.class);
    // Password handed to every createLedger call made by the helpers.
    byte[] ledgerPassword = "aaa".getBytes();
    // Payload appended by each add request; its length is also used by the storage tests
    // to size the write buffer and the skip-list limit.
    final byte[] data = new byte[8192];
    // Number of entries the write helpers submit to each ledger.
    static final int NUM_ENTRIES_TO_WRITE = 200;
    // NOTE(review): presumably the number of payload-sized entries a memtable may hold in the
    // sorted-storage tests (the skip-list limit there is data.length * 2 - 1) - confirm.
    static final int ENTRIES_IN_MEMTABLE = 2;
    // In-progress request limit configured by the "with backpressure" tests and the
    // threshold the helpers compare the bookie's in-progress counters against.
    static final int MAX_PENDING = 5;
    // Number of ledgers every helper creates and writes to.
    static final int NUM_OF_LEDGERS = 10;
    // Digest type handed to every createLedger call made by the helpers.
    BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
    // Delays forwarded to the SlowBufferedChannel installed by bookieWithMockedJournal;
    // setUp resets all three to 0 before each test. NOTE(review): units are whatever
    // SlowBufferedChannel's setters expect - confirm.
    long getDelay;
    long addDelay;
    long flushDelay;

    public BookieBackpressureTest() {
        super(1);
        this.baseClientConf.setAddEntryTimeout(100);
        this.baseClientConf.setAddEntryQuorumTimeout(100);
        this.baseClientConf.setReadEntryTimeout(100);
    }

    /**
     * Runs the base-class setup and then clears the three delay fields so that a delay
     * chosen by one test does not carry over into the next.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.getDelay = this.addDelay = this.flushDelay = 0L;
    }

    /**
     * Builds a {@link TestBookieImpl} and, when at least one delay is positive, replaces each
     * of its journals with a Mockito spy whose buffered-channel builder produces
     * {@link SlowBufferedChannel} instances configured with the given delays.
     *
     * @param conf       configuration the bookie is created from
     * @param getDelay   value passed to {@code SlowBufferedChannel.setGetDelay}
     * @param addDelay   value passed to {@code SlowBufferedChannel.setAddDelay}
     * @param flushDelay value passed to {@code SlowBufferedChannel.setFlushDelay}
     * @return the bookie, with spied journals if any delay was requested
     * @throws Exception if the bookie cannot be created or its journal list cannot be reached
     */
    private Bookie bookieWithMockedJournal(ServerConfiguration conf, long getDelay, long addDelay, long flushDelay) throws Exception {
        TestBookieImpl bookie = new TestBookieImpl(conf);
        boolean anyDelayRequested = getDelay > 0L || addDelay > 0L || flushDelay > 0L;
        if (anyDelayRequested) {
            // The list is the bookie's own journal list, so replacing an element swaps the
            // journal the bookie uses.
            List<Journal> journals = this.getJournals(bookie);
            for (int idx = 0; idx < journals.size(); idx++) {
                Journal spied = Mockito.spy(journals.get(idx));
                Mockito.when(spied.getBufferedChannelBuilder()).thenReturn((fc, capacity) -> {
                    SlowBufferedChannel slowChannel = new SlowBufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
                    slowChannel.setAddDelay(addDelay);
                    slowChannel.setGetDelay(getDelay);
                    slowChannel.setFlushDelay(flushDelay);
                    return slowChannel;
                });
                journals.set(idx, spied);
            }
        }
        return bookie;
    }

    /**
     * Reads the private {@code journals} field of {@link BookieImpl} reflectively.
     *
     * @param bookie the bookie whose journal list is wanted
     * @return the list stored in the bookie's {@code journals} field (the live list, not a copy)
     * @throws NoSuchFieldException   if {@code BookieImpl} has no field named {@code journals}
     * @throws IllegalAccessException if the field cannot be read
     */
    private List<Journal> getJournals(Bookie bookie) throws NoSuchFieldException, IllegalAccessException {
        Field journalsField = BookieImpl.class.getDeclaredField("journals");
        journalsField.setAccessible(true);
        // Reflection only yields Object; this assumes the field holds a List of Journal.
        @SuppressWarnings("unchecked")
        List<Journal> journals = (List<Journal>) journalsField.get(bookie);
        return journals;
    }

    /**
     * Journal add delay of 1 with the adds-in-progress limit set to 0: the write helper
     * expects the in-progress add count to go above MAX_PENDING.
     */
    @Test
    public void testWriteNoBackpressureSlowJournal() throws Exception {
        this.addDelay = 1L;
        ServerConfiguration bookieConf = this.confByIndex(0);
        // 0 is the value all "no backpressure" tests use; presumably it disables the limit.
        bookieConf.setMaxAddsInProgressLimit(0);
        this.doWritesNoBackpressure(0);
    }

    /**
     * Journal flush delay of 1, adaptive group writes off and the adds-in-progress limit
     * set to 0: the write helper expects the in-progress add count to go above MAX_PENDING.
     */
    @Test
    public void testWriteNoBackpressureSlowJournalFlush() throws Exception {
        this.flushDelay = 1L;
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(0);
        bookieConf.setJournalAdaptiveGroupWrites(false);
        this.doWritesNoBackpressure(0);
    }

    /**
     * Journal flush delay of 1 with the adds-in-progress limit set to MAX_PENDING: the write
     * helper expects the in-progress add count to stay within that limit.
     */
    @Test
    public void testWriteWithBackpressureSlowJournal() throws Exception {
        // NOTE(review): testWriteNoBackpressureSlowJournal slows the journal through addDelay,
        // while this test uses flushDelay - confirm that the difference is intended.
        this.flushDelay = 1L;
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(MAX_PENDING);
        this.doWritesWithBackpressure(0);
    }

    /**
     * Journal flush delay of 1, adaptive group writes off and the adds-in-progress limit set
     * to MAX_PENDING: the write helper expects the in-progress add count to stay within it.
     */
    @Test
    public void testWriteWithBackpressureSlowJournalFlush() throws Exception {
        this.flushDelay = 1L;
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(MAX_PENDING);
        bookieConf.setJournalAdaptiveGroupWrites(false);
        this.doWritesWithBackpressure(0);
    }

    /**
     * Slow interleaved ledger storage with an add delay of "1" and the adds-in-progress limit
     * set to 0: the write helper expects the in-progress add count to go above MAX_PENDING.
     */
    @Test
    public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(0);
        bookieConf.setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        bookieConf.setProperty("test.slowStorage.addDelay", "1");
        this.doWritesNoBackpressure(0);
    }

    /**
     * Slow interleaved ledger storage with an add delay of "1" and the adds-in-progress limit
     * set to MAX_PENDING: the write helper expects the in-progress add count to stay within it.
     */
    @Test
    public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(MAX_PENDING);
        bookieConf.setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        bookieConf.setProperty("test.slowStorage.addDelay", "1");
        this.doWritesWithBackpressure(0);
    }

    /**
     * Slow interleaved ledger storage with a flush delay of "10" and the adds-in-progress
     * limit set to 0: the write helper expects the in-progress add count to go above
     * MAX_PENDING.
     */
    @Test
    public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(0);
        bookieConf.setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        bookieConf.setProperty("test.slowStorage.flushDelay", "10");
        this.doWritesNoBackpressure(0);
    }

    /**
     * Slow interleaved ledger storage with a flush delay of "10" and the adds-in-progress
     * limit set to MAX_PENDING: the write helper expects the in-progress add count to stay
     * within it.
     */
    @Test
    public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(MAX_PENDING);
        bookieConf.setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        bookieConf.setProperty("test.slowStorage.flushDelay", "10");
        this.doWritesWithBackpressure(0);
    }

    /**
     * Slow sorted ledger storage with an add delay of "1", a flush delay of "10" and the
     * adds-in-progress limit set to 0: the write helper expects the in-progress add count
     * to go above MAX_PENDING.
     */
    @Test
    public void testWriteNoBackpressureSortedStorage() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(0);
        bookieConf.setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        // Guard the relationship between the two constants instead of asserting a literal
        // true, so that changing either constant cannot silently invalidate the scenario.
        // NOTE(review): the factor 2 assumes one memtable being flushed plus one accepting
        // data - confirm against the sorted-storage implementation.
        Assert.assertTrue("for the test, memtable should not keep more entries than allowed", ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
        // One byte short of ENTRIES_IN_MEMTABLE payloads.
        bookieConf.setSkipListSizeLimit(this.data.length * ENTRIES_IN_MEMTABLE - 1);
        bookieConf.setProperty("test.slowStorage.addDelay", "1");
        bookieConf.setProperty("test.slowStorage.flushDelay", "10");
        this.doWritesNoBackpressure(0);
    }

    /**
     * Slow sorted ledger storage with an add delay of "1", a flush delay of "10" and the
     * adds-in-progress limit set to MAX_PENDING: the write helper expects the in-progress
     * add count to stay within that limit.
     */
    @Test
    public void testWriteWithBackpressureSortedStorage() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxAddsInProgressLimit(MAX_PENDING);
        bookieConf.setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        // Guard the relationship between the two constants instead of asserting a literal
        // true, so that changing either constant cannot silently invalidate the scenario.
        // NOTE(review): the factor 2 assumes one memtable being flushed plus one accepting
        // data - confirm against the sorted-storage implementation.
        Assert.assertTrue("for the test, memtable should not keep more entries than allowed", ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
        // One byte short of ENTRIES_IN_MEMTABLE payloads.
        bookieConf.setSkipListSizeLimit(this.data.length * ENTRIES_IN_MEMTABLE - 1);
        bookieConf.setProperty("test.slowStorage.addDelay", "1");
        bookieConf.setProperty("test.slowStorage.flushDelay", "10");
        this.doWritesWithBackpressure(0);
    }

    /**
     * Slow interleaved ledger storage with a get delay of "1" and the reads-in-progress limit
     * set to 0: after the read helper finishes, the bookie's reported maximum of reads in
     * progress must be greater than MAX_PENDING.
     */
    @Test
    public void testReadsNoBackpressure() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxReadsInProgressLimit(0);
        bookieConf.setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        bookieConf.setProperty("test.slowStorage.getDelay", "1");
        BookieRequestProcessor processor = this.generateDataAndDoReads(0);
        int maxReads = processor.maxReadsInProgressCount();
        Assert.assertThat("reads in progress should exceed MAX_PENDING", maxReads, Matchers.greaterThan(MAX_PENDING));
    }

    /**
     * Slow interleaved ledger storage with a get delay of "1" and the reads-in-progress limit
     * set to MAX_PENDING: after the read helper finishes, the bookie's reported maximum of
     * reads in progress must not be greater than MAX_PENDING.
     */
    @Test
    public void testReadsWithBackpressure() throws Exception {
        ServerConfiguration bookieConf = this.confByIndex(0);
        bookieConf.setMaxReadsInProgressLimit(MAX_PENDING);
        bookieConf.setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        // Write buffer sized to exactly one payload.
        bookieConf.setWriteBufferBytes(this.data.length);
        bookieConf.setProperty("test.slowStorage.getDelay", "1");
        BookieRequestProcessor processor = this.generateDataAndDoReads(0);
        int maxReads = processor.maxReadsInProgressCount();
        Assert.assertThat("reads in progress should NOT exceed MAX_PENDING ", maxReads, Matchers.lessThanOrEqualTo(MAX_PENDING));
    }

    /**
     * Restarts the given bookie with the configured journal delays, writes a small data set
     * to NUM_OF_LEDGERS ledgers, then issues one single-entry asynchronous read for every
     * entry written and waits for all of them to complete.
     *
     * @param bkId index of the bookie to restart (the cluster must contain exactly one bookie)
     * @return the request processor of the restarted bookie, for inspecting its counters
     * @throws Exception if the bookie cannot be restarted, a ledger cannot be created, or the
     *                   thread is interrupted while waiting
     */
    private BookieRequestProcessor generateDataAndDoReads(int bkId) throws Exception {
        Assert.assertThat("should be only one bookie", this.bookieCount(), Matchers.equalTo(1));
        ServerConfiguration conf = this.killBookie(bkId);
        BookieServer bks = this.startAndAddBookie(conf, this.bookieWithMockedJournal(conf, this.getDelay, this.addDelay, this.flushDelay)).getServer();

        LOG.info("creating ledgers");
        final int numEntriesForReads = 10;
        final int totalRequests = NUM_OF_LEDGERS * numEntriesForReads;
        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
        for (int i = 0; i < NUM_OF_LEDGERS; ++i) {
            lhs[i] = this.bkc.createLedger(1, 1, this.digestType, this.ledgerPassword);
            LOG.info("created ledger ID: {}", lhs[i].getId());
        }

        LOG.info("generating data for reads");
        // NOTE(review): the return codes of the writes and reads below are not inspected, so
        // failed requests still count the latches down.
        CountDownLatch writesCompleteLatch = new CountDownLatch(totalRequests);
        for (int entry = 0; entry < numEntriesForReads; ++entry) {
            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ++ledger) {
                lhs[ledger].asyncAddEntry(this.data, (rc, lh, entryId, ctx) -> writesCompleteLatch.countDown(), null);
            }
        }
        writesCompleteLatch.await();

        LOG.info("issue bunch of async reads");
        CountDownLatch readsCompleteLatch = new CountDownLatch(totalRequests);
        for (int entry = 0; entry < numEntriesForReads; ++entry) {
            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ++ledger) {
                // First and last entry id are equal: each request reads exactly one entry.
                lhs[ledger].asyncReadEntries((long) entry, (long) entry, (rc, lh, seq, ctx) -> readsCompleteLatch.countDown(), null);
            }
        }
        readsCompleteLatch.await();
        LOG.info("reads finished");
        return bks.getBookieRequestProcessor();
    }

    /**
     * Restarts the given bookie with the configured journal delays, submits
     * NUM_ENTRIES_TO_WRITE asynchronous adds to each of NUM_OF_LEDGERS ledgers, and asserts
     * that the bookie's reported adds-in-progress count goes above MAX_PENDING before all
     * writes have completed.
     *
     * @param bkId index of the bookie to restart (the cluster must contain exactly one bookie)
     * @throws Exception if the bookie cannot be restarted, a ledger cannot be created or
     *                   closed, or the thread is interrupted while waiting
     */
    private void doWritesNoBackpressure(int bkId) throws Exception {
        Assert.assertThat("should be only one bookie", this.bookieCount(), Matchers.equalTo(1));
        ServerConfiguration conf = this.killBookie(bkId);
        BookieServer bks = this.startAndAddBookie(conf, this.bookieWithMockedJournal(conf, this.getDelay, this.addDelay, this.flushDelay)).getServer();

        LOG.info("Creating ledgers");
        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
        for (int i = 0; i < NUM_OF_LEDGERS; ++i) {
            lhs[i] = this.bkc.createLedger(1, 1, this.digestType, this.ledgerPassword);
            LOG.info("created ledger ID: {}", lhs[i].getId());
        }

        final int totalWrites = NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS;
        CountDownLatch completeLatch = new CountDownLatch(totalWrites);
        LOG.info("submitting writes");
        for (int entry = 0; entry < NUM_ENTRIES_TO_WRITE; ++entry) {
            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ++ledger) {
                lhs[ledger].asyncAddEntry(this.data, (rc, lh, entryId, ctx) -> completeLatch.countDown(), null);
            }
        }

        boolean exceededLimit = false;
        BookieRequestProcessor brp = bks.getBookieRequestProcessor();
        // Poll the counter every millisecond; stop as soon as it is seen above MAX_PENDING,
        // or once every write has completed.
        while (!completeLatch.await(1L, TimeUnit.MILLISECONDS)) {
            int val = brp.maxAddsInProgressCount();
            if (val > MAX_PENDING) {
                exceededLimit = true;
                break;
            }
            LOG.info("Waiting until all writes succeeded or maxAddsInProgressCount {} > MAX_PENDING {}", val, MAX_PENDING);
        }
        Assert.assertTrue("expected to exceed number of pending writes", exceededLimit);

        for (LedgerHandle handle : lhs) {
            handle.close();
        }
    }

    /**
     * Restarts the given bookie with the configured journal delays, submits
     * NUM_ENTRIES_TO_WRITE asynchronous adds to each of NUM_OF_LEDGERS ledgers, and asserts
     * on every poll that the bookie's reported adds-in-progress count does not go above
     * MAX_PENDING. After all writes have completed, the first non-zero return code (if any)
     * is rethrown as a {@link BKException}.
     *
     * @param bkId index of the bookie to restart (the cluster must contain exactly one bookie)
     * @throws Exception if the bookie cannot be restarted, a ledger cannot be created or
     *                   closed, a write failed, or the thread is interrupted while waiting
     */
    private void doWritesWithBackpressure(int bkId) throws Exception {
        Assert.assertThat("should be only one bookie", this.bookieCount(), Matchers.equalTo(1));
        ServerConfiguration conf = this.killBookie(bkId);
        BookieServer bks = this.startAndAddBookie(conf, this.bookieWithMockedJournal(conf, this.getDelay, this.addDelay, this.flushDelay)).getServer();

        LOG.info("Creating ledgers");
        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
        for (int i = 0; i < NUM_OF_LEDGERS; ++i) {
            lhs[i] = this.bkc.createLedger(1, 1, this.digestType, this.ledgerPassword);
            LOG.info("created ledger ID: {}", lhs[i].getId());
        }

        final int totalWrites = NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS;
        CountDownLatch completeLatch = new CountDownLatch(totalWrites);
        // Holds the first non-zero return code seen by any callback; 0 means none so far.
        AtomicInteger firstFailure = new AtomicInteger(0);
        LOG.info("submitting writes");
        for (int entry = 0; entry < NUM_ENTRIES_TO_WRITE; ++entry) {
            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ++ledger) {
                lhs[ledger].asyncAddEntry(this.data, (rc, lh, entryId, ctx) -> {
                    firstFailure.compareAndSet(0, rc);
                    completeLatch.countDown();
                }, null);
            }
        }
        LOG.info("test submitted all writes");

        BookieRequestProcessor brp = bks.getBookieRequestProcessor();
        // Poll the counter every millisecond until every write has completed.
        while (!completeLatch.await(1L, TimeUnit.MILLISECONDS)) {
            int val = brp.maxAddsInProgressCount();
            Assert.assertTrue("writes in progress should not exceed limit, got " + val, val <= MAX_PENDING);
            LOG.info("Waiting for all writes to succeed, left {} of {}", completeLatch.getCount(), totalWrites);
        }

        int failureCode = firstFailure.get();
        if (failureCode != 0) {
            throw BKException.create(failureCode);
        }
        for (LedgerHandle handle : lhs) {
            handle.close();
        }
    }

    /**
     * Add callback: records the return code on the {@link SyncObj} passed as context, then,
     * holding that object's monitor, increments its counter and wakes one waiting thread.
     *
     * @param rc      return code of the add
     * @param lh      ledger handle the add was issued on (unused)
     * @param entryId id of the added entry (unused)
     * @param ctx     the {@link SyncObj} to update
     */
    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
        final SyncObj waiter = (SyncObj) ctx;
        waiter.setReturnCode(rc);
        synchronized (waiter) {
            waiter.counter++;
            waiter.notify();
        }
    }

    /**
     * Read callback: stores the returned entries and the return code on the {@link SyncObj}
     * passed as context, then, holding that object's monitor, sets its {@code value} flag
     * and wakes one waiting thread.
     *
     * @param rc  return code of the read
     * @param lh  ledger handle the read was issued on (unused)
     * @param seq entries returned by the read
     * @param ctx the {@link SyncObj} to update
     */
    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
        final SyncObj waiter = (SyncObj) ctx;
        waiter.setLedgerEntries(seq);
        waiter.setReturnCode(rc);
        synchronized (waiter) {
            waiter.value = true;
            waiter.notify();
        }
    }

    /**
     * Read-last-confirmed callback: records the return code on the {@link SyncObj} passed as
     * context, then, holding that object's monitor, stores the last confirmed entry id and
     * wakes one waiting thread.
     *
     * @param rc            return code of the operation
     * @param lastConfirmed last confirmed entry id reported by the operation
     * @param ctx           the {@link SyncObj} to update
     */
    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
        final SyncObj waiter = (SyncObj) ctx;
        waiter.setReturnCode(rc);
        synchronized (waiter) {
            waiter.lastConfirmed = lastConfirmed;
            waiter.notify();
        }
    }

    /**
     * Mutable result holder passed as the context object to this class's callback methods,
     * which update it and call {@code notify()} on it while holding its monitor.
     */
    class SyncObj {
        /** Last confirmed entry id stored by readLastConfirmedComplete; -1 until then. */
        long lastConfirmed = -1L;
        /** Incremented by addComplete once per completed add; starts at 0. */
        volatile int counter;
        /** Set to true by readComplete; starts as false. */
        boolean value;
        /** First non-zero return code reported through {@link #setReturnCode}; 0 otherwise. */
        AtomicInteger rc = new AtomicInteger();
        /** Entries stored by readComplete; null until then. */
        Enumeration<LedgerEntry> ls;

        /**
         * Stores {@code rc} only if no non-zero return code has been stored yet, so the
         * first failure is the one that is kept.
         */
        void setReturnCode(int rc) {
            this.rc.compareAndSet(0, rc);
        }

        /** Replaces the stored entries with {@code ls}. */
        void setLedgerEntries(Enumeration<LedgerEntry> ls) {
            this.ls = ls;
        }
    }
}

