/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.index.schema;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
import org.neo4j.graphdb.config.Configuration;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.internal.kernel.api.IndexOrder;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.schema.IndexProviderDescriptor;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptorSupplier;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageSwapperFactory;
import org.neo4j.io.pagecache.impl.SingleFilePageSwapperFactory;
import org.neo4j.io.pagecache.impl.muninn.MuninnPageCache;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier;
import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexDirectoryStructure;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.index.schema.GenericKey;
import org.neo4j.kernel.impl.index.schema.GenericLayout;
import org.neo4j.kernel.impl.index.schema.GenericNativeIndexPopulator;
import org.neo4j.kernel.impl.index.schema.GenericNativeIndexProvider;
import org.neo4j.kernel.impl.index.schema.IndexLayout;
import org.neo4j.kernel.impl.index.schema.NativeIndexPopulator;
import org.neo4j.kernel.impl.index.schema.NativeIndexPopulatorPartSupplier;
import org.neo4j.kernel.impl.index.schema.NativeIndexReader;
import org.neo4j.kernel.impl.index.schema.NativeIndexValue;
import org.neo4j.kernel.impl.index.schema.NodeValueIterator;
import org.neo4j.kernel.impl.index.schema.ParallelNativeIndexPopulator;
import org.neo4j.kernel.impl.index.schema.config.ConfiguredSpaceFillingCurveSettingsCache;
import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache;
import org.neo4j.kernel.impl.scheduler.JobSchedulerFactory;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.NodePropertyAccessor;
import org.neo4j.storageengine.api.schema.IndexDescriptorFactory;
import org.neo4j.storageengine.api.schema.IndexProgressor;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.TestDirectoryExtension;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.Values;

@ExtendWith(value={TestDirectoryExtension.class})
class ParallelNativeIndexPopulatorTest {
    private static final int THREADS = 4;
    private static final StoreIndexDescriptor DESCRIPTOR = IndexDescriptorFactory.forSchema((SchemaDescriptor)SchemaDescriptorFactory.forLabel((int)1, (int[])new int[]{1}), (IndexProviderDescriptor)GenericNativeIndexProvider.DESCRIPTOR).withId(1L);
    @Inject
    TestDirectory directory;
    private final Config defaults = Config.defaults();
    private final ConfiguredSpaceFillingCurveSettingsCache settingsCache = new ConfiguredSpaceFillingCurveSettingsCache(this.defaults);
    private final IndexSpecificSpaceFillingCurveSettingsCache spatialSettings = new IndexSpecificSpaceFillingCurveSettingsCache(this.settingsCache, new HashMap());
    private final GenericLayout layout = new GenericLayout(1, this.spatialSettings);
    private final AtomicLong next = new AtomicLong();
    private ExecutorService executorService;
    private File baseIndexFile;

    ParallelNativeIndexPopulatorTest() {
    }

    @BeforeEach
    void startExecutor() {
        this.executorService = Executors.newFixedThreadPool(4);
        this.baseIndexFile = this.directory.file("index");
    }

    @AfterEach
    void stopExecutor() {
        this.executorService.shutdown();
    }

    @Test
    void shouldCreateThreadLocalParts() throws ExecutionException, InterruptedException, IndexEntryConflictException {
        Thread mainThread = Thread.currentThread();
        ConcurrentHashMap<Thread, NativeIndexPopulator> partPopulators = new ConcurrentHashMap<Thread, NativeIndexPopulator>();
        ParallelNativeIndexPopulator populator = new ParallelNativeIndexPopulator(this.baseIndexFile, (IndexLayout)this.layout, this.mockPartSupplier(partPopulators, this::mockNativeIndexPopulator));
        int batchCountPerThread = 10;
        this.applyBatchesInParallel((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, batchCountPerThread);
        Assertions.assertEquals((int)4, (int)partPopulators.size());
        for (Thread thread : partPopulators.keySet()) {
            if (thread == mainThread) continue;
            NativeIndexPopulator partPopulator = (NativeIndexPopulator)partPopulators.get(thread);
            ((NativeIndexPopulator)Mockito.verify((Object)partPopulator, (VerificationMode)Mockito.times((int)batchCountPerThread))).add(ArgumentMatchers.anyCollection());
        }
    }

    @Test
    void shouldApplyUpdatesOnEachPart() throws ExecutionException, InterruptedException, IndexEntryConflictException {
        Thread mainThread = Thread.currentThread();
        ConcurrentHashMap<Thread, NativeIndexPopulator> partPopulators = new ConcurrentHashMap<Thread, NativeIndexPopulator>();
        ParallelNativeIndexPopulator populator = new ParallelNativeIndexPopulator(this.baseIndexFile, (IndexLayout)this.layout, this.mockPartSupplier(partPopulators, this::mockNativeIndexPopulator));
        int batchCountPerThread = 10;
        this.applyBatchesInParallel((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, batchCountPerThread);
        this.applyUpdates((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, this.next);
        this.applyBatchesInParallel((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, batchCountPerThread);
        this.applyUpdates((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, this.next);
        this.applyBatchesInParallel((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, batchCountPerThread);
        Assertions.assertEquals((int)4, (int)partPopulators.size());
        for (Thread thread : partPopulators.keySet()) {
            if (thread == mainThread) continue;
            NativeIndexPopulator partPopulator = (NativeIndexPopulator)partPopulators.get(thread);
            ((NativeIndexPopulator)Mockito.verify((Object)partPopulator, (VerificationMode)Mockito.times((int)(batchCountPerThread * 3)))).add(ArgumentMatchers.anyCollection());
            CountingIndexUpdater updater = (CountingIndexUpdater)partPopulator.newPopulatingUpdater();
            Assertions.assertEquals((int)10, (int)updater.count);
        }
    }

    @Test
    void shouldDropAllPartsOnClose() throws ExecutionException, InterruptedException {
        this.shouldDropAllParts(populator -> populator.close(false));
    }

    @Test
    void shouldDropAllPartsOnDrop() throws ExecutionException, InterruptedException {
        this.shouldDropAllParts(ParallelNativeIndexPopulator::drop);
    }

    private void shouldDropAllParts(Consumer<ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>> method) throws ExecutionException, InterruptedException {
        ConcurrentHashMap<Thread, NativeIndexPopulator> partPopulators = new ConcurrentHashMap<Thread, NativeIndexPopulator>();
        ParallelNativeIndexPopulator populator = new ParallelNativeIndexPopulator(this.baseIndexFile, (IndexLayout)this.layout, this.mockPartSupplier(partPopulators, this::failOnDropNativeIndexPopulator));
        populator.create();
        this.applyBatchesInParallel((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, 1);
        try {
            method.accept((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator);
            Assert.fail((String)"Should have failed");
        }
        catch (CustomFailure customFailure) {
            // empty catch block
        }
        for (NativeIndexPopulator part : partPopulators.values()) {
            ((NativeIndexPopulator)Mockito.verify((Object)part)).drop();
        }
    }

    @Test
    void shouldMergePartsOnAccessingVerifyDeferredConstraintsAfterPopulation() throws Exception {
        this.shouldMergePartsOnAccessingFirstCompleteMethodAfterPopulation((ThrowingConsumer<ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>, IndexEntryConflictException>)((ThrowingConsumer)populator -> populator.verifyDeferredConstraints((NodePropertyAccessor)Mockito.mock(NodePropertyAccessor.class))));
    }

    @Test
    void shouldMergePartsOnAccessingSampleResultAfterPopulation() throws Exception {
        this.shouldMergePartsOnAccessingFirstCompleteMethodAfterPopulation((ThrowingConsumer<ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>, IndexEntryConflictException>)((ThrowingConsumer)ParallelNativeIndexPopulator::sampleResult));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shouldMergePartsOnAccessingFirstCompleteMethodAfterPopulation(ThrowingConsumer<ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>, IndexEntryConflictException> method) throws Exception {
        try (EphemeralFileSystemAbstraction fs = new EphemeralFileSystemAbstraction();
             JobScheduler jobScheduler = JobSchedulerFactory.createInitialisedScheduler();){
            SingleFilePageSwapperFactory swapper = new SingleFilePageSwapperFactory();
            swapper.open((FileSystemAbstraction)fs, (Configuration)this.defaults);
            try (MuninnPageCache pageCache = new MuninnPageCache((PageSwapperFactory)swapper, 1000, PageCacheTracer.NULL, PageCursorTracerSupplier.NULL, EmptyVersionContextSupplier.EMPTY, jobScheduler);){
                NativeIndexPopulatorPartSupplier partSupplier = arg_0 -> this.lambda$shouldMergePartsOnAccessingFirstCompleteMethodAfterPopulation$2((PageCache)pageCache, fs, arg_0);
                ParallelNativeIndexPopulator populator = new ParallelNativeIndexPopulator(this.baseIndexFile, (IndexLayout)this.layout, partSupplier);
                try {
                    populator.create();
                    this.applyBatchesInParallel((ParallelNativeIndexPopulator<GenericKey, NativeIndexValue>)populator, 100);
                    method.accept((Object)populator);
                    NodeValueIterator results = new NodeValueIterator();
                    try (NativeIndexReader reader = populator.newReader();){
                        reader.query((IndexProgressor.NodeValueClient)results, IndexOrder.NONE, true, new IndexQuery[]{IndexQuery.exists((int)1)});
                        long nextExpectedId = 0L;
                        while (results.hasNext()) {
                            long id = results.next();
                            Assertions.assertEquals((long)nextExpectedId++, (long)id);
                        }
                        Assertions.assertEquals((long)nextExpectedId, (long)this.next.get());
                    }
                }
                finally {
                    populator.close(true);
                }
            }
        }
    }

    private NativeIndexPopulatorPartSupplier<GenericKey, NativeIndexValue> mockPartSupplier(ConcurrentMap<Thread, NativeIndexPopulator> partPopulators, Supplier<NativeIndexPopulator<GenericKey, NativeIndexValue>> partSupplier) {
        return file -> {
            NativeIndexPopulator part = (NativeIndexPopulator)partSupplier.get();
            if (!file.equals(this.baseIndexFile)) {
                Assertions.assertNull((Object)partPopulators.put(Thread.currentThread(), part));
            }
            return part;
        };
    }

    private NativeIndexPopulator<GenericKey, NativeIndexValue> mockNativeIndexPopulator() {
        NativeIndexPopulator populator = (NativeIndexPopulator)Mockito.mock(NativeIndexPopulator.class);
        Mockito.when((Object)populator.newPopulatingUpdater()).thenReturn((Object)new CountingIndexUpdater());
        return populator;
    }

    private NativeIndexPopulator<GenericKey, NativeIndexValue> failOnDropNativeIndexPopulator() {
        NativeIndexPopulator<GenericKey, NativeIndexValue> populator = this.mockNativeIndexPopulator();
        ((NativeIndexPopulator)Mockito.doThrow(CustomFailure.class).when(populator)).drop();
        return populator;
    }

    private void applyUpdates(ParallelNativeIndexPopulator<GenericKey, NativeIndexValue> populator, AtomicLong next) throws IndexEntryConflictException {
        try (IndexUpdater updater = populator.newPopulatingUpdater((NodePropertyAccessor)Mockito.mock(NodePropertyAccessor.class));){
            for (int i = 0; i < 5; ++i) {
                updater.process(this.update(next.incrementAndGet()));
            }
        }
    }

    private void applyBatchesInParallel(ParallelNativeIndexPopulator<GenericKey, NativeIndexValue> populator, int batchCountPerThread) throws ExecutionException, InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(4);
        ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (int i = 0; i < 4; ++i) {
            futures.add(this.executorService.submit(() -> {
                startSignal.countDown();
                startSignal.await();
                for (int j = 0; j < batchCountPerThread; ++j) {
                    populator.add(Arrays.asList(this.update(this.next.getAndIncrement())));
                }
                return null;
            }));
        }
        for (Future future : futures) {
            future.get();
        }
    }

    private IndexEntryUpdate<?> update(long id) {
        return IndexEntryUpdate.add((long)id, (SchemaDescriptorSupplier)DESCRIPTOR, (Value[])new Value[]{Values.longValue((long)id)});
    }

    private /* synthetic */ NativeIndexPopulator lambda$shouldMergePartsOnAccessingFirstCompleteMethodAfterPopulation$2(PageCache pageCache, EphemeralFileSystemAbstraction fs, File file) {
        return new GenericNativeIndexPopulator(pageCache, (FileSystemAbstraction)fs, file, (IndexLayout)this.layout, IndexProvider.Monitor.EMPTY, DESCRIPTOR, this.spatialSettings, IndexDirectoryStructure.directoriesByProvider((File)this.directory.directory()).forProvider(GenericNativeIndexProvider.DESCRIPTOR), (SpaceFillingCurveConfiguration)Mockito.mock(SpaceFillingCurveConfiguration.class), false, !file.equals(this.baseIndexFile));
    }

    private static class CountingIndexUpdater
    implements IndexUpdater {
        private int count;

        private CountingIndexUpdater() {
        }

        public void process(IndexEntryUpdate<?> update) {
            ++this.count;
        }

        public void close() {
        }
    }

    private static class CustomFailure
    extends RuntimeException {
        private CustomFailure() {
        }
    }
}

