/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.DeadEndStep;
import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.PullingProducerStep;
import org.neo4j.unsafe.impl.batchimport.staging.SimpleStageControl;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageExecution;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.Key;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.stats.StepStats;

public class ForkedProcessorStepTest {
    /**
     * Pushes ten batches through a {@link BatchProcessor} created with a limit of ten
     * processors and checks that the downstream step ends up having received all of them.
     */
    @Test
    public void shouldProcessAllSingleThreaded() throws Exception {
        final int processorCount = 10;
        final int batchCount = 10;
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        BatchProcessor step = new BatchProcessor(control, processorCount);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);

        // NOTE(review): processors(delta) appears to adjust the processor count by delta and
        // return the resulting count; the delta computed here is meant to land on processorCount.
        int currentProcessors = step.processors(0);
        step.processors(processorCount - currentProcessors);
        step.start(0);

        // Tickets start at 1 because TrackingStep only accepts ticket n after ticket n - 1.
        int ticket = 1;
        while (ticket <= batchCount) {
            step.receive(ticket, new Batch(processorCount));
            ++ticket;
        }
        step.endOfUpstream();

        // Poll until the step reports completion.
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        step.close();

        long receivedDownstream = downstream.received.get();
        Assert.assertEquals((long)batchCount, receivedDownstream);
    }

    /**
     * Runs ten batches through a {@link BatchProcessor} limited to a single processor and
     * expects all of them to reach the downstream step within the test timeout.
     */
    @Test(timeout=10000L)
    public void shouldProcessAllBatchesOnSingleCoreSystems() throws Exception {
        final int processorCount = 1;
        final int batchCount = 10;
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        BatchProcessor step = new BatchProcessor(control, processorCount);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);
        step.start(0);

        // Tickets are 1-based; TrackingStep requires them to arrive in sequence.
        for (int ticket = 1; ticket <= batchCount; ticket++) {
            Batch batch = new Batch(processorCount);
            step.receive(ticket, batch);
        }
        step.endOfUpstream();

        // Poll until the step reports completion; the @Test timeout bounds this wait.
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        step.close();

        long receivedDownstream = downstream.received.get();
        Assert.assertEquals((long)batchCount, receivedDownstream);
    }

    /**
     * Sends ten batches through a {@link BatchProcessor} limited to one processor, after
     * explicitly adjusting the processor count before start, and checks that every batch
     * reaches the downstream step.
     */
    @Test
    public void mustNotDetachProcessorsFromBatchChains() throws Exception {
        final int processorCount = 1;
        final int batchCount = 10;
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        BatchProcessor step = new BatchProcessor(control, processorCount);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);

        // NOTE(review): processors(0) is used as a query for the current count, and the
        // difference is applied as a delta - confirm against ForkedProcessorStep.processors.
        int adjustment = processorCount - step.processors(0);
        step.processors(adjustment);
        step.start(0);

        // Tickets are 1-based; TrackingStep requires them to arrive in sequence.
        int ticket = 1;
        while (ticket <= batchCount) {
            step.receive(ticket, new Batch(processorCount));
            ++ticket;
        }
        step.endOfUpstream();

        // Poll until the step reports completion.
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        step.close();

        long receivedDownstream = downstream.received.get();
        Assert.assertEquals((long)batchCount, receivedDownstream);
    }

    /**
     * Has three submitter threads feed batches into a {@link BatchProcessor} while randomly
     * changing its processor count, and waits until the downstream step has received at
     * least 200 batches. The per-batch checks live in {@link TrackingStep#receive}.
     */
    @Test
    public void shouldProcessAllMultiThreadedAndWithChangingProcessorCount() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        int maxProcessors = Runtime.getRuntime().availableProcessors();
        final BatchProcessor step = new BatchProcessor(control, maxProcessors);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);
        step.start(0);

        final AtomicLong ticketSequence = new AtomicLong();
        final AtomicBoolean stop = new AtomicBoolean();
        Thread[] submitters = new Thread[3];
        for (int index = 0; index < submitters.length; ++index) {
            Thread submitter = new Thread(){

                @Override
                public void run() {
                    ThreadLocalRandom rng = ThreadLocalRandom.current();
                    while (!stop.get()) {
                        // The lock makes "maybe change processors, take a ticket, size the batch,
                        // submit" one atomic unit across the submitter threads.
                        synchronized (ticketSequence) {
                            boolean changeProcessors = rng.nextFloat() < 0.1;
                            if (changeProcessors) {
                                step.processors(rng.nextInt(-2, 4));
                            }
                            long ticket = ticketSequence.incrementAndGet();
                            // The batch gets one slot per processor reported at submission time.
                            step.receive(ticket, new Batch(step.processors(0)));
                        }
                    }
                }
            };
            submitters[index] = submitter;
            submitter.start();
        }

        // Let the submitters run until enough batches have made it all the way downstream.
        while (downstream.received.get() < 200L) {
            Thread.sleep(10L);
        }

        stop.set(true);
        for (int index = 0; index < submitters.length; ++index) {
            submitters[index].join();
        }
        step.endOfUpstream();
        step.close();
    }

    /**
     * Sends 200 tickets through a {@link ForkedProcessorStep} while randomly changing its
     * processor count, and checks inside {@code forkedProcess} that every item is visited
     * once per ticket and in ticket order.
     *
     * <p>{@code reference[item]} holds the next ticket expected for that item. Each forked
     * processor handles the items where {@code item % processors == id} and advances the
     * counter with a compare-and-set from {@code ticket} to {@code ticket + 1}; a failed
     * compare-and-set means the item was skipped, repeated, or handled out of order.
     *
     * <p>NOTE(review): the assertion runs inside {@code forkedProcess}, not on the test
     * thread. With a mocked {@link StageControl} it is not visible from this file whether a
     * failure there reaches JUnit - confirm.
     */
    @Test
    public void shouldKeepForkedOrderIntactWhenChangingProcessorCount() throws Exception {
        int length = 100;
        final AtomicIntegerArray reference = new AtomicIntegerArray(length);
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        ForkedProcessorStep<int[]> step = new ForkedProcessorStep<int[]>(control, "Processor", ForkedProcessorStepTest.config(availableProcessors), new StatsProvider[0]){

            protected void forkedProcess(int id, int processors, int[] batch) throws InterruptedException {
                // Slot 0 carries the ticket; the remaining slots carry the item indexes.
                int ticket = batch[0];
                // Random delay to shake up the interleaving between forked processors.
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                for (int i = 1; i < batch.length; ++i) {
                    // Only handle the items assigned to this processor id.
                    if (batch[i] % processors != id) continue;
                    boolean compareAndSet = reference.compareAndSet(batch[i], ticket, ticket + 1);
                    Assert.assertTrue("I am " + id + ". Was expecting " + ticket + " for " + batch[i] + " but was " + reference.get(batch[i]), compareAndSet);
                }
            }
        };
        DeadEndStep downstream = new DeadEndStep(control);
        step.setDownstream((Step)downstream);
        step.start(0);
        downstream.start(0);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int ticket = 0; ticket < 200; ++ticket) {
            // Roughly one ticket in ten changes the processor count by a random delta in [-2, 4).
            if (random.nextFloat() < 0.1) {
                step.processors(random.nextInt(-2, 4));
            }
            // batch = [ticket, 0, 1, ..., length - 2]
            int[] batch = new int[length];
            batch[0] = ticket;
            for (int j = 1; j < batch.length; ++j) {
                batch[j] = j - 1;
            }
            step.receive(ticket, batch);
        }
        step.endOfUpstream();
        // Poll until the step reports completion.
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        step.close();
    }

    /**
     * Verifies that an exception thrown from {@code forkedProcess} is surfaced by
     * {@link SimpleStageControl#assertHealthy()} as the very same exception instance.
     */
    @Test
    public void shouldPanicOnFailure() throws Exception {
        SimpleStageControl control = new SimpleStageControl();
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        final RuntimeException testPanic = new RuntimeException();
        ForkedProcessorStep<Void> step = new ForkedProcessorStep<Void>((StageControl)control, "Processor", ForkedProcessorStepTest.config(availableProcessors), new StatsProvider[0]){

            protected void forkedProcess(int id, int processors, Void batch) throws Throwable {
                throw testPanic;
            }
        };
        step.start(0);
        step.receive(1L, null);
        // NOTE(review): the steps are registered with the control only after the failing batch
        // has been submitted; confirm SimpleStageControl tolerates a panic arriving before this.
        control.steps(new Step[]{step});
        // Poll until the step reports completion.
        while (!step.isCompleted()) {
            Thread.sleep(10L);
        }
        try {
            control.assertHealthy();
            // Without this the test would pass even if no panic had been recorded at all.
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Expected assertHealthy() to throw the exception raised in forkedProcess");
        }
        catch (Exception e) {
            Assert.assertSame((Object)testPanic, (Object)e);
        }
    }

    /** Runs the stress scenario with an ordering-guarantees value of 1. */
    @Test(timeout=60000L)
    public void shouldBeAbleToProgressUnderStressfulProcessorChangesWhenOrdered() throws Exception {
        final int orderingGuarantees = 1;
        shouldBeAbleToProgressUnderStressfulProcessorChanges(orderingGuarantees);
    }

    /** Runs the stress scenario with an ordering-guarantees value of 0. */
    @Test(timeout=60000L)
    public void shouldBeAbleToProgressUnderStressfulProcessorChangesWhenUnordered() throws Exception {
        final int orderingGuarantees = 0;
        shouldBeAbleToProgressUnderStressfulProcessorChanges(orderingGuarantees);
    }

    /**
     * Executes a {@link StressStage} of 100 batches while continuously changing the processor
     * count of the step at index 2, then checks that the stage is healthy and that the last
     * step reports all batches as done.
     *
     * @param orderingGuarantees value handed through to the {@link StressStage} constructor
     */
    private void shouldBeAbleToProgressUnderStressfulProcessorChanges(int orderingGuarantees) throws Exception {
        final int expectedBatches = 100;
        final int maxProcessors = Runtime.getRuntime().availableProcessors() * 10;
        Configuration.Overridden config = new Configuration.Overridden(Configuration.DEFAULT){

            public int maxNumberOfProcessors() {
                return maxProcessors;
            }
        };

        StageExecution execution = new StressStage((Configuration)config, orderingGuarantees, expectedBatches).execute();
        List steps = Iterables.asList((Iterable)execution.steps());

        // Step index 1 gets a one-off adjustment of a third of the processor limit.
        Step secondStep = (Step)steps.get(1);
        secondStep.processors(maxProcessors / 3);

        // While the stage runs, keep nudging step index 2 by a random delta in [-2, 5).
        ThreadLocalRandom rng = ThreadLocalRandom.current();
        while (execution.stillExecuting()) {
            Step thirdStep = (Step)steps.get(2);
            thirdStep.processors(rng.nextInt(-2, 5));
            Thread.sleep(1L);
        }

        execution.assertHealthy();
        Step lastStep = (Step)steps.get(steps.size() - 1);
        long doneBatches = lastStep.stats().stat((Key)Keys.done_batches).asLong();
        Assert.assertEquals((long)expectedBatches, doneBatches);
    }

    /**
     * Creates a {@link Configuration} whose {@code maxNumberOfProcessors()} returns the given value.
     *
     * @param processors value to report as the maximum number of processors
     * @return the configuration instance
     */
    private static Configuration config(final int processors) {
        Configuration fixedProcessorLimit = new Configuration(){

            public int maxNumberOfProcessors() {
                return processors;
            }
        };
        return fixedProcessorLimit;
    }

    /**
     * Downstream stub used by the tests. Its {@link #receive} checks that every slot of an
     * incoming {@link Batch} has been marked as processed and that tickets arrive strictly
     * in sequence; all other {@link Step} methods are no-ops or return fixed values.
     */
    private static class TrackingStep
    implements Step<Batch> {
        /** Ticket of the most recently accepted batch; starts at 0. */
        private final AtomicLong received = new AtomicLong();

        private TrackingStep() {
        }

        public void receivePanic(Throwable cause) {
            // intentionally empty
        }

        public void start(int orderingGuarantees) {
            // intentionally empty
        }

        public String name() {
            return "END";
        }

        /**
         * Asserts that all slots of {@code batch} are flagged as processed and that
         * {@code ticket} directly follows the previously accepted ticket.
         *
         * @return always 0
         */
        public long receive(long ticket, Batch batch) {
            int processedSlots = 0;
            for (boolean slot : batch.processed) {
                if (slot) {
                    processedSlots++;
                }
            }
            Assert.assertEquals((long)batch.processed.length, (long)processedSlots);

            // Only accept ticket n when the last accepted ticket was n - 1.
            boolean inSequence = this.received.compareAndSet(ticket - 1L, ticket);
            if (!inSequence) {
                Assert.fail("Hmm " + ticket + " " + this.received.get());
            }
            return 0L;
        }

        public StepStats stats() {
            return null;
        }

        public void endOfUpstream() {
            // intentionally empty
        }

        public boolean isCompleted() {
            return false;
        }

        public void setDownstream(Step<?> downstreamStep) {
            // intentionally empty
        }

        public void close() throws Exception {
            // intentionally empty
        }
    }

    /**
     * Test batch holding one boolean flag per processor id, used to check that each id
     * marks the batch once.
     */
    private static class Batch {
        private final boolean[] processed;

        /**
         * @param processors number of flags to allocate, all initially {@code false}
         */
        Batch(int processors) {
            this.processed = new boolean[processors];
        }

        /**
         * Marks the flag for {@code id}, asserting first that it has not been marked already.
         */
        void processedBy(int id) {
            boolean alreadyMarked = this.processed[id];
            Assert.assertFalse(alreadyMarked);
            this.processed[id] = true;
        }
    }

    /**
     * {@link ForkedProcessorStep} over {@link Batch} whose forked work is simply to mark the
     * batch as processed by the calling processor id.
     */
    private static class BatchProcessor
    extends ForkedProcessorStep<Batch> {
        /**
         * @param control stage control passed on to the super constructor
         * @param processors value the step's configuration reports as max number of processors
         */
        protected BatchProcessor(final StageControl control, final int processors) {
            super(
                    control,
                    "PROCESSOR",
                    ForkedProcessorStepTest.config(processors),
                    new StatsProvider[0]);
        }

        /** Records on the batch that processor {@code id} has handled it. */
        protected void forkedProcess(final int id, final int processors, final Batch batch) {
            batch.processedBy(id);
        }
    }

    /**
     * Four-step stage used by the stress tests: a producer emitting the tickets
     * {@code 0 .. batches - 1} as {@code Long} batches, a {@link ProcessorStep} that sleeps
     * briefly and forwards each batch, a {@link ForkedProcessorStep} that only sleeps
     * briefly, and a {@link DeadEndStep}.
     */
    private static class StressStage
    extends Stage {
        StressStage(Configuration config, int orderingGuarantees, final int batches) {
            super("Stress", config, orderingGuarantees);

            // Step 0: produces one Long per ticket until "batches" tickets have been handed out.
            Step producer = (Step)new PullingProducerStep(this.control(), config){

                protected long position() {
                    return 0L;
                }

                protected Object nextBatchOrNull(long ticket, int batchSize) {
                    if (ticket < (long)batches) {
                        return Long.valueOf(ticket);
                    }
                    return null;
                }
            };
            add(producer);

            // Step 1: sleeps for a random sub-millisecond duration, then passes the batch on.
            Step forwarder = (Step)new ProcessorStep<Long>(this.control(), "Yeah", config, 3, new StatsProvider[0]){

                protected void process(Long batch, BatchSender sender) throws Throwable {
                    int nanos = ThreadLocalRandom.current().nextInt(100000);
                    Thread.sleep(0L, nanos);
                    sender.send((Object)batch);
                }
            };
            add(forwarder);

            // Step 2: the forked step under test; each forked call just sleeps briefly.
            Step subject = (Step)new ForkedProcessorStep<Long>(this.control(), "Subject", config, new StatsProvider[0]){

                protected void forkedProcess(int id, int processors, Long batch) throws Throwable {
                    int nanos = ThreadLocalRandom.current().nextInt(100000);
                    Thread.sleep(0L, nanos);
                }
            };
            add(subject);

            // Step 3: terminal step.
            Step deadEnd = (Step)new DeadEndStep(this.control());
            add(deadEnd);
        }
    }
}

