/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.intercept;

import com.google.common.base.Predicate;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class MangedLedgerInterceptorImplTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);

    @Test
    public void testAddBrokerEntryMetadata() throws Exception {
        int MOCK_BATCH_SIZE = 2;
        int numberOfEntries = 10;
        String ledgerAndCursorName = "topicEntryMetadataSequenceId";
        ManagedLedgerInterceptorImpl interceptor = new ManagedLedgerInterceptorImpl(MangedLedgerInterceptorImplTest.getBrokerEntryMetadataInterceptors());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(2);
        config.setManagedLedgerInterceptor((ManagedLedgerInterceptor)interceptor);
        ManagedLedger ledger = this.factory.open("topicEntryMetadataSequenceId", config);
        ManagedCursorImpl cursor = (ManagedCursorImpl)ledger.openCursor("topicEntryMetadataSequenceId");
        for (int i = 0; i < numberOfEntries; ++i) {
            ledger.addEntry(("message" + i).getBytes(), 2);
        }
        Assert.assertEquals((long)19L, (long)((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex());
        List entryList = cursor.readEntries(numberOfEntries);
        for (int i = 0; i < numberOfEntries; ++i) {
            BrokerEntryMetadata metadata = Commands.parseBrokerEntryMetadataIfExist((ByteBuf)((Entry)entryList.get(i)).getDataBuffer());
            Assert.assertNotNull((Object)metadata);
            Assert.assertEquals((long)metadata.getIndex(), (long)((i + 1) * 2 - 1));
        }
        cursor.close();
        ledger.close();
        this.factory.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void testRecoveryIndex() throws Exception {
        int MOCK_BATCH_SIZE = 2;
        ManagedLedgerInterceptorImpl interceptor = new ManagedLedgerInterceptorImpl(MangedLedgerInterceptorImplTest.getBrokerEntryMetadataInterceptors());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setManagedLedgerInterceptor((ManagedLedgerInterceptor)interceptor);
        ManagedLedger ledger = this.factory.open("my_recovery_index_test_ledger", config);
        ledger.addEntry("dummy-entry-1".getBytes(StandardCharsets.UTF_8), 2);
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-2".getBytes(StandardCharsets.UTF_8), 2);
        Assert.assertEquals((long)((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex(), (long)3L);
        ledger.close();
        log.info("Closing ledger and reopening");
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStore)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ledger = factory2.open("my_recovery_index_test_ledger", config);
            cursor = ledger.openCursor("c1");
            Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
            Assert.assertEquals((long)((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex(), (long)3L);
            List entries = cursor.readEntries(100);
            Assert.assertEquals((int)entries.size(), (int)1);
            entries.forEach(e -> e.release());
            cursor.close();
            ledger.close();
        }
        finally {
            if (Collections.singletonList(factory2).get(0) != null) {
                factory2.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFindPositionByIndex() throws Exception {
        int MOCK_BATCH_SIZE = 2;
        int maxEntriesPerLedger = 5;
        int maxSequenceIdPerLedger = 10;
        ManagedLedgerInterceptorImpl interceptor = new ManagedLedgerInterceptorImpl(MangedLedgerInterceptorImplTest.getBrokerEntryMetadataInterceptors());
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setManagedLedgerInterceptor((ManagedLedgerInterceptor)interceptor);
        managedLedgerConfig.setMaxEntriesPerLedger(5);
        ManagedLedger ledger = this.factory.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig);
        ManagedCursor cursor = ledger.openCursor("c1");
        long firstLedgerId = -1L;
        for (int i = 0; i < 5; ++i) {
            firstLedgerId = ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), 2).getLedgerId();
        }
        Assert.assertEquals((long)((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex(), (long)9L);
        PositionImpl position = null;
        int index = 0;
        while ((long)index <= ((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex()) {
            position = (PositionImpl)ledger.asyncFindPosition((Predicate)new IndexSearchPredicate(index)).get();
            Assert.assertEquals((long)position.getEntryId(), (long)(index % maxSequenceIdPerLedger / 2));
            ++index;
        }
        long secondLedgerId = -1L;
        for (int i = 0; i < 5; ++i) {
            secondLedgerId = ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), 2).getLedgerId();
        }
        Assert.assertEquals((long)((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex(), (long)19L);
        Assert.assertNotEquals((Object)firstLedgerId, (Object)secondLedgerId);
        int index2 = 0;
        while ((long)index2 <= ((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex()) {
            position = (PositionImpl)ledger.asyncFindPosition((Predicate)new IndexSearchPredicate(index2)).get();
            Assert.assertEquals((long)position.getEntryId(), (long)(index2 % maxSequenceIdPerLedger / 2));
            ++index2;
        }
        ledger.close();
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStore)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ledger = factory2.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig);
            long thirdLedgerId = -1L;
            for (int i = 0; i < 5; ++i) {
                thirdLedgerId = ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), 2).getLedgerId();
            }
            Assert.assertEquals((long)((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex(), (long)29L);
            Assert.assertNotEquals((Object)secondLedgerId, (Object)thirdLedgerId);
            int index3 = 0;
            while ((long)index3 <= ((ManagedLedgerInterceptorImpl)ledger.getManagedLedgerInterceptor()).getIndex()) {
                position = (PositionImpl)ledger.asyncFindPosition((Predicate)new IndexSearchPredicate(index3)).get();
                Assert.assertEquals((long)position.getEntryId(), (long)(index3 % maxSequenceIdPerLedger / 2));
                ++index3;
            }
            cursor.close();
            ledger.close();
        }
        finally {
            if (Collections.singletonList(factory2).get(0) != null) {
                factory2.shutdown();
            }
        }
    }

    public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
        HashSet<String> interceptorNames = new HashSet<String>();
        interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
        interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
        return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, (ClassLoader)Thread.currentThread().getContextClassLoader());
    }

    static class IndexSearchPredicate
    implements Predicate<Entry> {
        long indexToSearch = -1L;

        public IndexSearchPredicate(long indexToSearch) {
            this.indexToSearch = indexToSearch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean apply(@Nullable Entry entry) {
            try {
                BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist((ByteBuf)entry.getDataBuffer());
                boolean bl = brokerEntryMetadata.getIndex() < this.indexToSearch;
                return bl;
            }
            catch (Exception e) {
                log.error("Error deserialize message for message position find", (Throwable)e);
            }
            finally {
                entry.release();
            }
            return false;
        }
    }
}

