/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog.impl.logsegment;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class TestBKLogSegmentEntryReader
extends TestDistributedLogBase {
    @Rule
    public TestName runtime = new TestName();
    private OrderedScheduler scheduler;
    private BookKeeperClient bkc;
    private ZooKeeperClient zkc;

    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        this.zkc = ZooKeeperClientBuilder.newBuilder().name("test-zk").zkServers(bkutil.getZkServers()).sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()).zkAclId(conf.getZkAclId()).build();
        this.bkc = BookKeeperClientBuilder.newBuilder().name("test-bk").dlConfig(conf).ledgersPath("/ledgers").zkServers(bkutil.getZkServers()).build();
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().name("test-bk-logsegment-entry-reader").numThreads(1).build();
    }

    @Override
    @After
    public void teardown() throws Exception {
        if (null != this.bkc) {
            this.bkc.close();
        }
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
        if (null != this.zkc) {
            this.zkc.close();
        }
        super.teardown();
    }

    BKLogSegmentEntryReader createEntryReader(LogSegmentMetadata segment, long startEntryId, DistributedLogConfiguration conf) throws Exception {
        BKLogSegmentEntryStore store = new BKLogSegmentEntryStore(conf, ConfUtils.getConstDynConf(conf), this.zkc, this.bkc, this.scheduler, null, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL);
        return (BKLogSegmentEntryReader)Utils.ioResult(store.openReader(segment, startEntryId));
    }

    void generateCompletedLogSegments(DistributedLogManager dlm, DistributedLogConfiguration conf, long numCompletedSegments, long segmentSize) throws Exception {
        long txid = 1L;
        for (long i = 0L; i < numCompletedSegments; ++i) {
            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
            for (long j = 1L; j <= segmentSize; ++j) {
                Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
                LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
                ctrlRecord.setControl();
                Utils.ioResult(writer.write(ctrlRecord));
            }
            Utils.close(writer);
        }
    }

    AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm, DistributedLogConfiguration conf, long segmentSize) throws Exception {
        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
        for (long i = 1L; i <= segmentSize; ++i) {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
            LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
            ctrlRecord.setControl();
            Utils.ioResult(writer.write(ctrlRecord));
        }
        return writer;
    }

    @Test(timeout=60000L)
    public void testReadEntriesFromCompleteLogSegment() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setNumPrefetchEntriesPerLogSegment(10);
        confLocal.setMaxPrefetchEntriesPerLogSegment(10);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        this.generateCompletedLogSegments(dlm, confLocal, 1L, 20L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        reader.start();
        boolean done = false;
        long txId = 1L;
        long entryId = 0L;
        while (!done) {
            Entry.Reader entryReader;
            try {
                entryReader = Utils.ioResult(reader.readNext(1)).get(0);
            }
            catch (EndOfLogSegmentException eol) {
                done = true;
                continue;
            }
            LogRecordWithDLSN record = entryReader.nextRecord();
            while (null != record) {
                if (!record.isControl()) {
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertEquals((long)txId, (long)record.getTransactionId());
                    ++txId;
                }
                DLSN dlsn = record.getDlsn();
                Assert.assertEquals((long)1L, (long)dlsn.getLogSegmentSequenceNo());
                Assert.assertEquals((long)entryId, (long)dlsn.getEntryId());
                record = entryReader.nextRecord();
            }
            ++entryId;
        }
        Assert.assertEquals((long)21L, (long)txId);
        Assert.assertFalse((boolean)reader.hasCaughtUpOnInprogress());
        Utils.close(reader);
    }

    @Test(timeout=60000L)
    public void testCloseReaderToCancelPendingReads() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setNumPrefetchEntriesPerLogSegment(10);
        confLocal.setMaxPrefetchEntriesPerLogSegment(10);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 1L, 20L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        ArrayList<CompletableFuture<List<Entry.Reader>>> futures = Lists.newArrayList();
        for (int i = 0; i < 5; ++i) {
            futures.add(reader.readNext(1));
        }
        Assert.assertFalse((String)"Reader should not be closed yet", (boolean)reader.isClosed());
        Utils.close(reader);
        for (CompletableFuture completableFuture : futures) {
            try {
                Utils.ioResult(completableFuture);
                Assert.fail((String)"The read request should be cancelled");
            }
            catch (ReadCancelledException readCancelledException) {}
        }
        Assert.assertFalse((boolean)reader.hasCaughtUpOnInprogress());
        Assert.assertTrue((String)"Reader should be closed yet", (boolean)reader.isClosed());
    }

    @Test(timeout=60000L)
    public void testMaxPrefetchEntriesSmallBatch() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setNumPrefetchEntriesPerLogSegment(2);
        confLocal.setMaxPrefetchEntriesPerLogSegment(10);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        this.generateCompletedLogSegments(dlm, confLocal, 1L, 20L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        reader.start();
        while (reader.readAheadEntries.size() < 10) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        long txId = 1L;
        long entryId = 0L;
        Assert.assertEquals((long)10L, (long)reader.readAheadEntries.size());
        Assert.assertEquals((long)10L, (long)reader.getNextEntryId());
        Assert.assertFalse((boolean)reader.hasCaughtUpOnInprogress());
        Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
        LogRecordWithDLSN record = entryReader.nextRecord();
        while (null != record) {
            if (!record.isControl()) {
                DLMTestUtil.verifyLogRecord(record);
                Assert.assertEquals((long)txId, (long)record.getTransactionId());
                ++txId;
            }
            DLSN dlsn = record.getDlsn();
            Assert.assertEquals((long)1L, (long)dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals((long)entryId, (long)dlsn.getEntryId());
            record = entryReader.nextRecord();
        }
        ++entryId;
        Assert.assertEquals((long)2L, (long)txId);
        while (reader.readAheadEntries.size() < 10) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)10L, (long)reader.readAheadEntries.size());
        Assert.assertEquals((long)11L, (long)reader.getNextEntryId());
        Assert.assertFalse((boolean)reader.hasCaughtUpOnInprogress());
        Utils.close(reader);
    }

    @Test(timeout=60000L)
    public void testMaxPrefetchEntriesLargeBatch() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setNumPrefetchEntriesPerLogSegment(10);
        confLocal.setMaxPrefetchEntriesPerLogSegment(5);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        this.generateCompletedLogSegments(dlm, confLocal, 1L, 20L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        reader.start();
        while (reader.readAheadEntries.size() < 5) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        long txId = 1L;
        long entryId = 0L;
        Assert.assertEquals((long)5L, (long)reader.readAheadEntries.size());
        Assert.assertEquals((long)5L, (long)reader.getNextEntryId());
        Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
        LogRecordWithDLSN record = entryReader.nextRecord();
        while (null != record) {
            if (!record.isControl()) {
                DLMTestUtil.verifyLogRecord(record);
                Assert.assertEquals((long)txId, (long)record.getTransactionId());
                ++txId;
            }
            DLSN dlsn = record.getDlsn();
            Assert.assertEquals((long)1L, (long)dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals((long)entryId, (long)dlsn.getEntryId());
            record = entryReader.nextRecord();
        }
        ++entryId;
        Assert.assertEquals((long)2L, (long)txId);
        while (reader.readAheadEntries.size() < 5) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)5L, (long)reader.readAheadEntries.size());
        Assert.assertEquals((long)6L, (long)reader.getNextEntryId());
        Assert.assertFalse((boolean)reader.hasCaughtUpOnInprogress());
        Utils.close(reader);
    }

    @Test(timeout=60000L)
    public void testMaxPrefetchEntriesSmallSegment() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setNumPrefetchEntriesPerLogSegment(10);
        confLocal.setMaxPrefetchEntriesPerLogSegment(20);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        this.generateCompletedLogSegments(dlm, confLocal, 1L, 5L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        reader.start();
        while ((long)reader.readAheadEntries.size() < reader.getLastAddConfirmed() + 1L) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        long txId = 1L;
        long entryId = 0L;
        Assert.assertEquals((long)(reader.getLastAddConfirmed() + 1L), (long)reader.readAheadEntries.size());
        Assert.assertEquals((long)(reader.getLastAddConfirmed() + 1L), (long)reader.getNextEntryId());
        Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
        LogRecordWithDLSN record = entryReader.nextRecord();
        while (null != record) {
            if (!record.isControl()) {
                DLMTestUtil.verifyLogRecord(record);
                Assert.assertEquals((long)txId, (long)record.getTransactionId());
                ++txId;
            }
            DLSN dlsn = record.getDlsn();
            Assert.assertEquals((long)1L, (long)dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals((long)entryId, (long)dlsn.getEntryId());
            record = entryReader.nextRecord();
        }
        ++entryId;
        Assert.assertEquals((long)2L, (long)txId);
        Assert.assertEquals((long)reader.getLastAddConfirmed(), (long)reader.readAheadEntries.size());
        Assert.assertEquals((long)(reader.getLastAddConfirmed() + 1L), (long)reader.getNextEntryId());
        Assert.assertFalse((boolean)reader.hasCaughtUpOnInprogress());
        Utils.close(reader);
    }

    @Test(timeout=60000L)
    public void testReadEntriesFromInprogressSegment() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setNumPrefetchEntriesPerLogSegment(20);
        confLocal.setMaxPrefetchEntriesPerLogSegment(20);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        AsyncLogWriter writer = this.createInprogressLogSegment(dlm, confLocal, 5L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        reader.start();
        long expectedLastAddConfirmed = 8L;
        while ((long)reader.readAheadEntries.size() < expectedLastAddConfirmed + 2L) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)(expectedLastAddConfirmed + 2L), (long)reader.getNextEntryId());
        long txId = 1L;
        long entryId = 0L;
        do {
            Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
            LogRecordWithDLSN record = entryReader.nextRecord();
            while (null != record) {
                if (!record.isControl()) {
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertEquals((long)txId, (long)record.getTransactionId());
                    ++txId;
                }
                DLSN dlsn = record.getDlsn();
                Assert.assertEquals((long)1L, (long)dlsn.getLogSegmentSequenceNo());
                Assert.assertEquals((long)entryId, (long)dlsn.getEntryId());
                record = entryReader.nextRecord();
            }
        } while (++entryId != expectedLastAddConfirmed + 1L);
        Assert.assertEquals((long)6L, (long)txId);
        CompletableFuture<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
        List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture);
        Assert.assertEquals((long)1L, (long)nextReadEntries.size());
        Assert.assertTrue((boolean)reader.hasCaughtUpOnInprogress());
        Entry.Reader entryReader = nextReadEntries.get(0);
        LogRecordWithDLSN record = entryReader.nextRecord();
        Assert.assertNotNull((Object)record);
        Assert.assertTrue((boolean)record.isControl());
        Assert.assertNull((Object)entryReader.nextRecord());
        while (reader.getNextEntryId() <= entryId) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)(entryId + 2L), (long)reader.getNextEntryId());
        Assert.assertEquals((long)1L, (long)reader.readAheadEntries.size());
        Utils.close(reader);
        Utils.close(writer);
    }

    @Test(timeout=60000L)
    public void testReadEntriesOnStateChange() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setNumPrefetchEntriesPerLogSegment(20);
        confLocal.setMaxPrefetchEntriesPerLogSegment(20);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, this.runtime.getMethodName());
        AsyncLogWriter writer = this.createInprogressLogSegment(dlm, confLocal, 5L);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((String)(segments.size() + " log segments found, expected to be only one"), (long)1L, (long)segments.size());
        BKLogSegmentEntryReader reader = this.createEntryReader(segments.get(0), 0L, confLocal);
        reader.start();
        long expectedLastAddConfirmed = 8L;
        while ((long)reader.readAheadEntries.size() < expectedLastAddConfirmed + 2L) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)(expectedLastAddConfirmed + 2L), (long)reader.getNextEntryId());
        long txId = 1L;
        long entryId = 0L;
        do {
            Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
            LogRecordWithDLSN record = entryReader.nextRecord();
            while (null != record) {
                if (!record.isControl()) {
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertEquals((long)txId, (long)record.getTransactionId());
                    ++txId;
                }
                DLSN dlsn = record.getDlsn();
                Assert.assertEquals((long)1L, (long)dlsn.getLogSegmentSequenceNo());
                Assert.assertEquals((long)entryId, (long)dlsn.getEntryId());
                record = entryReader.nextRecord();
            }
        } while (++entryId != expectedLastAddConfirmed + 1L);
        Assert.assertEquals((long)6L, (long)txId);
        CompletableFuture<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
        List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture);
        Assert.assertEquals((long)1L, (long)nextReadEntries.size());
        Entry.Reader entryReader = nextReadEntries.get(0);
        LogRecordWithDLSN record = entryReader.nextRecord();
        Assert.assertNotNull((Object)record);
        Assert.assertTrue((boolean)record.isControl());
        Assert.assertNull((Object)entryReader.nextRecord());
        while (reader.getNextEntryId() <= entryId) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)(entryId + 2L), (long)reader.getNextEntryId());
        Assert.assertEquals((long)1L, (long)reader.readAheadEntries.size());
        ++entryId;
        Utils.close(writer);
        entryReader = Utils.ioResult(reader.readNext(1)).get(0);
        record = entryReader.nextRecord();
        Assert.assertNotNull((Object)record);
        Assert.assertFalse((boolean)record.isControl());
        Assert.assertNull((Object)entryReader.nextRecord());
        while (reader.getNextEntryId() <= entryId + 1L) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((long)(entryId + 2L), (long)reader.getNextEntryId());
        Assert.assertEquals((long)1L, (long)reader.readAheadEntries.size());
        List<LogSegmentMetadata> newSegments = dlm.getLogSegments();
        Assert.assertEquals((long)1L, (long)newSegments.size());
        Assert.assertFalse((boolean)newSegments.get(0).isInProgress());
        reader.onLogSegmentMetadataUpdated(newSegments.get(0));
        try {
            Utils.ioResult(reader.readNext(1));
            Utils.ioResult(reader.readNext(1));
            Assert.fail((String)"Should reach end of log segment");
        }
        catch (EndOfLogSegmentException endOfLogSegmentException) {
            // empty catch block
        }
        Utils.close(reader);
    }
}

