/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.index.internal.gbptree.RawBytes;
import org.neo4j.index.internal.gbptree.SimpleByteArrayLayout;
import org.neo4j.kernel.api.index.IndexSample;
import org.neo4j.kernel.impl.index.schema.BlockEntry;
import org.neo4j.kernel.impl.index.schema.BlockEntryCursor;
import org.neo4j.kernel.impl.index.schema.BlockEntryMergerTestUtils;
import org.neo4j.kernel.impl.index.schema.BlockEntryStreamMerger;
import org.neo4j.kernel.impl.index.schema.BlockStorage;
import org.neo4j.test.RandomSupport;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.OtherThread;
import org.neo4j.test.extension.OtherThreadExtension;
import org.neo4j.test.extension.RandomExtension;

@ExtendWith(value={RandomExtension.class, OtherThreadExtension.class})
class BlockEntryStreamMergerTest {
    private static final int QUEUE_SIZE = 5;
    private static final int BATCH_SIZE = 10;
    @Inject
    private RandomSupport random;
    @Inject
    private OtherThread t2;
    private final Layout<RawBytes, RawBytes> layout = new SimpleByteArrayLayout();
    private final List<BlockEntry<RawBytes, RawBytes>> allData = new ArrayList<BlockEntry<RawBytes, RawBytes>>();

    BlockEntryStreamMergerTest() {
    }

    @Test
    void shouldMergePartsIntoOneWithoutSampling() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> parts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData);
        try (BlockEntryStreamMerger merger = new BlockEntryStreamMerger(parts, this.layout, null, BlockStorage.Cancellation.NOT_CANCELLABLE, 10, 5);){
            this.t2.execute((Callable)merger);
            BlockEntryMergerTestUtils.assertMergedPartStream(this.allData, merger);
        }
    }

    @Test
    void shouldMergePartsIntoOneWithSampling() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> parts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData);
        try (BlockEntryStreamMerger merger = new BlockEntryStreamMerger(parts, this.layout, this.layout, BlockStorage.Cancellation.NOT_CANCELLABLE, 10, 5);){
            Future t2Future = this.t2.execute((Callable)merger);
            BlockEntryMergerTestUtils.assertMergedPartStream(this.allData, merger);
            t2Future.get();
            IndexSample sample = merger.buildIndexSample();
            Assertions.assertThat((long)sample.sampleSize()).isEqualTo((long)this.allData.size());
            Assertions.assertThat((long)sample.indexSize()).isEqualTo((long)this.allData.size());
            Assertions.assertThat((long)sample.uniqueValues()).isEqualTo(this.countUniqueKeys(this.allData));
        }
    }

    @Test
    void shouldStopMergingWhenHalted() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> parts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, rng -> 50);
        try (BlockEntryStreamMerger merger = new BlockEntryStreamMerger(parts, this.layout, null, BlockStorage.Cancellation.NOT_CANCELLABLE, 10, 5);){
            Future invocation = this.t2.execute((Callable)merger);
            this.t2.get().waitUntilWaiting(wait -> wait.isAt(BlockEntryStreamMerger.class, "call"));
            merger.halt();
            invocation.get();
            Assertions.assertThat((int)BlockEntryStreamMergerTest.countEntries((BlockEntryStreamMerger<RawBytes, RawBytes>)merger)).isEqualTo(50);
        }
    }

    @Test
    void shouldStopMergingWhenCancelled() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> parts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, rng -> 50);
        AtomicBoolean cancelled = new AtomicBoolean();
        try (BlockEntryStreamMerger merger = new BlockEntryStreamMerger(parts, this.layout, null, cancelled::get, 10, 5);){
            Future invocation = this.t2.execute((Callable)merger);
            this.t2.get().waitUntilWaiting(wait -> wait.isAt(BlockEntryStreamMerger.class, "call"));
            cancelled.set(true);
            invocation.get();
            Assertions.assertThat((int)BlockEntryStreamMergerTest.countEntries((BlockEntryStreamMerger<RawBytes, RawBytes>)merger)).isEqualTo(50);
        }
    }

    @Test
    void shouldStopReaderFromAwaitingMoreBatchesWhenHalted() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> parts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, rng -> 50);
        try (BlockEntryStreamMerger merger = new BlockEntryStreamMerger(parts, this.layout, null, BlockStorage.Cancellation.NOT_CANCELLABLE, 10, 5);){
            Future firstRead = this.t2.execute(() -> ((BlockEntryStreamMerger)merger).next());
            this.t2.get().waitUntilWaiting(wait -> wait.isAt(BlockEntryStreamMerger.class, "next"));
            merger.halt();
            Assertions.assertThat((Boolean)((Boolean)firstRead.get())).isFalse();
        }
    }

    @Test
    void shouldStopReaderFromAwaitingMoreBatchesWhenCancelled() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> parts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, rng -> 50);
        try (BlockEntryStreamMerger merger = new BlockEntryStreamMerger(parts, this.layout, null, BlockStorage.Cancellation.NOT_CANCELLABLE, 10, 5);){
            Future firstRead = this.t2.execute(() -> ((BlockEntryStreamMerger)merger).next());
            this.t2.get().waitUntilWaiting(wait -> wait.isAt(BlockEntryStreamMerger.class, "next"));
            merger.halt();
            Assertions.assertThat((Boolean)((Boolean)firstRead.get())).isFalse();
        }
    }

    private static int countEntries(BlockEntryStreamMerger<RawBytes, RawBytes> merger) throws IOException {
        int numMergedEntries = 0;
        while (merger.next()) {
            ++numMergedEntries;
        }
        return numMergedEntries;
    }

    private long countUniqueKeys(List<BlockEntry<RawBytes, RawBytes>> entries) {
        TreeSet set = new TreeSet(this.layout);
        entries.forEach(e -> set.add((RawBytes)e.key()));
        return set.size();
    }
}

