/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.RawReaderTest;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link TwoPhaseCompactor}, run against the broker that
 * {@link MockedPulsarServiceBaseTest} starts for every test method.
 *
 * <p>Each test publishes keyed messages to a topic, runs a compaction and then
 * reads the resulting compacted ledger back through the BookKeeper client to
 * check which key/value pairs it contains.
 */
@Test(groups={"broker-compaction"})
public class CompactorTest
extends MockedPulsarServiceBaseTest {
    /** Scheduler handed to every {@link TwoPhaseCompactor} created by the tests. */
    private ScheduledExecutorService compactionScheduler;

    /**
     * Starts the broker, registers the {@code use} cluster, the {@code my-property}
     * tenant and the {@code my-property/use/my-ns} namespace, and creates the
     * single-threaded daemon scheduler used by the compactor.
     *
     * @throws Exception if the broker cannot be started or an admin call fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use",
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        TenantInfo tenantInfo = new TenantInfoImpl(
                Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"));
        this.admin.tenants().createTenant("my-property", tenantInfo);
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
    }

    /**
     * Tears down the broker and stops the compaction scheduler.
     *
     * @throws Exception if the broker teardown fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            // alwaysRun=true means this also runs when setup() failed before the
            // scheduler was assigned, so it may still be null here. The finally
            // block makes sure the scheduler is stopped even if teardown throws.
            if (this.compactionScheduler != null) {
                this.compactionScheduler.shutdownNow();
            }
        }
    }

    /**
     * Creates a non-batching, single-partition-routing producer on {@code topic}.
     *
     * @param topic fully qualified topic name
     * @return a new producer; the caller is responsible for closing it
     * @throws Exception if the producer cannot be created
     */
    private Producer<byte[]> newProducer(String topic) throws Exception {
        return this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
    }

    /**
     * Compacts {@code topic} and checks the compacted ledger against {@code expected}.
     *
     * <p>The ledger must contain exactly {@code expected.size()} entries, and the
     * payload of every entry must equal the value stored in {@code expected} under
     * that entry's key.
     *
     * <p><b>Note:</b> {@code expected} is consumed: every key found in the ledger is
     * removed from the map, which is how the final "all keys found" check works.
     * Pass a copy if the map is needed afterwards.
     *
     * @param topic        fully qualified name of the topic to compact
     * @param expected     key to latest-payload mapping the compacted ledger must hold;
     *                     emptied by this method on success
     * @param checkMetrics whether to also assert on the compaction record that the
     *                     compactor's stats expose for {@code topic}
     * @return the keys in the order they appear in the compacted ledger
     * @throws Exception if compaction or reading the ledger fails
     */
    private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, bk, this.compactionScheduler);
        long compactedLedgerId = compactor.compact(topic).get();

        LedgerHandle ledger = bk.openLedger(compactedLedgerId,
                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        // Entry ids run from 0 to lastAddConfirmed inclusive, hence the + 1.
        Assert.assertEquals(ledger.getLastAddConfirmed() + 1L, (long) expected.size(),
                "Should have as many entries as there is keys");

        List<String> keys = new ArrayList<>();
        Enumeration<LedgerEntry> entries = ledger.readEntries(0L, ledger.getLastAddConfirmed());
        while (entries.hasMoreElements()) {
            ByteBuf buf = entries.nextElement().getEntryBuffer();
            RawMessage m = RawMessageImpl.deserializeFrom(buf);
            try {
                String key = RawReaderTest.extractKey(m);
                keys.add(key);
                ByteBuf payload = this.extractPayload(m);
                byte[] bytes = new byte[payload.readableBytes()];
                payload.readBytes(bytes);
                Assert.assertEquals(bytes, expected.remove(key),
                        "Compacted version should match expected version");
            } finally {
                // Close the message even when an assertion above fails.
                m.close();
            }
        }

        if (checkMetrics) {
            CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
            long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
            long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
            long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
            long lastCompactDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills();
            Assert.assertTrue(compactedTopicRemovedEventCount >= 1L);
            Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
            Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
            Assert.assertEquals(lastCompactFailedTimestamp, 0L);
        }
        Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
        return keys;
    }

    /**
     * Publishes 1000 messages spread over 10 keys and verifies that compaction keeps
     * only the latest value per key, also checking the compaction metrics.
     *
     * @throws Exception if publishing or compaction fails
     */
    @Test
    public void testCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        final int numMessages = 1000;
        final int maxKeys = 10;
        try (Producer<byte[]> producer = this.newProducer(topic)) {
            Map<String, byte[]> expected = new HashMap<>();
            // Fixed seed keeps the generated key sequence identical on every run.
            Random r = new Random(0L);
            for (int j = 0; j < numMessages; ++j) {
                int keyIndex = r.nextInt(maxKeys);
                String key = "key" + keyIndex;
                byte[] data = ("my-message-" + key + "-" + j).getBytes();
                producer.newMessage().key(key).value(data).send();
                // Later messages overwrite earlier ones, leaving the latest value per key.
                expected.put(key, data);
            }
            this.compactAndVerify(topic, expected, true);
        }
    }

    /**
     * Compacts a topic, publishes one more message for an existing key and compacts
     * again, verifying the compacted content after each round.
     *
     * @throws Exception if publishing or compaction fails
     */
    @Test
    public void testCompactAddCompact() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        try (Producer<byte[]> producer = this.newProducer(topic)) {
            Map<String, byte[]> expected = new HashMap<>();
            producer.newMessage().key("a").value("A_1".getBytes()).send();
            producer.newMessage().key("b").value("B_1".getBytes()).send();
            producer.newMessage().key("a").value("A_2".getBytes()).send();
            expected.put("a", "A_2".getBytes());
            expected.put("b", "B_1".getBytes());
            // compactAndVerify empties the map it is given, so pass a copy for round one.
            this.compactAndVerify(topic, new HashMap<>(expected), false);

            producer.newMessage().key("b").value("B_2".getBytes()).send();
            expected.put("b", "B_2".getBytes());
            this.compactAndVerify(topic, expected, false);
        }
    }

    /**
     * Verifies the order of keys in the compacted ledger: for the input
     * {@code c, a, b, a} the expected order is {@code c, b, a}.
     *
     * @throws Exception if publishing or compaction fails
     */
    @Test
    public void testCompactedInOrder() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        try (Producer<byte[]> producer = this.newProducer(topic)) {
            producer.newMessage().key("c").value("C_1".getBytes()).send();
            producer.newMessage().key("a").value("A_1".getBytes()).send();
            producer.newMessage().key("b").value("B_1".getBytes()).send();
            producer.newMessage().key("a").value("A_2".getBytes()).send();
            Map<String, byte[]> expected = new HashMap<>();
            expected.put("a", "A_2".getBytes());
            expected.put("b", "B_1".getBytes());
            expected.put("c", "C_1".getBytes());
            List<String> keyOrder = this.compactAndVerify(topic, expected, false);
            Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
        }
    }

    /**
     * Verifies that compacting a topic with no messages completes without error.
     * The topic is brought into existence by subscribing a consumer and closing it
     * immediately.
     *
     * @throws Exception if the subscription or the compaction fails
     */
    @Test
    public void testCompactEmptyTopic() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, bk, this.compactionScheduler);
        compactor.compact(topic).get();
    }

    /**
     * Verifies that the phase-one loop read timeout reported by the compactor is
     * the value set on the {@link ServiceConfiguration} it was constructed with.
     */
    @Test
    public void testPhaseOneLoopTimeConfiguration() {
        ServiceConfiguration configuration = new ServiceConfiguration();
        configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60L);
        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
                Mockito.mock(PulsarClientImpl.class),
                Mockito.mock(BookKeeper.class),
                this.compactionScheduler);
        Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60L);
    }

    /**
     * Returns the payload portion of a raw message.
     *
     * <p>Advances the reader index of the message's headers-and-payload buffer past
     * the optional checksum, the 4-byte metadata size and the metadata itself, then
     * returns a slice of whatever remains.
     *
     * @param m the raw message whose payload is wanted
     * @return a slice of the message buffer covering the bytes after the metadata
     * @throws Exception declared for callers; thrown if the buffer cannot be read
     */
    public ByteBuf extractPayload(RawMessage m) throws Exception {
        ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
        Commands.skipChecksumIfPresent(payloadAndMetadata);
        int metadataSize = payloadAndMetadata.readInt();
        // The metadata content is not needed here, so skip it instead of copying it out.
        payloadAndMetadata.skipBytes(metadataSize);
        return payloadAndMetadata.slice();
    }
}

