/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.query.operator;

import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.junit.Assert;

public class OperatorTestHelper {
    private Supplier<TestReceiver> receiverSupply;
    private Consumer<TestReceiver> finalValidation;

    public OperatorTestHelper expectRowsAndColumns(final RowsAndColumnsHelper ... helpers) {
        return this.withPushFn(() -> new JustPushMe(){
            int index = 0;

            @Override
            public Operator.Signal push(RowsAndColumns rac) {
                helpers[this.index++].validate(rac);
                return Operator.Signal.GO;
            }
        }).withFinalValidation(testReceiver -> Assert.assertEquals((long)helpers.length, (long)testReceiver.getNumPushed()));
    }

    public OperatorTestHelper expectAndStopAfter(final RowsAndColumnsHelper ... helpers) {
        return this.withPushFn(() -> new JustPushMe(){
            int index = 0;

            @Override
            public Operator.Signal push(RowsAndColumns rac) {
                helpers[this.index++].validate(rac);
                return this.index < helpers.length ? Operator.Signal.GO : Operator.Signal.STOP;
            }
        }).withFinalValidation(testReceiver -> Assert.assertEquals((long)helpers.length, (long)testReceiver.getNumPushed()));
    }

    public OperatorTestHelper withReceiver(Supplier<TestReceiver> receiver) {
        if (this.receiverSupply != null) {
            throw new ISE("Receiver[%s] already set, cannot set it again[%s].", new Object[]{this.receiverSupply, receiver});
        }
        this.receiverSupply = receiver;
        return this;
    }

    public OperatorTestHelper withFinalValidation(Consumer<TestReceiver> validator) {
        if (this.finalValidation == null) {
            this.finalValidation = validator;
        } else {
            Consumer<TestReceiver> subValidator = this.finalValidation;
            this.finalValidation = receiver -> {
                subValidator.accept((TestReceiver)receiver);
                validator.accept((TestReceiver)receiver);
            };
        }
        return this;
    }

    public OperatorTestHelper withPushFn(Supplier<JustPushMe> fnSupplier) {
        return this.withReceiver(() -> new TestReceiver((JustPushMe)fnSupplier.get()));
    }

    public void runToCompletion(Operator op) {
        TestReceiver receiver = this.receiverSupply.get();
        Operator.go((Operator)op, (Operator.Receiver)receiver);
        Assert.assertTrue((boolean)receiver.isCompleted());
        if (this.finalValidation != null) {
            this.finalValidation.accept(receiver);
        }
        for (int i = 1; i < receiver.getNumPushed(); ++i) {
            long expectedNumPauses = receiver.getNumPushed() / i;
            if (receiver.getNumPushed() % i > 0) {
                ++expectedNumPauses;
            }
            this.runWhilePausing(op, expectedNumPauses, i);
        }
    }

    private void runWhilePausing(Operator op, long expectedNumPauses, int pauseAfter) {
        TestReceiver pausingReceiver = this.receiverSupply.get();
        pausingReceiver.setPauseAfter(pauseAfter);
        int numPauses = 0;
        Closeable continuation = null;
        do {
            continuation = op.goOrContinue(continuation, (Operator.Receiver)pausingReceiver);
            ++numPauses;
        } while (continuation != null);
        String msg = StringUtils.format((String)"pauseAfter[%,d]", (Object[])new Object[]{pauseAfter});
        Assert.assertTrue((String)msg, (boolean)pausingReceiver.isCompleted());
        Assert.assertEquals((String)msg, (long)expectedNumPauses, (long)numPauses);
        if (this.finalValidation != null) {
            this.finalValidation.accept(pausingReceiver);
        }
    }

    public static class TestReceiver
    implements Operator.Receiver {
        private final JustPushMe pushFn;
        private AtomicInteger numPushed = new AtomicInteger();
        private AtomicBoolean completed = new AtomicBoolean(false);
        private long pauseAfter = -1L;

        public TestReceiver(JustPushMe pushFn) {
            this.pushFn = pushFn;
        }

        public Operator.Signal push(RowsAndColumns rac) {
            this.numPushed.incrementAndGet();
            Operator.Signal push = this.pushFn.push(rac);
            if (push == Operator.Signal.GO && this.pauseAfter != -1L && (long)this.numPushed.get() % this.pauseAfter == 0L) {
                return Operator.Signal.PAUSE;
            }
            return push;
        }

        public boolean isCompleted() {
            return this.completed.get();
        }

        public void completed() {
            if (!this.completed.compareAndSet(false, true)) {
                throw new ISE("complete called more than once!?  Why.", new Object[0]);
            }
        }

        public int getNumPushed() {
            return this.numPushed.get();
        }

        public void setPauseAfter(long pauseAfter) {
            this.pauseAfter = pauseAfter;
        }
    }

    public static interface JustPushMe {
        public Operator.Signal push(RowsAndColumns var1);
    }
}

