/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.helpers.collection.IteratorWrapper;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;

/**
 * Tests for {@link TicketedProcessing}: ordering of results, back-pressure on
 * {@code submit} and propagation of failures raised while slurping an iterator.
 *
 * <p>NOTE(review): the type arguments used below assume the declaration order
 * {@code TicketedProcessing<FROM, STATE, TO>}, matching the
 * {@code BiFunction<FROM, STATE, TO>} processor and the {@code Supplier<STATE>}
 * passed to the constructor - confirm against the class under test.
 */
public class TicketedProcessingTest {
    /** Secondary thread used to run blocking calls concurrently with the test thread. */
    @Rule
    public final OtherThreadRule<Void> t2 = new OtherThreadRule<>();

    /**
     * Submits the integers {@code 0..items-1} and verifies, on another thread, that
     * {@code next()} yields the doubled values in submission order followed by
     * {@code null} once the processing has been closed.
     */
    @Test
    public void shouldReturnTicketsInOrder() throws Exception {
        final int items = 1000;
        ParkStrategy park = new ParkStrategy.Park(2L, TimeUnit.MILLISECONDS);
        // Doubles the input; roughly 1% of the calls park first so that jobs
        // finish at uneven times.
        BiFunction<Integer, Void, Integer> processor = (from, ignore) -> {
            if (ThreadLocalRandom.current().nextFloat() < 0.01f) {
                park.park(Thread.currentThread());
            }
            return from * 2;
        };
        int processorCount = Runtime.getRuntime().availableProcessors();

        Future<Void> assertions;
        try (TicketedProcessing<Integer, Void, Integer> processing =
                new TicketedProcessing<>("Doubler", processorCount, processor, () -> null)) {
            // processors(0) reports the current count; add the difference up to processorCount.
            processing.processors(processorCount - processing.processors(0));

            // Consume and verify on the other thread while this thread submits.
            assertions = this.t2.execute(new OtherThreadExecutor.WorkerCommand<Void, Void>() {
                @Override
                public Void doWork(Void state) throws Exception {
                    for (int i = 0; i < items; ++i) {
                        Integer next = processing.next();
                        Assert.assertNotNull(next);
                        Assert.assertEquals(i * 2, next.intValue());
                    }
                    Assert.assertNull(processing.next());
                    return null;
                }
            });

            for (int i = 0; i < items; ++i) {
                processing.submit(i);
            }
        }
        // Surfaces any assertion failure raised on the other thread.
        assertions.get();
    }

    /**
     * Submits four jobs on the test thread, of which the first two are held back by
     * their latches, then issues a fifth submit from another thread and waits for
     * that thread to reach a waiting state. After the first job is released and its
     * result consumed, the fifth submit is expected to complete.
     */
    @Test
    public void shouldNotBeAbleToSubmitTooFarAhead() throws Exception {
        try (TicketedProcessing<StringJob, Void, Integer> processing =
                new TicketedProcessing<>("Parser", 2, (job, state) -> {
                    // Each job is processed only once its latch has been released.
                    DoubleLatch.awaitLatch(job.latch);
                    return Integer.parseInt(job.string);
                }, () -> null)) {
            processing.processors(1);

            // The first two jobs stay blocked until the test releases them.
            StringJob firstJob = new StringJob("1");
            processing.submit(firstJob);
            StringJob secondJob = new StringJob("2");
            processing.submit(secondJob);

            // The remaining jobs are released up front.
            StringJob thirdJob = new StringJob("3");
            thirdJob.latch.countDown();
            processing.submit(thirdJob);
            StringJob fourthJob = new StringJob("4");
            fourthJob.latch.countDown();
            processing.submit(fourthJob);

            final StringJob fifthJob = new StringJob("5");
            fifthJob.latch.countDown();
            Future<Void> fifthSubmit = this.t2.execute(new OtherThreadExecutor.WorkerCommand<Void, Void>() {
                @Override
                public Void doWork(Void state) throws Exception {
                    processing.submit(fifthJob);
                    return null;
                }
            });

            // Wait until the submitting thread is observed waiting before letting anything through.
            this.t2.get().waitUntilThreadState(Thread.State.TIMED_WAITING, Thread.State.WAITING);

            firstJob.latch.countDown();
            Assert.assertEquals(1L, processing.next().intValue());
            fifthSubmit.get();

            secondJob.latch.countDown();
            Assert.assertEquals(2L, processing.next().intValue());
            Assert.assertEquals(3L, processing.next().intValue());
        }
    }

    /**
     * Slurps an iterator whose second element throws, and verifies that the failure
     * reaches both the consumer calling {@code next()} (as the cause of a
     * {@link TaskExecutionPanicException}) and the slurp {@link Future} (as the
     * cause of an {@link ExecutionException}).
     */
    @Test
    public void shouldNoticeSlurpPanic() throws Exception {
        IllegalStateException failure = new IllegalStateException("Consistently failing");
        try (TicketedProcessing<StringJob, Void, Integer> processing =
                new TicketedProcessing<>("Parser", 2, (job, state) -> Integer.parseInt(job.string), () -> null)) {
            processing.processors(1);

            // First supplier yields a valid job, the second throws the prepared failure.
            Iterator<Supplier<StringJob>> suppliers = Iterators.<Supplier<StringJob>>iterator(
                    () -> new StringJob("1"),
                    () -> {
                        throw failure;
                    });
            Future<?> slurp = processing.slurp(TicketedProcessingTest.failingIterator(suppliers), true);

            try {
                while (processing.next() != null) {
                    // Drain results until the failure surfaces.
                }
                Assert.fail("Should have noticed the slurp failure");
            }
            catch (TaskExecutionPanicException e) {
                Assert.assertSame(failure, e.getCause());
            }

            try {
                slurp.get();
                Assert.fail("Should have noticed the slurp failure");
            }
            catch (ExecutionException e) {
                Assert.assertSame(failure, e.getCause());
            }
        }
    }

    /**
     * Wraps an iterator of suppliers so that each element is produced by calling
     * {@link Supplier#get()}; a supplier that throws therefore makes the iteration throw.
     *
     * @param suppliers suppliers producing (or failing to produce) the jobs
     * @return an iterator over the supplied jobs
     */
    private static Iterator<StringJob> failingIterator(Iterator<Supplier<StringJob>> suppliers) {
        return new IteratorWrapper<StringJob, Supplier<StringJob>>(suppliers) {
            @Override
            protected StringJob underlyingObjectToObject(Supplier<StringJob> object) {
                return object.get();
            }
        };
    }

    /** A job carrying a string to parse and a latch the test can use to hold it back. */
    private static class StringJob {
        final String string;
        final CountDownLatch latch = new CountDownLatch(1);

        StringJob(String string) {
            this.string = string;
        }
    }
}

