/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisor;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageExecution;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.Key;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
 * Verifies that batches travelling through a {@link Stage} are received by every
 * step in strictly increasing ticket order, and that every step reports having
 * completed all produced batches.
 */
public class StageTest {
    /**
     * Builds a stage consisting of one producer, three intermediate steps and one
     * terminating step, runs it to completion and checks that each step reports
     * the expected number of done batches. Ticket ordering itself is asserted
     * inside {@link ReceiveOrderAssertingStep#receive(long, Object)}.
     */
    @Test
    public void shouldReceiveBatchesInOrder() throws Exception {
        // Small batch size so that many batches flow through the stage.
        Configuration config = new Configuration.Overridden(Configuration.DEFAULT) {
            public int batchSize() {
                return 10;
            }
        };
        // NOTE(review): the meaning of the third constructor argument (1) is not
        // visible from this file; it is kept exactly as in the original.
        Stage stage = new Stage("Test stage", config, 1);
        try {
            long batches = 1000L;
            final long items = batches * config.batchSize();

            // Producer: hands out arrays filled with the same placeholder object
            // until `items` elements have been produced, then signals the end
            // by returning null.
            stage.add(new ProducerStep(stage.control(), "Producer", config) {
                private final Object theObject = new Object();
                private long i;

                protected Object nextBatchOrNull(long ticket, int batchSize) {
                    if (this.i >= items) {
                        return null;
                    }
                    Object[] batch = new Object[batchSize];
                    Arrays.fill(batch, this.theObject);
                    this.i += batchSize;
                    return batch;
                }
            });

            // Intermediate steps sleep 0, 1 and 2 ms per batch respectively and
            // forward each batch downstream.
            for (int i = 0; i < 3; ++i) {
                stage.add(new ReceiveOrderAssertingStep(stage.control(), "Step" + i, config, i, false));
            }
            // Terminating step: does not forward batches any further.
            stage.add(new ReceiveOrderAssertingStep(stage.control(), "Final step", config, 0L, true));

            StageExecution execution = stage.execute();
            for (Step<?> step : execution.steps()) {
                step.processors(1);
            }
            new ExecutionSupervisor(ExecutionMonitors.invisible()).supervise(execution);

            for (Step<?> step : execution.steps()) {
                Assert.assertEquals("For " + step, batches, step.stats().stat(Keys.done_batches).asLong());
            }
        } finally {
            // Close the stage even when supervision or an assertion fails.
            stage.close();
        }
    }

    /**
     * Step that asserts, on every {@link #receive(long, Object)} call, that the
     * incoming ticket equals the number of batches received so far, i.e. that
     * tickets arrive as 0, 1, 2, ... without gaps or reordering.
     */
    private static class ReceiveOrderAssertingStep
    extends ProcessorStep<Object> {
        /** Next ticket this step expects to receive; starts at 0. */
        private final AtomicLong lastTicket = new AtomicLong();
        /** Milliseconds to sleep for each processed batch. */
        private final long processingTime;
        /** When {@code true} the batch is not forwarded to the sender. */
        private final boolean endOfLine;

        /**
         * @param control        stage control passed on to the superclass
         * @param name           step name passed on to the superclass
         * @param config         configuration passed on to the superclass
         * @param processingTime milliseconds to sleep per processed batch
         * @param endOfLine      whether this step swallows batches instead of sending them on
         */
        ReceiveOrderAssertingStep(StageControl control, String name, Configuration config, long processingTime, boolean endOfLine) {
            super(control, name, config, 1, new StatsProvider[0]);
            this.processingTime = processingTime;
            this.endOfLine = endOfLine;
        }

        /**
         * Asserts that {@code ticket} is the next expected one, then delegates to
         * the superclass implementation.
         */
        public long receive(long ticket, Object batch) {
            Assert.assertEquals("For " + batch + " in " + this.name(), this.lastTicket.getAndIncrement(), ticket);
            return super.receive(ticket, batch);
        }

        /**
         * Sleeps for the configured processing time and forwards the batch unless
         * this step is the end of the line.
         */
        protected void process(Object batch, BatchSender sender) {
            try {
                Thread.sleep(this.processingTime);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (!this.endOfLine) {
                sender.send(batch);
            }
        }
    }
}

