/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.util.BitSet;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PendingReadOp;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.ReadOpBase;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestSpeculativeRead
extends BookKeeperClusterTestCase {
    /** Logger used by the tests in this class and by {@link LatchCallback}. */
    private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeRead.class);
    /** Digest type passed to every create/open ledger call in this class; assigned CRC32 in the constructor. */
    private final BookKeeper.DigestType digestType;
    /** Ledger password passed to every create/open ledger call; the bytes use the platform default charset. */
    byte[] passwd = "specPW".getBytes();

    /**
     * Creates the test case, passing {@code 10} to the {@link BookKeeperClusterTestCase}
     * constructor (presumably the number of bookies to start; confirm against the base
     * class) and selecting CRC32 as the digest type for every ledger used by the tests.
     */
    public TestSpeculativeRead() {
        super(10);
        digestType = BookKeeper.DigestType.CRC32;
    }

    /**
     * Creates a ledger with the shared test client, appends ten identical entries,
     * closes the ledger and returns its id so that tests can re-open it for reading.
     *
     * @param ensemble ensemble size handed to {@code createLedger}
     * @param quorum quorum size handed to {@code createLedger}
     * @return id of the closed ledger
     * @throws Exception if creating, writing to or closing the ledger fails
     */
    long getLedgerToRead(int ensemble, int quorum) throws Exception {
        final byte[] payload = "Data for test".getBytes();
        final LedgerHandle handle = bkc.createLedger(ensemble, quorum, digestType, passwd);
        int written = 0;
        while (written < 10) {
            handle.addEntry(payload);
            written++;
        }
        handle.close();
        return handle.getId();
    }

    /**
     * Builds a new test client against this test's metadata service with a 30000 ms read
     * timeout, read-sequence reordering enabled and slow-bookie tracking enabled in the
     * placement policy.
     *
     * @param specTimeout value passed to {@code setSpeculativeReadTimeout}
     * @return a new client backed by a fresh {@link TestStatsProvider}
     * @throws Exception if the client cannot be created
     */
    BookKeeperTestClient createClient(int specTimeout) throws Exception {
        final ClientConfiguration conf = new ClientConfiguration()
                .setSpeculativeReadTimeout(specTimeout)
                .setReadTimeout(30000)
                .setReorderReadSequenceEnabled(true)
                .setEnsemblePlacementPolicySlowBookies(true)
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        final TestStatsProvider statsProvider = new TestStatsProvider();
        return new BookKeeperTestClient(conf, statsProvider);
    }

    /**
     * Compares a client created with a speculative read timeout of 0 against one created
     * with a 2000 ms timeout while the second bookie of the ledger's first ensemble is
     * asleep.
     *
     * <p>Entry 0 must be readable by both clients within 2 seconds. Entry 1 must be
     * readable within 4 seconds by the 2000 ms client, while the read issued by the other
     * client must still be pending after a further 4 seconds. Afterwards the 2000 ms
     * client's placement policy must track exactly one slow bookie, and only that client's
     * {@code SPECULATIVE_READ_COUNT} counter may be non-zero.
     *
     * @throws Exception if cluster interaction fails or an expectation is not met
     */
    @Test
    public void testSpeculativeRead() throws Exception {
        final long ledgerId = getLedgerToRead(3, 2);
        final BookKeeperTestClient bknospec = createClient(0);
        final BookKeeperTestClient bkspec = createClient(2000);
        final LedgerHandle lnospec = bknospec.openLedger(ledgerId, digestType, passwd);
        final LedgerHandle lspec = bkspec.openLedger(ledgerId, digestType, passwd);

        // Put the second bookie of the first ensemble to sleep until the latch is released.
        final CountDownLatch wakeUp = new CountDownLatch(1);
        final BookieId secondBookie = lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1);
        sleepBookie(secondBookie, wakeUp);

        try {
            // Entry 0: both clients are expected to succeed quickly.
            LatchCallback plainRead = new LatchCallback();
            LatchCallback specRead = new LatchCallback();
            lnospec.asyncReadEntries(0L, 0L, plainRead, null);
            lspec.asyncReadEntries(0L, 0L, specRead, null);
            plainRead.expectSuccess(2000);
            specRead.expectSuccess(2000);

            // Entry 1: only the client with a speculative timeout is expected to finish.
            plainRead = new LatchCallback();
            specRead = new LatchCallback();
            lnospec.asyncReadEntries(1L, 1L, plainRead, null);
            lspec.asyncReadEntries(1L, 1L, specRead, null);
            specRead.expectSuccess(4000);
            plainRead.expectTimeout(4000);

            final RackawareEnsemblePlacementPolicy policy =
                    (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy();
            Assert.assertTrue(policy.slowBookies.asMap().size() == 1);

            final String counterName = "bookkeeper_client.SPECULATIVE_READ_COUNT";
            Assert.assertTrue("Stats should not reflect speculative reads if disabled",
                    bknospec.getTestStatsProvider().getCounter(counterName).get() == 0L);
            Assert.assertTrue("Stats should reflect speculative reads",
                    bkspec.getTestStatsProvider().getCounter(counterName).get() > 0L);
        } finally {
            // Wake the bookie before closing so shutdown is not blocked on it.
            wakeUp.countDown();
            lspec.close();
            lnospec.close();
            bkspec.close();
            bknospec.close();
        }
    }

    /**
     * Reads entries 0 to 4 of a ledger created with ensemble and quorum size 5 while the
     * bookies at ensemble indexes 1, 2 and 4 are asleep, using a 5000 ms speculative read
     * timeout, and checks how long each read takes.
     *
     * <ul>
     *   <li>entry 0 must succeed within half a timeout;</li>
     *   <li>entry 1 must still be pending after one timeout, then succeed, having taken at
     *       least two and less than three timeouts;</li>
     *   <li>after entry 1 the placement policy must list exactly the bookies at indexes 1
     *       and 2 as slow (index 4 is asleep too, but is not expected in the set yet);</li>
     *   <li>entry 2 must succeed within one timeout and entry 3 within half a timeout;</li>
     *   <li>entry 4 must still be pending after half a timeout, then succeed, having taken
     *       at least one and less than two timeouts.</li>
     * </ul>
     *
     * @throws Exception if cluster interaction fails or an expectation is not met
     */
    @Test
    public void testSpeculativeReadMultipleReplicasDown() throws Exception {
        long id = getLedgerToRead(5, 5);
        int timeout = 5000;
        BookKeeperTestClient bkspec = createClient(timeout);
        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);

        // The ledger is closed, so its first ensemble can be looked up once and reused.
        List<BookieId> ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L);

        // One latch releases all three sleeping bookies in the finally block.
        CountDownLatch sleepLatch = new CountDownLatch(1);
        sleepBookie(ensemble.get(1), sleepLatch);
        sleepBookie(ensemble.get(2), sleepLatch);
        sleepBookie(ensemble.get(4), sleepLatch);

        try {
            LatchCallback latch0 = new LatchCallback();
            l.asyncReadEntries(0L, 0L, latch0, null);
            latch0.expectSuccess(timeout / 2);

            LatchCallback latch1 = new LatchCallback();
            l.asyncReadEntries(1L, 1L, latch1, null);
            latch1.expectTimeout(timeout);
            latch1.expectSuccess(timeout * 2);
            long duration1 = latch1.getDuration();
            LOG.info("Timeout {} latch1 duration {}", timeout, duration1);
            Assert.assertTrue("should have taken longer than two timeouts, but less than 3",
                    duration1 >= timeout * 2 && duration1 < timeout * 3);

            Set<BookieId> expectedSlowBookies = new HashSet<BookieId>();
            expectedSlowBookies.add(ensemble.get(1));
            expectedSlowBookies.add(ensemble.get(2));
            // JUnit's contract is assertEquals(expected, actual); the previous call had the
            // two arguments swapped, which would have produced a misleading failure message.
            Assert.assertEquals(expectedSlowBookies,
                    ((RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy()).slowBookies.asMap().keySet());

            LatchCallback latch2 = new LatchCallback();
            l.asyncReadEntries(2L, 2L, latch2, null);
            latch2.expectSuccess(timeout);

            LatchCallback latch3 = new LatchCallback();
            l.asyncReadEntries(3L, 3L, latch3, null);
            latch3.expectSuccess(timeout / 2);

            LatchCallback latch4 = new LatchCallback();
            l.asyncReadEntries(4L, 4L, latch4, null);
            latch4.expectTimeout(timeout / 2);
            latch4.expectSuccess(timeout);
            long duration4 = latch4.getDuration();
            LOG.info("Timeout {} latch4 duration {}", timeout, duration4);
            Assert.assertTrue("should have taken longer than one timeout, but less than 2",
                    duration4 >= timeout && duration4 < timeout * 2);
        } finally {
            sleepLatch.countDown();
            l.close();
            bkspec.close();
        }
    }

    /**
     * Uses a ledger with ensemble and quorum size 2 whose two bookies are both put to
     * sleep on separate latches, with a 1000 ms speculative read timeout.
     *
     * <p>The read of entry 0 must still be pending after one timeout. Once the bookie at
     * ensemble index 0 is woken, that read must succeed within half a timeout. After the
     * bookie at index 1 is woken as well, a read of entry 1 must succeed within half a
     * timeout.
     *
     * @throws Exception if cluster interaction fails or an expectation is not met
     */
    @Test
    public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception {
        final long ledgerId = getLedgerToRead(2, 2);
        final int timeout = 1000;
        final BookKeeperTestClient bkspec = createClient(timeout);
        final LedgerHandle handle = bkspec.openLedger(ledgerId, digestType, passwd);

        final CountDownLatch wakeFirst = new CountDownLatch(1);
        final CountDownLatch wakeSecond = new CountDownLatch(1);
        sleepBookie(handle.getLedgerMetadata().getAllEnsembles().get(0L).get(0), wakeFirst);
        sleepBookie(handle.getLedgerMetadata().getAllEnsembles().get(0L).get(1), wakeSecond);

        try {
            // Both bookies are asleep: nothing can answer within the timeout.
            final LatchCallback firstRead = new LatchCallback();
            handle.asyncReadEntries(0L, 0L, firstRead, null);
            firstRead.expectTimeout(timeout);

            // Wake the first bookie; the pending read must now complete.
            wakeFirst.countDown();
            firstRead.expectSuccess(timeout / 2);

            // Wake the second bookie and read another entry.
            wakeSecond.countDown();
            final LatchCallback secondRead = new LatchCallback();
            handle.asyncReadEntries(1L, 1L, secondRead, null);
            secondRead.expectSuccess(timeout / 2);
        } finally {
            // Release both latches again in case the test failed before reaching them.
            wakeFirst.countDown();
            wakeSecond.countDown();
            handle.close();
            bkspec.close();
        }
    }

    /**
     * Runs a {@link PendingReadOp} for entries 0 to 5 to completion and checks that its
     * speculative task is {@code null} afterwards, whether or not the read itself
     * succeeded.
     *
     * <p>The operation is constructed before entering the guarded section, so a failure
     * in its constructor surfaces directly instead of being masked by a
     * {@code NullPointerException} from the final assertion. The ledger handle and the
     * client are always closed.
     *
     * @throws Exception if cluster interaction fails or the assertion is not met
     */
    @Test
    public void testSpeculativeReadScheduledTaskCancel() throws Exception {
        long id = getLedgerToRead(3, 2);
        int timeout = 1000;
        BookKeeperTestClient bkspec = createClient(timeout);
        try {
            LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
            try {
                PendingReadOp op = new PendingReadOp(l, bkspec.getClientCtx(), 0L, 5L, false);
                try {
                    op.initiate();
                    op.future().get();
                } finally {
                    // Checked even when the read fails, as in the original test.
                    Assert.assertNull("Speculative Read tasks must be null", op.getSpeculativeTask());
                }
            } finally {
                l.close();
            }
        } finally {
            bkspec.close();
        }
    }

    /**
     * Checks which bookie {@code maybeSendSpeculativeRead} picks for sequence read requests
     * on entries 0, 2 and 4 of a ledger created with ensemble size 3 and quorum size 2,
     * depending on the "heard from" bit set passed in.
     *
     * <ul>
     *   <li>entry 0, all hosts heard from: first call returns ensemble index 0, second
     *       call returns {@code null};</li>
     *   <li>entry 2, no host heard from: returns index 2; then with only index 1 heard
     *       from: returns index 0;</li>
     *   <li>entry 4, no host heard from: returns index 1; then with only index 1 heard
     *       from: returns {@code null}.</li>
     * </ul>
     *
     * <p>Whatever the outcome, every request that was created is given time to complete
     * before the ledger handle and client are closed.
     *
     * <p>NOTE(review): the requests are created with {@code op.new SequenceReadRequest(...)},
     * which assumes {@code SequenceReadRequest} is a non-static inner class of
     * {@link PendingReadOp}. The decompiled form this replaces (outer instance passed as a
     * leading constructor argument after {@code Objects.requireNonNull}) indicates exactly
     * that, but confirm against the class.
     *
     * @throws Exception if cluster interaction fails or an expectation is not met
     */
    @Test
    public void testSpeculativeReadScheduling() throws Exception {
        long id = getLedgerToRead(3, 2);
        int timeout = 1000;
        BookKeeperTestClient bkspec = createClient(timeout);
        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);

        List<BookieId> ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L);
        BitSet allHosts = new BitSet(ensemble.size());
        for (int i = 0; i < ensemble.size(); ++i) {
            allHosts.set(i, true);
        }
        BitSet noHost = new BitSet(ensemble.size());
        BitSet secondHostOnly = new BitSet(ensemble.size());
        secondHostOnly.set(1, true);

        PendingReadOp.SequenceReadRequest req0 = null;
        PendingReadOp.SequenceReadRequest req2 = null;
        PendingReadOp.SequenceReadRequest req4 = null;
        try {
            PendingReadOp op = new PendingReadOp(l, bkspec.getClientCtx(), 0L, 5L, false);

            // All hosts already heard from: only the initial read is sent.
            req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0L);
            Assert.assertEquals("Should have sent to first",
                    ensemble.get(0), req0.maybeSendSpeculativeRead(allHosts));
            Assert.assertNull("Should not have sent another", req0.maybeSendSpeculativeRead(allHosts));

            // Heard from a host other than the one read from: another read is sent.
            req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2L);
            Assert.assertEquals("Should have sent to third",
                    ensemble.get(2), req2.maybeSendSpeculativeRead(noHost));
            Assert.assertEquals("Should have sent to first",
                    ensemble.get(0), req2.maybeSendSpeculativeRead(secondHostOnly));

            // Heard from the host that was read from: no further read is sent.
            req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4L);
            Assert.assertEquals("Should have sent to second",
                    ensemble.get(1), req4.maybeSendSpeculativeRead(noHost));
            Assert.assertNull("Should not have sent another", req4.maybeSendSpeculativeRead(secondHostOnly));
        } finally {
            try {
                for (ReadOpBase.LedgerEntryRequest req : new ReadOpBase.LedgerEntryRequest[] {req0, req2, req4}) {
                    if (req != null) {
                        awaitCompletion(req);
                    }
                }
            } finally {
                // Close even if a request never completed, so the client is not leaked.
                l.close();
                bkspec.close();
            }
        }
    }

    /**
     * Polls {@code req} until it reports completion, sleeping one second between checks
     * for at most 11 sleeps, then asserts that it is complete.
     */
    private static void awaitCompletion(ReadOpBase.LedgerEntryRequest req) throws InterruptedException {
        int attempts = 0;
        while (!req.isComplete() && attempts++ <= 10) {
            Thread.sleep(1000L);
        }
        Assert.assertTrue("Request should be done", req.isComplete());
    }

    /**
     * Creates a sequence read request on a single-bookie ledger using a client configured
     * with {@link LocalBookieEnsemblePlacementPolicy} and read-sequence reordering enabled,
     * and checks that the request's {@code writeSet} is not {@code null}.
     *
     * <p>The ledger handle and the client are closed when the test finishes, whether it
     * passes or fails.
     *
     * <p>NOTE(review): {@code op.new SequenceReadRequest(...)} assumes
     * {@code SequenceReadRequest} is a non-static inner class of {@link PendingReadOp}, as
     * the decompiled constructor call it replaces indicates; confirm against the class.
     *
     * @throws Exception if cluster interaction fails or the assertion is not met
     */
    @Test
    public void testSequenceReadLocalEnsemble() throws Exception {
        ClientConfiguration conf = new ClientConfiguration()
                .setSpeculativeReadTimeout(1000)
                .setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class)
                .setReorderReadSequenceEnabled(true)
                .setEnsemblePlacementPolicySlowBookies(true)
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeperTestClient bkspec = new BookKeeperTestClient(conf, new TestStatsProvider());
        try {
            LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd);
            try {
                List<BookieId> ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L);
                PendingReadOp op = new PendingReadOp(l, bkspec.getClientCtx(), 0L, 5L, false);
                PendingReadOp.SequenceReadRequest req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0L);
                Assert.assertNotNull(req0.writeSet);
            } finally {
                l.close();
            }
        } finally {
            bkspec.close();
        }
    }

    class LatchCallback
    implements AsyncCallback.ReadCallback {
        CountDownLatch l = new CountDownLatch(1);
        boolean success = false;
        long startMillis = System.currentTimeMillis();
        long endMillis = Long.MAX_VALUE;

        LatchCallback() {
        }

        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
            this.endMillis = System.currentTimeMillis();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got response {} {}", (Object)rc, (Object)this.getDuration());
            }
            this.success = rc == 0;
            this.l.countDown();
        }

        long getDuration() {
            return this.endMillis - this.startMillis;
        }

        void expectSuccess(int milliseconds) throws Exception {
            Assert.assertTrue((boolean)this.l.await(milliseconds, TimeUnit.MILLISECONDS));
            Assert.assertTrue((boolean)this.success);
        }

        void expectFail(int milliseconds) throws Exception {
            Assert.assertTrue((boolean)this.l.await(milliseconds, TimeUnit.MILLISECONDS));
            Assert.assertFalse((boolean)this.success);
        }

        void expectTimeout(int milliseconds) throws Exception {
            Assert.assertFalse((boolean)this.l.await(milliseconds, TimeUnit.MILLISECONDS));
        }
    }
}

