/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

public class ProcessorStepTest {
    @Rule
    public final OtherThreadRule<Void> t2 = new OtherThreadRule();

    @Test
    public void shouldUpholdProcessOrderingGuarantee() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        MyProcessorStep step = new MyProcessorStep(control, 0);
        step.start(1);
        step.processors(4);
        int batches = 10;
        for (int i = 0; i < batches; ++i) {
            step.receive(i, i);
        }
        step.endOfUpstream();
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)batches, (long)step.nextExpected.get());
        step.close();
    }

    @Test
    public void shouldHaveTaskQueueSizeEqualToMaxNumberOfProcessors() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        CountDownLatch latch = new CountDownLatch(1);
        int processors = 2;
        final int maxProcessors = 5;
        Configuration configuration = new Configuration(){

            public int maxNumberOfProcessors() {
                return maxProcessors;
            }
        };
        BlockingProcessorStep step = new BlockingProcessorStep(control, configuration, 2, latch);
        step.start(1);
        step.processors(1);
        for (int i = 0; i < 2 + maxProcessors; ++i) {
            step.receive(i, null);
        }
        Future receiveFuture = this.t2.execute(this.receive(2, step));
        this.t2.get().waitUntilThreadState(new Thread.State[]{Thread.State.TIMED_WAITING});
        latch.countDown();
        receiveFuture.get();
    }

    @Test
    public void shouldRecycleDoneBatches() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        MyProcessorStep step = new MyProcessorStep(control, 0);
        step.start(1);
        int batches = 10;
        for (int i = 0; i < batches; ++i) {
            step.receive(i, i);
        }
        step.endOfUpstream();
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        ((StageControl)Mockito.verify((Object)control, (VerificationMode)Mockito.times((int)batches))).recycle(Matchers.any());
        step.close();
    }

    private OtherThreadExecutor.WorkerCommand<Void, Void> receive(int processors, ProcessorStep<Void> step) {
        return state -> {
            step.receive((long)processors, null);
            return null;
        };
    }

    private static class MyProcessorStep
    extends ProcessorStep<Integer> {
        private final AtomicInteger nextExpected = new AtomicInteger();

        private MyProcessorStep(StageControl control, int maxProcessors) {
            super(control, "test", Configuration.DEFAULT, maxProcessors, new StatsProvider[0]);
        }

        protected void process(Integer batch, BatchSender sender) {
            this.nextExpected.incrementAndGet();
        }
    }

    private static class BlockingProcessorStep
    extends ProcessorStep<Void> {
        private final CountDownLatch latch;

        BlockingProcessorStep(StageControl control, Configuration configuration, int maxProcessors, CountDownLatch latch) {
            super(control, "test", configuration, maxProcessors, new StatsProvider[0]);
            this.latch = latch;
        }

        protected void process(Void batch, BatchSender sender) throws Throwable {
            this.latch.await();
        }
    }
}

