/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.Predicate;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.client.Entity;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PersistentMessageFinderTest
extends MockedBookKeeperTestCase {
    /**
     * Serializes a test message into a byte array: a 4-byte metadata length, the serialized
     * {@link MessageMetadata}, then the payload bytes.
     *
     * <p>The metadata is stamped with the current wall-clock time as publish time, the producer
     * name {@code "createMessageWrittenToLedger"} and sequence id {@code 1}.
     *
     * @param msg payload text; encoded as UTF-8 so the result does not depend on the platform charset
     * @return a fresh array containing exactly the readable bytes of the coalesced buffer
     */
    public static byte[] createMessageWrittenToLedger(String msg) {
        MessageMetadata messageMetadata = new MessageMetadata()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName("createMessageWrittenToLedger")
                .setSequenceId(1L);
        ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer()
                .writeBytes(msg.getBytes(StandardCharsets.UTF_8));
        int msgMetadataSize = messageMetadata.getSerializedSize();
        int payloadSize = data.readableBytes();
        int totalSize = 4 + msgMetadataSize + payloadSize;
        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
        headers.writeInt(msgMetadataSize);
        messageMetadata.writeTo(headers);
        ByteBuf headersAndPayload = ByteBufPair.coalesce(ByteBufPair.get(headers, data));
        try {
            // Copy only the readable region instead of exposing the backing array: array() ignores
            // the buffer's offset/length and is unsupported for direct buffers.
            byte[] byteMessage = new byte[headersAndPayload.readableBytes()];
            headersAndPayload.getBytes(headersAndPayload.readerIndex(), byteMessage);
            return byteMessage;
        } finally {
            headersAndPayload.release();
        }
    }

    /**
     * Builds a test message as a single {@link ByteBuf}: a 4-byte metadata length, the serialized
     * {@link MessageMetadata}, then the payload bytes.
     *
     * <p>The metadata is stamped with the current wall-clock time as publish time, the producer
     * name {@code "createMessageWrittenToLedger"} and sequence id {@code 1}.
     *
     * @param msg payload text; encoded as UTF-8 so the result does not depend on the platform charset
     * @return the coalesced headers-and-payload buffer; the caller owns it and must release it
     * @throws Exception declared for caller compatibility
     */
    public static ByteBuf createMessageByteBufWrittenToLedger(String msg) throws Exception {
        MessageMetadata messageMetadata = new MessageMetadata()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName("createMessageWrittenToLedger")
                .setSequenceId(1L);
        ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer()
                .writeBytes(msg.getBytes(StandardCharsets.UTF_8));
        int msgMetadataSize = messageMetadata.getSerializedSize();
        int payloadSize = data.readableBytes();
        int totalSize = 4 + msgMetadataSize + payloadSize;
        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
        headers.writeInt(msgMetadataSize);
        messageMetadata.writeTo(headers);
        return ByteBufPair.coalesce(ByteBufPair.get(headers, data));
    }

    /**
     * Prepends broker entry metadata to a message using the interceptors returned by
     * {@link #getBrokerEntryMetadataInterceptors()} and returns the result as a byte array.
     *
     * @param headerAndPayloads message buffer to wrap; handed to
     *     {@link Commands#addBrokerEntryMetadata} with a message count of {@code 1}
     * @return a fresh array containing exactly the readable bytes of the wrapped message
     * @throws Exception declared for caller compatibility
     */
    public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) throws Exception {
        ByteBuf msgWithEntryMeta = Commands.addBrokerEntryMetadata(
                headerAndPayloads, PersistentMessageFinderTest.getBrokerEntryMetadataInterceptors(), 1);
        try {
            // Copy the readable region explicitly rather than relying on nioBuffer().array(),
            // which only works when the NIO view happens to be heap-backed and exactly sized.
            byte[] byteMessage = new byte[msgWithEntryMeta.readableBytes()];
            msgWithEntryMeta.getBytes(msgWithEntryMeta.readerIndex(), byteMessage);
            return byteMessage;
        } finally {
            msgWithEntryMeta.release();
        }
    }

    /**
     * Runs {@link PersistentMessageFinder#findMessages} on the given cursor for {@code timestamp}
     * and stores the outcome in {@code result}.
     *
     * @param result receives the found position on success or the exception on failure
     * @param c1 cursor to search
     * @param timestamp timestamp passed to the finder
     * @return a future completed normally on success, or exceptionally with the
     *     {@link ManagedLedgerException} on failure
     */
    CompletableFuture<Void> findMessage(final Result result, ManagedCursor c1, long timestamp) {
        final CompletableFuture<Void> done = new CompletableFuture<>();
        AsyncCallbacks.FindEntryCallback callback = new AsyncCallbacks.FindEntryCallback() {
            @Override
            public void findEntryComplete(Position position, Object ctx) {
                result.position = position;
                done.complete(null);
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception,
                                        Optional<Position> failedReadPosition, Object ctx) {
                result.exception = exception;
                done.completeExceptionally(exception);
            }
        };
        new PersistentMessageFinder("topicname", c1).findMessages(timestamp, callback);
        return done;
    }

    /**
     * Exercises {@link PersistentMessageFinder} against a ledger that is closed and reopened.
     *
     * <p>Checks three lookups: a timestamp taken between the third and fourth entries resolves to
     * the third entry's position; a timestamp taken before any entry was written yields no
     * position; a timestamp taken after all writes resolves to the last entry. It then verifies
     * that {@code findEntryFailed} is forwarded to the callback passed as context, and that a
     * failed find on {@link PersistentMessageExpiryMonitor} leaves its
     * {@code expirationCheckInProgress} field at {@code 0}.
     */
    @Test
    void testPersistentMessageFinder() throws Exception {
        final String ledgerAndCursorName = "testPersistentMessageFinder";
        final int entriesPerLedger = 2;
        final long beginTimestamp = System.currentTimeMillis();

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10L);
        config.setMaxEntriesPerLedger(entriesPerLedger);
        config.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger ledger = this.factory.open(ledgerAndCursorName, config);
        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

        // The sleeps keep publish times strictly apart so timestamp lookups are unambiguous.
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("retained1"));
        Thread.sleep(100L);
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("retained2"));
        Thread.sleep(100L);
        Position newPosition = ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("retained3"));
        Thread.sleep(100L);
        long timestamp = System.currentTimeMillis();
        Thread.sleep(10L);
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("afterresetposition"));
        Position lastPosition = ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("not-read"));

        // Typed list: a raw List would make the release lambda operate on Object.
        List<Entry> entries = c1.readEntries(3);
        c1.markDelete(entries.get(2).getPosition());
        c1.close();
        ledger.close();
        entries.forEach(Entry::release);
        Thread.sleep(1000L);

        ledger = this.factory.open(ledgerAndCursorName, config);
        c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
        long endTimestamp = System.currentTimeMillis();

        // Timestamp between the third and fourth entries -> third entry.
        Result result = new Result();
        CompletableFuture<Void> future = this.findMessage(result, c1, timestamp);
        future.get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, newPosition);

        // Timestamp before the first entry -> nothing found.
        result.reset();
        future = this.findMessage(result, c1, beginTimestamp);
        future.get();
        Assert.assertNull(result.exception);
        Assert.assertNull(result.position);

        // Timestamp after the last entry -> last entry.
        result.reset();
        future = this.findMessage(result, c1, endTimestamp);
        future.get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, lastPosition);

        // A failure reported to the finder must reach the callback supplied as ctx.
        PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
        final AtomicBoolean ex = new AtomicBoolean(false);
        messageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(),
                new AsyncCallbacks.FindEntryCallback() {
                    @Override
                    public void findEntryComplete(Position position, Object ctx) {
                    }

                    @Override
                    public void findEntryFailed(ManagedLedgerException exception,
                                                Optional<Position> failedReadPosition, Object ctx) {
                        ex.set(true);
                    }
                });
        Assert.assertTrue(ex.get());

        // After a failed find the expiry monitor's in-progress flag must read 0.
        PersistentMessageExpiryMonitor monitor =
                new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
        monitor.findEntryFailed(
                new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
                Optional.empty(), null);
        Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
        field.setAccessible(true);
        Assert.assertEquals(field.get(monitor), Integer.valueOf(0));

        result.reset();
        c1.close();
        ledger.close();
        this.factory.shutdown();
    }

    /**
     * Verifies timestamp lookup both for entries without broker entry metadata and for entries
     * that had it prepended via {@link #appendBrokerTimestamp}.
     *
     * <p>In the second half the messages are built (and so receive their publish time) before
     * {@code timeAfterPublishTime}, and the broker metadata is added afterwards. The test expects
     * a lookup at {@code timeAfterPublishTime} to find nothing and a lookup at
     * {@code timeAfterBrokerTimestamp} to find the last entry.
     */
    @Test
    void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
        final String ledgerAndCursorName = "publishTime";
        final String ledgerAndCursorNameForBrokerTimestampMessage = "brokerTimestamp";

        // Part 1: plain entries, no broker entry metadata.
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        ManagedLedger ledger = this.factory.open(ledgerAndCursorName, config);
        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("message1"));
        Thread.sleep(100L);
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("message2"));
        Thread.sleep(100L);
        Position position = ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("message3"));
        Thread.sleep(100L);
        long timestamp = System.currentTimeMillis();

        Result result = new Result();
        CompletableFuture<Void> future = this.findMessage(result, cursor, timestamp);
        future.get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, position);

        List<Entry> entryList = cursor.readEntries(3);
        for (Entry entry : entryList) {
            Assert.assertNull(Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()));
        }
        // Release the entries once inspected, as testPersistentMessageFinder does.
        entryList.forEach(Entry::release);
        result.reset();
        cursor.close();
        ledger.close();

        // Part 2: entries carrying broker entry metadata.
        ManagedLedgerConfig configNew = new ManagedLedgerConfig();
        ManagedLedger ledgerNew = this.factory.open(ledgerAndCursorNameForBrokerTimestampMessage, configNew);
        ManagedCursorImpl cursorNew =
                (ManagedCursorImpl) ledgerNew.openCursor(ledgerAndCursorNameForBrokerTimestampMessage);
        ByteBuf msg1 = PersistentMessageFinderTest.createMessageByteBufWrittenToLedger("message1");
        ByteBuf msg2 = PersistentMessageFinderTest.createMessageByteBufWrittenToLedger("message2");
        ByteBuf msg3 = PersistentMessageFinderTest.createMessageByteBufWrittenToLedger("message3");
        Thread.sleep(10L);
        long timeAfterPublishTime = System.currentTimeMillis();
        Thread.sleep(10L);
        ledgerNew.addEntry(PersistentMessageFinderTest.appendBrokerTimestamp(msg1));
        Thread.sleep(100L);
        ledgerNew.addEntry(PersistentMessageFinderTest.appendBrokerTimestamp(msg2));
        Thread.sleep(100L);
        Position newPosition = ledgerNew.addEntry(PersistentMessageFinderTest.appendBrokerTimestamp(msg3));
        Thread.sleep(100L);
        long timeAfterBrokerTimestamp = System.currentTimeMillis();

        // Later than every publish time but earlier than every broker timestamp -> no match expected.
        CompletableFuture<Void> publishTimeFuture = this.findMessage(result, cursorNew, timeAfterPublishTime);
        publishTimeFuture.get();
        Assert.assertNull(result.exception);
        Assert.assertNull(result.position);

        result.reset();
        CompletableFuture<Void> brokerTimestampFuture =
                this.findMessage(result, cursorNew, timeAfterBrokerTimestamp);
        brokerTimestampFuture.get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, newPosition);

        List<Entry> entryListNew = cursorNew.readEntries(4);
        for (Entry entry : entryListNew) {
            BrokerEntryMetadata brokerMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
            Assert.assertNotNull(brokerMetadata);
            Assert.assertTrue(brokerMetadata.getBrokerTimestamp() > timeAfterPublishTime);
        }
        entryListNew.forEach(Entry::release);
        result.reset();
        cursorNew.close();
        ledgerNew.close();
        this.factory.shutdown();
    }

    /**
     * Loads the broker entry metadata interceptors used by these tests: the broker-timestamp and
     * the index interceptors, resolved by class name through the current thread's context
     * class loader.
     *
     * @return the interceptors returned by
     *     {@link BrokerEntryMetadataUtils#loadBrokerEntryMetadataInterceptors}
     */
    public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
        Set<String> classNames = new HashSet<>();
        classNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
        classNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(classNames, loader);
    }

    /**
     * Verifies that TTL-based expiry still advances the cursor to the end of the managed ledger
     * after the first three underlying ledgers have been deleted from BookKeeper, with
     * {@code autoSkipNonRecoverableData} enabled.
     *
     * <p>Ten entries are written two per ledger (five ledgers). After the TTL has elapsed the
     * monitor is triggered repeatedly, and the final mark-delete position must be the last entry
     * of the last ledger.
     */
    @Test
    void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
        final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
        final int entriesPerLedger = 2;
        final int totalEntries = 10;
        final int ttlSeconds = 1;

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10L);
        config.setMaxEntriesPerLedger(entriesPerLedger);
        config.setRetentionTime(1, TimeUnit.HOURS);
        config.setAutoSkipNonRecoverableData(true);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) this.factory.open(ledgerAndCursorName, config);
        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

        for (int i = 0; i < totalEntries; ++i) {
            ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("msg" + i));
        }

        List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
        MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
        Assert.assertEquals(ledgers.size(), totalEntries / entriesPerLedger);

        // Let every entry grow older than the TTL before expiry runs.
        Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));

        // Remove the first three ledgers from BookKeeper so that reading them fails.
        for (int i = 0; i < 3; ++i) {
            this.bkc.deleteLedger(ledgers.get(i).getLedgerId());
        }

        PersistentMessageExpiryMonitor monitor =
                new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
        Position previousMarkDelete = null;
        for (int i = 0; i < totalEntries; ++i) {
            monitor.expireMessages(ttlSeconds);
            // Give the asynchronous expiry a chance to move the mark-delete position.
            final Position previousPos = previousMarkDelete;
            MockedPulsarServiceBaseTest.retryStrategically(
                    test -> c1.getMarkDeletedPosition() != null
                            && !c1.getMarkDeletedPosition().equals(previousPos),
                    5, 100L);
            previousMarkDelete = c1.getMarkDeletedPosition();
        }

        PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
        Assert.assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId());
        Assert.assertEquals(lastLedgerInfo.getEntries() - 1L, markDeletePosition.getEntryId());

        c1.close();
        ledger.close();
        this.factory.shutdown();
    }

    /**
     * Verifies position-based expiry via {@code PersistentMessageExpiryMonitor.expireMessages(Position)}.
     *
     * <p>Covered cases, in order: expiring up to entry 15 moves the mark-delete position there; a
     * position outside the ledger is not issued and changes nothing; repeating entry 15 or asking
     * for the earlier entry 10 is issued but leaves the mark-delete position at 15; entry 16
     * advances it. Finally, with a mocked cursor whose find never calls back, the first request
     * is issued and the second is rejected.
     */
    @Test
    void testMessageExpiryWithPosition() throws Exception {
        final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
        final int entriesPerLedger = 5;
        final int totalEntries = 30;
        List<Position> positions = new ArrayList<>();

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10L);
        config.setMaxEntriesPerLedger(entriesPerLedger);
        config.setRetentionTime(1, TimeUnit.HOURS);
        config.setAutoSkipNonRecoverableData(true);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) this.factory.open(ledgerAndCursorName, config);
        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

        PersistentSubscription subscription = PowerMockito.mock(PersistentSubscription.class);
        Topic topic = PowerMockito.mock(Topic.class);
        PowerMockito.when(subscription.getTopic()).thenReturn(topic);

        for (int i = 0; i < totalEntries; ++i) {
            positions.add(ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("msg" + i)));
        }
        PowerMockito.when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));

        PersistentMessageExpiryMonitor monitor = PowerMockito.spy(
                new PersistentMessageExpiryMonitor("topicname", cursor.getName(), cursor, subscription));
        Assert.assertEquals(cursor.getMarkDeletedPosition(),
                PositionImpl.get(positions.get(0).getLedgerId(), -1L));

        // Expire up to entry 15: mark-delete moves to 15.
        boolean issued = monitor.expireMessages(positions.get(15));
        awaitSingleFindEntryComplete(monitor);
        assertMarkDeletedAt(cursor, positions.get(15));
        Assert.assertTrue(issued);
        Mockito.clearInvocations(monitor);

        // A position that does not exist in the ledger: not issued, nothing changes.
        issued = monitor.expireMessages(PositionImpl.get(100L, 100L));
        assertMarkDeletedAt(cursor, positions.get(15));
        Assert.assertFalse(issued);

        // Same position again: issued, mark-delete unchanged.
        issued = monitor.expireMessages(positions.get(15));
        awaitSingleFindEntryComplete(monitor);
        assertMarkDeletedAt(cursor, positions.get(15));
        Assert.assertTrue(issued);
        Mockito.clearInvocations(monitor);

        // Earlier position: issued, mark-delete stays at 15.
        issued = monitor.expireMessages(positions.get(10));
        awaitSingleFindEntryComplete(monitor);
        assertMarkDeletedAt(cursor, positions.get(15));
        Assert.assertTrue(issued);
        Mockito.clearInvocations(monitor);

        // Later position: mark-delete advances to 16.
        issued = monitor.expireMessages(positions.get(16));
        awaitSingleFindEntryComplete(monitor);
        assertMarkDeletedAt(cursor, positions.get(16));
        Assert.assertTrue(issued);
        Mockito.clearInvocations(monitor);

        // Mocked cursor whose find never completes: the first request is issued, the second is
        // rejected.
        ManagedCursorImpl mockCursor = PowerMockito.mock(ManagedCursorImpl.class);
        PersistentMessageExpiryMonitor mockMonitor = PowerMockito.spy(
                new PersistentMessageExpiryMonitor("topicname", cursor.getName(), mockCursor, subscription));
        // Explicit matcher types select the four-argument asyncFindNewestMatching signature.
        ((ManagedCursorImpl)PowerMockito.doAnswer(invocation -> null).when((Object)mockCursor)).asyncFindNewestMatching((ManagedCursor.FindPositionConstraint)ArgumentMatchers.any(), (Predicate)ArgumentMatchers.any(), (AsyncCallbacks.FindEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        issued = mockMonitor.expireMessages(positions.get(15));
        Assert.assertTrue(issued);
        issued = mockMonitor.expireMessages(positions.get(15));
        Assert.assertFalse(issued);

        cursor.close();
        ledger.close();
        this.factory.shutdown();
    }

    /** Waits until {@code findEntryComplete} has been recorded exactly once on the spied monitor. */
    private static void awaitSingleFindEntryComplete(PersistentMessageExpiryMonitor monitor) {
        // any() rather than any(Class): the matchers must also accept null arguments.
        Awaitility.await().untilAsserted(
                () -> Mockito.verify(monitor, Mockito.times(1))
                        .findEntryComplete(ArgumentMatchers.any(), ArgumentMatchers.any()));
    }

    /** Asserts that the cursor's mark-delete position has the ledger and entry id of {@code expected}. */
    private static void assertMarkDeletedAt(ManagedCursorImpl cursor, Position expected) {
        Assert.assertEquals(cursor.getMarkDeletedPosition(),
                PositionImpl.get(expected.getLedgerId(), expected.getEntryId()));
    }

    /**
     * Wraps a {@link ResetCursorData} in a JAX-RS {@link Entity} with the
     * {@code application/json} media type and checks that the entity carries the same payload
     * object and the requested media type.
     */
    @Test
    public void test() {
        ResetCursorData resetCursorData = new ResetCursorData(1L, 1L);
        resetCursorData.setExcluded(true);
        Entity<ResetCursorData> entity = Entity.entity(resetCursorData, "application/json");
        System.out.println(entity);
        // Previously the test only printed the entity; assert what it is expected to hold.
        Assert.assertSame(entity.getEntity(), resetCursorData);
        Assert.assertEquals(entity.getMediaType().getType(), "application");
        Assert.assertEquals(entity.getMediaType().getSubtype(), "json");
    }

    /**
     * Mutable holder for the outcome of one find operation: either the position that was found
     * or the exception that was reported. Both fields start out as {@code null}.
     */
    static class Result {
        /** Failure reported by the find callback, or {@code null} if none was reported. */
        ManagedLedgerException exception;
        /** Position reported by the find callback; may be {@code null}. */
        Position position;

        /** Clears both fields so the instance can be reused for another lookup. */
        void reset() {
            position = null;
            exception = null;
        }
    }
}

