/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.store.id;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.function.Consumer;
import org.neo4j.function.Predicate;
import org.neo4j.function.Predicates;
import org.neo4j.function.Supplier;
import org.neo4j.function.Suppliers;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.impl.store.id.DelayedBuffer;
import org.neo4j.test.Race;
import org.neo4j.unsafe.impl.batchimport.Utils;

public class DelayedBufferTest {
    @Test
    public void shouldHandleTheWholeWorkloadShebang() throws Throwable {
        int size = 1000;
        long bufferTime = 3L;
        VerifyingConsumer consumer = new VerifyingConsumer(1000);
        final Clock clock = Clock.SYSTEM_CLOCK;
        Supplier<Long> chunkThreshold = new Supplier<Long>(){

            public Long get() {
                return clock.currentTimeMillis();
            }
        };
        Predicate<Long> safeThreshold = new Predicate<Long>(){

            public boolean test(Long time) {
                return clock.currentTimeMillis() - 3L >= time;
            }
        };
        final DelayedBuffer buffer = new DelayedBuffer((Supplier)chunkThreshold, (Predicate)safeThreshold, 10, (Consumer)consumer);
        MaintenanceThread maintenance = new MaintenanceThread(buffer, 5L);
        Race adders = new Race();
        int numberOfAdders = 20;
        final byte[] offeredIds = new byte[1000];
        int i = 0;
        while (i < 20) {
            final int finalI = i++;
            adders.addContestant(new Runnable(){

                @Override
                public void run() {
                    for (int j = 0; j < 1000; ++j) {
                        if (j % 20 != finalI) continue;
                        buffer.offer((long)j);
                        offeredIds[j] = 1;
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(ThreadLocalRandom.current().nextInt(2)));
                    }
                }
            });
        }
        adders.go();
        for (i = 0; i < 1000; ++i) {
            Assert.assertEquals((String)("ID " + i), (long)1L, (long)offeredIds[i]);
        }
        maintenance.halt();
        buffer.close();
        consumer.assertHaveOnlySeenRange(0L, 999L);
    }

    @Test
    public void shouldNotReleaseValuesUntilCrossedThreshold() throws Exception {
        VerifyingConsumer consumer = new VerifyingConsumer(30);
        final AtomicLong txOpened = new AtomicLong();
        final AtomicLong txClosed = new AtomicLong();
        Supplier<Long> chunkThreshold = new Supplier<Long>(){

            public Long get() {
                return txOpened.get();
            }
        };
        Predicate<Long> safeThreshold = new Predicate<Long>(){

            public boolean test(Long value) {
                return txClosed.get() >= value;
            }
        };
        DelayedBuffer buffer = new DelayedBuffer((Supplier)chunkThreshold, (Predicate)safeThreshold, 100, (Consumer)consumer);
        txOpened.incrementAndGet();
        buffer.offer(1L);
        txOpened.incrementAndGet();
        buffer.offer(4L);
        buffer.maintenance();
        Assert.assertEquals((long)0L, (long)consumer.chunksAccepted());
        buffer.offer(5L);
        txOpened.incrementAndGet();
        txOpened.incrementAndGet();
        buffer.offer(7L);
        buffer.maintenance();
        Assert.assertEquals((long)0L, (long)consumer.chunksAccepted());
        txOpened.incrementAndGet();
        buffer.offer(2L);
        buffer.offer(8L);
        buffer.maintenance();
        Assert.assertEquals((long)0L, (long)consumer.chunksAccepted());
        buffer.offer(6L);
        txOpened.incrementAndGet();
        buffer.offer(9L);
        buffer.offer(3L);
        txOpened.incrementAndGet();
        buffer.offer(11L);
        buffer.offer(12L);
        txClosed.set(4L);
        buffer.maintenance();
        consumer.assertHaveOnlySeen(1L, 4L, 5L, 7L);
        buffer.offer(10L);
        buffer.offer(13L);
        txClosed.set(6L);
        buffer.maintenance();
        consumer.assertHaveOnlySeen(1L, 2L, 4L, 5L, 7L, 8L);
        buffer.offer(14L);
        txClosed.set(7L);
        buffer.maintenance();
        consumer.assertHaveOnlySeen(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L);
    }

    @Test
    public void shouldClearCurrentChunk() throws Exception {
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        DelayedBuffer buffer = new DelayedBuffer(Suppliers.singleton((Object)0L), Predicates.alwaysTrue(), 10, consumer);
        buffer.offer(0L);
        buffer.offer(1L);
        buffer.offer(2L);
        buffer.clear();
        buffer.maintenance();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{consumer});
    }

    @Test
    public void shouldClearPreviousChunks() throws Exception {
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        final AtomicBoolean safeThreshold = new AtomicBoolean(false);
        DelayedBuffer buffer = new DelayedBuffer(Suppliers.singleton((Object)0L), (Predicate)new Predicate<Long>(){

            public boolean test(Long t) {
                return safeThreshold.get();
            }
        }, 10, consumer);
        buffer.offer(0L);
        buffer.maintenance();
        buffer.offer(1L);
        buffer.maintenance();
        buffer.offer(2L);
        buffer.maintenance();
        safeThreshold.set(true);
        buffer.clear();
        buffer.maintenance();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{consumer});
    }

    private static class VerifyingConsumer
    implements Consumer<long[]> {
        private final boolean[] seenIds;
        private int chunkCount;

        public VerifyingConsumer(int size) {
            this.seenIds = new boolean[size];
        }

        void assertHaveOnlySeenRange(long low, long high) {
            long[] values = new long[(int)(high - low + 1L)];
            long id = low;
            long i = 0L;
            while (id <= high) {
                values[(int)i] = id++;
                ++i;
            }
            this.assertHaveOnlySeen(values);
        }

        public void accept(long[] chunk) {
            ++this.chunkCount;
            for (long id : chunk) {
                Assert.assertFalse((boolean)this.seenIds[Utils.safeCastLongToInt((long)id)]);
                this.seenIds[Utils.safeCastLongToInt((long)id)] = true;
            }
        }

        void assertHaveOnlySeen(long ... values) {
            int vi = 0;
            for (int i = 0; i < this.seenIds.length && vi < values.length; ++i) {
                boolean expectedToBeSeen;
                boolean bl = expectedToBeSeen = values[vi] == (long)i;
                if (expectedToBeSeen && !this.seenIds[i]) {
                    Assert.fail((String)("Expected to have seen " + i + ", but hasn't"));
                } else if (!expectedToBeSeen && this.seenIds[i]) {
                    Assert.fail((String)("Expected to NOT have seen " + i + ", but have"));
                }
                if (!expectedToBeSeen) continue;
                ++vi;
            }
        }

        int chunksAccepted() {
            return this.chunkCount;
        }
    }

    private static class MaintenanceThread
    extends Thread {
        private final DelayedBuffer buffer;
        private final long nanoInterval;
        private volatile boolean end;

        MaintenanceThread(DelayedBuffer buffer, long nanoInterval) {
            this.buffer = buffer;
            this.nanoInterval = nanoInterval;
            this.start();
        }

        @Override
        public void run() {
            while (!this.end) {
                this.buffer.maintenance();
                LockSupport.parkNanos(this.nanoInterval);
            }
        }

        void halt() throws InterruptedException {
            this.end = true;
            while (this.isAlive()) {
                Thread.sleep(1L);
            }
        }
    }
}

