/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl.cache;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class PendingReadsManagerTest {
    /** Logger used by the helpers and by the mocked answers in this class. */
    private static final Logger log = LoggerFactory.getLogger(PendingReadsManagerTest.class);
    /** Context object passed with the first read of each test; the tests assert it comes back by identity. */
    static final Object CTX = "foo";
    /** Context object passed with the second (concurrent) read of each test. */
    static final Object CTX2 = "far";
    /** Ledger id (123414) of the entries produced by {@code buildList}. */
    static final long ledgerId = 123414L;
    /** Single-thread executor created in {@code before()} and returned by the mocked {@code ml.getExecutor()}. */
    ExecutorService orderedExecutor;
    /** Mockito mock of the cache; re-created by {@code setupMocks()} before every test method. */
    RangeEntryCacheImpl rangeEntryCache;
    /** Manager built on the mocked cache; the mocked {@code asyncReadEntry0} forwards to it. */
    PendingReadsManager pendingReadsManager;
    /** Limiter constructed with {@code 0L} and returned by the mocked {@code getPendingReadsLimiter()}. */
    InflightReadsLimiter inflighReadsLimiter;
    /** Mocked read handle passed to every {@code readEntries} call in the tests. */
    ReadHandle lh;
    /** Mocked managed ledger returned by the mocked {@code rangeEntryCache.getManagedLedger()}. */
    ManagedLedgerImpl ml;

    /**
     * Package-private no-argument constructor. It performs no initialization;
     * the fixtures are populated by {@code before()} and {@code setupMocks()}.
     */
    PendingReadsManagerTest() {
    }

    /** Creates the single-thread executor that is shared by all tests of this class. */
    @BeforeClass(alwaysRun=true)
    void before() {
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        orderedExecutor = singleThread;
    }

    /** Shuts the shared executor down (if one was created) and clears the field. */
    @AfterClass(alwaysRun=true)
    void after() {
        ExecutorService executor = orderedExecutor;
        if (executor == null) {
            // Nothing to release: before() never ran or after() already did.
            return;
        }
        executor.shutdown();
        orderedExecutor = null;
    }

    /**
     * Rebuilds all mocks and the {@link PendingReadsManager} under test before every test method.
     *
     * <p>The mocked cache reports the name {@code "my-topic"}, a config with a read-entry timeout of
     * 10000 seconds and a limiter created with {@code 0L}. Calls to its {@code asyncReadEntry0} are
     * redirected to {@code pendingReadsManager.readEntries} using the shared mocked handle.
     */
    @BeforeMethod(alwaysRun=true)
    void setupMocks() {
        this.rangeEntryCache = Mockito.mock(RangeEntryCacheImpl.class);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setReadEntryTimeoutSeconds(10000L);
        Mockito.when(this.rangeEntryCache.getName()).thenReturn("my-topic");
        Mockito.when(this.rangeEntryCache.getManagedLedgerConfig()).thenReturn(config);
        this.inflighReadsLimiter = new InflightReadsLimiter(0L);
        Mockito.when(this.rangeEntryCache.getPendingReadsLimiter()).thenReturn(this.inflighReadsLimiter);
        this.pendingReadsManager = new PendingReadsManager(this.rangeEntryCache);

        // The fields are read when the answer runs, not when it is created, so
        // assigning this.lh further down is fine. The handle argument (index 0)
        // is deliberately not extracted: the shared mocked handle is forwarded.
        Answer<Void> forwardToPendingReads = invocation -> {
            log.info("rangeEntryCache asyncReadEntry0 {}", invocation);
            long startEntry = (Long) invocation.getArgument(1);
            long endEntry = (Long) invocation.getArgument(2);
            boolean shouldCacheEntry = (Boolean) invocation.getArgument(3);
            AsyncCallbacks.ReadEntriesCallback callback =
                    (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(4);
            Object ctx = invocation.getArgument(5);
            this.pendingReadsManager.readEntries(this.lh, startEntry, endEntry, shouldCacheEntry, callback, ctx);
            return null;
        };
        Mockito.doAnswer(forwardToPendingReads)
                .when(this.rangeEntryCache)
                .asyncReadEntry0((ReadHandle) ArgumentMatchers.any(), ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(),
                        (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(), ArgumentMatchers.any());

        this.lh = Mockito.mock(ReadHandle.class);
        this.ml = Mockito.mock(ManagedLedgerImpl.class);
        Mockito.when(this.ml.getExecutor()).thenReturn(this.orderedExecutor);
        Mockito.when(this.rangeEntryCache.getManagedLedger()).thenReturn(this.ml);
    }

    /**
     * Builds one entry per id in the inclusive range {@code [start, end]}.
     *
     * <p>Every entry belongs to {@link #ledgerId} and carries the UTF-8 bytes of {@code "data"}.
     *
     * @param start first entry id (inclusive)
     * @param end last entry id (inclusive); a value below {@code start} yields an empty list
     * @return the entries in ascending entry-id order
     */
    private static List<EntryImpl> buildList(long start, long end) {
        List<EntryImpl> result = new ArrayList<>();
        for (long entryId = start; entryId <= end; entryId++) {
            // A fresh byte[] per entry, as before, so no two entries share a payload array.
            result.add(EntryImpl.create(ledgerId, entryId, "data".getBytes(StandardCharsets.UTF_8)));
        }
        return result;
    }

    /**
     * Asserts that {@code entries} holds exactly the positions {@code firstEntry..endEntry}
     * (both inclusive), in ascending entry-id order.
     *
     * <p>The size is checked first, so a result with extra trailing entries fails instead of
     * passing unnoticed, and a short result fails with a clear message rather than an
     * {@link IndexOutOfBoundsException}.
     *
     * @param entries positions captured from a completed read
     * @param firstEntry expected entry id of the first position
     * @param endEntry expected entry id of the last position
     */
    private void verifyRange(List<Position> entries, long firstEntry, long endEntry) {
        log.info("verifyRange numEntries {}", entries.size());
        Assert.assertEquals((long) entries.size(), endEntry - firstEntry + 1L,
                "unexpected number of entries for range " + firstEntry + ".." + endEntry);
        long expectedEntryId = firstEntry;
        for (Position position : entries) {
            Assert.assertEquals(position.getEntryId(), expectedEntryId);
            expectedEntryId++;
        }
    }

    /**
     * Stubs {@code rangeEntryCache.readFromStorage} for one exact argument combination.
     *
     * <p>The stub returns a not-yet-completed {@link PreparedReadFromStorage}; the caller decides
     * when the simulated storage read finishes by completing that future.
     *
     * @param lh handle the stub matches with {@code eq}
     * @param rangeEntryCache mocked cache to stub
     * @param firstEntry first entry id the stub matches
     * @param endEntry last entry id the stub matches
     * @param shouldCacheEntry caching flag the stub matches
     * @return the future that the stubbed call hands out
     */
    private PreparedReadFromStorage prepareReadFromStorage(ReadHandle lh, RangeEntryCacheImpl rangeEntryCache, long firstEntry, long endEntry, boolean shouldCacheEntry) {
        final PreparedReadFromStorage prepared =
                new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry);
        log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
        Mockito.when(rangeEntryCache.readFromStorage(
                        ArgumentMatchers.eq(lh),
                        ArgumentMatchers.eq(firstEntry),
                        ArgumentMatchers.eq(endEntry),
                        ArgumentMatchers.eq(shouldCacheEntry)))
                .thenAnswer(invocation -> {
                    log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
                    return prepared;
                });
        return prepared;
    }

    /**
     * A single read of entries 100..199: issues the read, completes the stubbed storage future,
     * waits for the callback and checks the returned context and entry ids.
     */
    @Test
    public void simpleRead() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final boolean cacheEntries = false;
        PreparedReadFromStorage storageRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);

        CapturingReadEntriesCallback result = new CapturingReadEntriesCallback();
        pendingReadsManager.readEntries(lh, first, last, cacheEntries, result, CTX);

        storageRead.storageReadCompleted();
        result.get();

        AssertJUnit.assertSame(result.getCtx(), CTX);
        verifyRange(result.getEntries(), first, last);
    }

    /**
     * Two reads for the identical range 100..199 with a single stubbed storage read: both
     * callbacks must complete with their own context, the full range, and distinct position
     * objects carrying equal entry ids.
     */
    @Test
    public void simpleConcurrentReadPerfectMatch() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final boolean cacheEntries = false;
        PreparedReadFromStorage storageRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback firstResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, firstResult, CTX);
        CapturingReadEntriesCallback secondResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, secondResult, CTX2);

        storageRead.storageReadCompleted();
        firstResult.get();
        secondResult.get();

        AssertJUnit.assertSame(firstResult.getCtx(), CTX);
        AssertJUnit.assertSame(secondResult.getCtx(), CTX2);
        verifyRange(firstResult.getEntries(), first, last);
        verifyRange(secondResult.getEntries(), first, last);

        // Index i corresponds to entry id first + i in both lists.
        for (int i = 0; i <= last - first; i++) {
            Position fromFirst = firstResult.getEntries().get(i);
            Position fromSecond = secondResult.getEntries().get(i);
            AssertJUnit.assertNotSame(fromFirst, fromSecond);
            Assert.assertEquals(fromFirst.getEntryId(), fromSecond.getEntryId());
        }
    }

    /**
     * A second read (110..189) that lies entirely inside a pending first read (100..199). Only
     * the first range is stubbed for storage; both callbacks must complete with their own range,
     * and positions for the same entry id must be distinct objects.
     */
    @Test
    public void simpleConcurrentReadIncluding() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final long innerFirst = first + 10L;
        final long innerLast = last - 10L;
        final boolean cacheEntries = false;
        PreparedReadFromStorage storageRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback outerResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, outerResult, CTX);
        CapturingReadEntriesCallback innerResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, innerFirst, innerLast, cacheEntries, innerResult, CTX2);

        storageRead.storageReadCompleted();
        outerResult.get();
        innerResult.get();

        AssertJUnit.assertSame(outerResult.getCtx(), CTX);
        AssertJUnit.assertSame(innerResult.getCtx(), CTX2);
        verifyRange(outerResult.getEntries(), first, last);
        verifyRange(innerResult.getEntries(), innerFirst, innerLast);

        // The inner range is a subset of the outer one, so walking it directly
        // visits exactly the entry ids present in both results.
        for (long entryId = innerFirst; entryId <= innerLast; entryId++) {
            int outerIndex = (int) (entryId - first);
            int innerIndex = (int) (entryId - innerFirst);
            Position fromOuter = outerResult.getEntries().get(outerIndex);
            Position fromInner = innerResult.getEntries().get(innerIndex);
            AssertJUnit.assertNotSame(fromOuter, fromInner);
            Assert.assertEquals(fromOuter.getEntryId(), fromInner.getEntryId());
        }
    }

    /**
     * A second read (90..199) that starts before a pending first read (100..199). A separate
     * storage read is stubbed for the uncovered prefix 90..99; the second callback must end up
     * with the whole range 90..199.
     */
    @Test
    public void simpleConcurrentReadMissingLeft() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final long secondFirst = first - 10L;
        final long secondLast = last;
        final boolean cacheEntries = false;
        PreparedReadFromStorage mainRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);
        PreparedReadFromStorage leftRead =
                prepareReadFromStorage(lh, rangeEntryCache, secondFirst, first - 1L, cacheEntries);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback firstResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, firstResult, CTX);
        CapturingReadEntriesCallback secondResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, secondFirst, secondLast, cacheEntries, secondResult, CTX2);

        mainRead.storageReadCompleted();
        firstResult.get();
        leftRead.storageReadCompleted();
        secondResult.get();

        AssertJUnit.assertSame(firstResult.getCtx(), CTX);
        AssertJUnit.assertSame(secondResult.getCtx(), CTX2);
        verifyRange(firstResult.getEntries(), first, last);
        verifyRange(secondResult.getEntries(), secondFirst, secondLast);
    }

    /**
     * A second read (100..209) that extends past a pending first read (100..199). A separate
     * storage read is stubbed for the uncovered suffix 200..209; the second callback must end up
     * with the whole range 100..209.
     */
    @Test
    public void simpleConcurrentReadMissingRight() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final long secondFirst = first;
        final long secondLast = last + 10L;
        final boolean cacheEntries = false;
        PreparedReadFromStorage mainRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);
        PreparedReadFromStorage rightRead =
                prepareReadFromStorage(lh, rangeEntryCache, last + 1L, secondLast, cacheEntries);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback firstResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, firstResult, CTX);
        CapturingReadEntriesCallback secondResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, secondFirst, secondLast, cacheEntries, secondResult, CTX2);

        mainRead.storageReadCompleted();
        firstResult.get();
        rightRead.storageReadCompleted();
        secondResult.get();

        AssertJUnit.assertSame(firstResult.getCtx(), CTX);
        AssertJUnit.assertSame(secondResult.getCtx(), CTX2);
        verifyRange(firstResult.getEntries(), first, last);
        verifyRange(secondResult.getEntries(), secondFirst, secondLast);
    }

    /**
     * A second read (90..209) that overhangs a pending first read (100..199) on both sides.
     * Storage reads are stubbed for the prefix 90..99 and the suffix 200..209; the second
     * callback must end up with the whole range 90..209.
     */
    @Test
    public void simpleConcurrentReadMissingBoth() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final long secondFirst = first - 10L;
        final long secondLast = last + 10L;
        final boolean cacheEntries = false;
        PreparedReadFromStorage mainRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);
        PreparedReadFromStorage leftRead =
                prepareReadFromStorage(lh, rangeEntryCache, secondFirst, first - 1L, cacheEntries);
        PreparedReadFromStorage rightRead =
                prepareReadFromStorage(lh, rangeEntryCache, last + 1L, secondLast, cacheEntries);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback firstResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, firstResult, CTX);
        CapturingReadEntriesCallback secondResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, secondFirst, secondLast, cacheEntries, secondResult, CTX2);

        mainRead.storageReadCompleted();
        firstResult.get();
        leftRead.storageReadCompleted();
        rightRead.storageReadCompleted();
        secondResult.get();

        AssertJUnit.assertSame(firstResult.getCtx(), CTX);
        AssertJUnit.assertSame(secondResult.getCtx(), CTX2);
        verifyRange(firstResult.getEntries(), first, last);
        verifyRange(secondResult.getEntries(), secondFirst, secondLast);
    }

    /**
     * Two reads for disjoint ranges (100..199 and 1000..1099), each with its own stubbed storage
     * read: each callback must complete with its own context and range.
     */
    @Test
    public void simpleConcurrentReadNoMatch() throws Exception {
        final long first = 100L;
        final long last = 199L;
        final long secondFirst = 1000L;
        final long secondLast = 1099L;
        final boolean cacheEntries = false;
        PreparedReadFromStorage firstStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, first, last, cacheEntries);
        PreparedReadFromStorage secondStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, secondFirst, secondLast, cacheEntries);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback firstResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, first, last, cacheEntries, firstResult, CTX);
        CapturingReadEntriesCallback secondResult = new CapturingReadEntriesCallback();
        manager.readEntries(lh, secondFirst, secondLast, cacheEntries, secondResult, CTX2);

        firstStorageRead.storageReadCompleted();
        firstResult.get();
        secondStorageRead.storageReadCompleted();
        secondResult.get();

        AssertJUnit.assertSame(firstResult.getCtx(), CTX);
        AssertJUnit.assertSame(secondResult.getCtx(), CTX2);
        verifyRange(firstResult.getEntries(), first, last);
        verifyRange(secondResult.getEntries(), secondFirst, secondLast);
    }

    /**
     * Future standing in for a storage read of the inclusive range {@code firstEntry..endEntry}.
     * It stays incomplete until the test calls {@link #storageReadCompleted()}.
     */
    private static class PreparedReadFromStorage
    extends CompletableFuture<List<EntryImpl>> {
        final long firstEntry;
        final long endEntry;
        final boolean shouldCacheEntry;

        /**
         * @param firstEntry first entry id of the simulated read (inclusive)
         * @param endEntry last entry id of the simulated read (inclusive)
         * @param shouldCacheEntry caching flag the read was prepared with
         */
        public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) {
            this.shouldCacheEntry = shouldCacheEntry;
            this.endEntry = endEntry;
            this.firstEntry = firstEntry;
        }

        /** Renders as {@code PreparedReadFromStorage(first,end,shouldCache)}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PreparedReadFromStorage(");
            text.append(firstEntry).append(",");
            text.append(endEntry).append(",");
            text.append(shouldCacheEntry).append(")");
            return text.toString();
        }

        /** Completes this future with freshly built entries for the prepared range. */
        public void storageReadCompleted() {
            List<EntryImpl> entries = PendingReadsManagerTest.buildList(firstEntry, endEntry);
            complete(entries);
        }
    }

    private static class CapturingReadEntriesCallback
    extends CompletableFuture<Void>
    implements AsyncCallbacks.ReadEntriesCallback {
        List<Position> entries;
        Object ctx;
        Throwable error;

        public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
            this.entries = entries.stream().map(Entry::getPosition).collect(Collectors.toList());
            this.ctx = ctx;
            this.error = null;
            this.complete(null);
        }

        public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
            this.entries = null;
            this.ctx = ctx;
            this.error = exception;
            this.completeExceptionally(exception);
        }

        public List<Position> getEntries() {
            return this.entries;
        }

        public Object getCtx() {
            return this.ctx;
        }

        public Throwable getError() {
            return this.error;
        }

        public void setEntries(List<Position> entries) {
            this.entries = entries;
        }

        public void setCtx(Object ctx) {
            this.ctx = ctx;
        }

        public void setError(Throwable error) {
            this.error = error;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof CapturingReadEntriesCallback)) {
                return false;
            }
            CapturingReadEntriesCallback other = (CapturingReadEntriesCallback)o;
            if (!other.canEqual(this)) {
                return false;
            }
            List<Position> this$entries = this.getEntries();
            List<Position> other$entries = other.getEntries();
            if (this$entries == null ? other$entries != null : !((Object)this$entries).equals(other$entries)) {
                return false;
            }
            Object this$ctx = this.getCtx();
            Object other$ctx = other.getCtx();
            if (this$ctx == null ? other$ctx != null : !this$ctx.equals(other$ctx)) {
                return false;
            }
            Throwable this$error = this.getError();
            Throwable other$error = other.getError();
            return !(this$error == null ? other$error != null : !this$error.equals(other$error));
        }

        protected boolean canEqual(Object other) {
            return other instanceof CapturingReadEntriesCallback;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            List<Position> $entries = this.getEntries();
            result = result * 59 + ($entries == null ? 43 : ((Object)$entries).hashCode());
            Object $ctx = this.getCtx();
            result = result * 59 + ($ctx == null ? 43 : $ctx.hashCode());
            Throwable $error = this.getError();
            result = result * 59 + ($error == null ? 43 : $error.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "PendingReadsManagerTest.CapturingReadEntriesCallback(entries=" + this.getEntries() + ", ctx=" + this.getCtx() + ", error=" + this.getError() + ")";
        }
    }
}

