/*
 * Decompiled with CFR 0.152.
 */
package org.reactivestreams.tck;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.support.Optional;
import org.reactivestreams.tck.support.SubscriberBufferOverflowException;
import org.testng.Assert;

public class TestEnvironment {
    /** Buffer size constant exposed to users of this class; it is not referenced elsewhere in this file. */
    public static final int TEST_BUFFER_SIZE = 16;
    /** Name of the environment variable read by {@link #envDefaultTimeoutMillis()}. */
    private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS";
    /** Timeout in milliseconds; the same value is returned by {@link #envDefaultTimeoutMillis()} when the variable is unset. */
    private static final long DEFAULT_TIMEOUT_MILLIS = 100L;
    /** Name of the environment variable read by {@link #envDefaultNoSignalsTimeoutMillis()}. */
    private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
    /** Value returned by {@link #defaultTimeoutMillis()}. */
    private final long defaultTimeoutMillis;
    /** Value returned by {@link #defaultNoSignalsTimeoutMillis()}. */
    private final long defaultNoSignalsTimeoutMillis;
    /** When {@code true}, {@link #debug(String)} prints its messages to standard output. */
    private final boolean printlnDebug;
    /**
     * Errors recorded by the {@code flop} methods until they are verified, dropped or cleared.
     * The reference is never reassigned, so it is declared {@code final} and created with explicit type arguments.
     */
    private final CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>();

    /**
     * Creates an environment with explicit timeouts and an explicit debug switch.
     *
     * @param defaultTimeoutMillis value later returned by {@link #defaultTimeoutMillis()}
     * @param defaultNoSignalsTimeoutMillis value later returned by {@link #defaultNoSignalsTimeoutMillis()}
     * @param printlnDebug when {@code true}, {@link #debug(String)} prints to standard output
     */
    public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
        this.printlnDebug = printlnDebug;
        this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
        this.defaultTimeoutMillis = defaultTimeoutMillis;
    }

    /**
     * Creates an environment with explicit timeouts and debug output switched off.
     *
     * @param defaultTimeoutMillis value later returned by {@link #defaultTimeoutMillis()}
     * @param defaultNoSignalsTimeoutMillis value later returned by {@link #defaultNoSignalsTimeoutMillis()}
     */
    public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
        this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, /* printlnDebug */ false);
    }

    /**
     * Creates an environment that uses the same value for both timeouts, with debug output switched off.
     *
     * @param defaultTimeoutMillis value used for both the default timeout and the no-signals timeout
     */
    public TestEnvironment(long defaultTimeoutMillis) {
        this(
            defaultTimeoutMillis,
            defaultTimeoutMillis, // the no-signals timeout reuses the default timeout
            false);
    }

    /**
     * Creates an environment whose timeouts come from {@link #envDefaultTimeoutMillis()} and
     * {@link #envDefaultNoSignalsTimeoutMillis()}.
     *
     * @param printlnDebug when {@code true}, {@link #debug(String)} prints to standard output
     * @throws IllegalArgumentException if either environment value cannot be parsed as a long
     */
    public TestEnvironment(boolean printlnDebug) {
        this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug);
    }

    /**
     * Creates an environment whose timeouts come from {@link #envDefaultTimeoutMillis()} and
     * {@link #envDefaultNoSignalsTimeoutMillis()}, with debug output switched off.
     *
     * @throws IllegalArgumentException if either environment value cannot be parsed as a long
     */
    public TestEnvironment() {
        this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis());
    }

    /**
     * @return the default timeout, in milliseconds, this environment was constructed with
     */
    public long defaultTimeoutMillis() {
        return defaultTimeoutMillis;
    }

    /**
     * @return the no-signals timeout, in milliseconds, this environment was constructed with
     */
    public long defaultNoSignalsTimeoutMillis() {
        return defaultNoSignalsTimeoutMillis;
    }

    /**
     * Reads the default timeout from the {@code DEFAULT_TIMEOUT_MILLIS} environment variable.
     *
     * @return the parsed value, or {@code 100} when the variable is not set
     * @throws IllegalArgumentException if the variable is set but cannot be parsed as a long
     */
    public static long envDefaultTimeoutMillis() {
        String raw = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV);
        if (raw != null) {
            try {
                return Long.parseLong(raw);
            }
            catch (NumberFormatException parseFailure) {
                String problem = String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, raw);
                throw new IllegalArgumentException(problem, parseFailure);
            }
        }
        return 100L;
    }

    /**
     * Reads the no-signals timeout from the {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} environment variable.
     *
     * @return the parsed value, or the result of {@link #envDefaultTimeoutMillis()} when the variable is not set
     * @throws IllegalArgumentException if the consulted variable is set but cannot be parsed as a long
     */
    public static long envDefaultNoSignalsTimeoutMillis() {
        String raw = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV);
        if (raw != null) {
            try {
                return Long.parseLong(raw);
            }
            catch (NumberFormatException parseFailure) {
                String problem = String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, raw);
                throw new IllegalArgumentException(problem, parseFailure);
            }
        }
        // Not configured: fall back to the general default timeout.
        return TestEnvironment.envDefaultTimeoutMillis();
    }

    /**
     * Records a failure with the given message without throwing it to the caller.
     * The failure raised by {@code Assert.fail} is caught and stored in the async error list.
     *
     * @param msg failure message
     */
    public void flop(String msg) {
        try {
            Assert.fail(msg);
        }
        catch (Throwable failure) {
            asyncErrors.add(failure);
        }
    }

    /**
     * Records {@code thr} in the async error list without throwing to the caller.
     *
     * @param thr the error to record
     * @param msg message passed to {@code Assert.fail} together with {@code thr}
     */
    public void flop(Throwable thr, String msg) {
        try {
            Assert.fail(msg, thr);
        }
        catch (Throwable ignored) {
            // The original throwable is stored, not the failure raised by Assert.fail;
            // Receptacle.expectError checks the type of the stored entries.
            asyncErrors.add(thr);
        }
    }

    /**
     * Records {@code thr} in the async error list without throwing to the caller,
     * using the throwable's own message as the failure message.
     *
     * @param thr the error to record; must not be {@code null} because its message is read
     */
    public void flop(Throwable thr) {
        try {
            String msg = thr.getMessage();
            Assert.fail(msg, thr);
        }
        catch (Throwable ignored) {
            // The original throwable is stored, not the failure raised by Assert.fail.
            asyncErrors.add(thr);
        }
    }

    /**
     * Records a failure in the async error list and then fails the calling thread as well.
     *
     * @param msg failure message
     * @param <T> lets the call be used in a {@code return} statement of any type
     * @return {@code null}; only reached if {@code Assert.fail} returns normally (presumably it never does)
     */
    public <T> T flopAndFail(String msg) {
        try {
            Assert.fail(msg);
        }
        catch (Throwable failure) {
            asyncErrors.add(failure);
            // Fail again, this time letting the failure propagate to the caller.
            Assert.fail(msg, failure);
        }
        return null;
    }

    /**
     * Subscribes {@code sub} to {@code pub}, waiting up to the default timeout for the subscription.
     *
     * @param pub publisher to subscribe to
     * @param sub subscriber to register
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
        subscribe(pub, sub, defaultTimeoutMillis);
    }

    /**
     * Subscribes {@code sub} to {@code pub}, waits for the subscriber's subscription promise to complete,
     * and then verifies that no async errors were recorded.
     *
     * @param pub publisher to subscribe to
     * @param sub subscriber to register
     * @param timeoutMillis how long to wait for the subscription promise, in milliseconds
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
        pub.subscribe(sub);
        Promise<Subscription> subscriptionPromise = sub.subscription;
        String failureMsg = String.format("Could not subscribe %s to Publisher %s", sub, pub);
        subscriptionPromise.expectCompletion(timeoutMillis, failureMsg);
        verifyNoAsyncErrorsNoDelay();
    }

    /**
     * Creates a {@link BlackholeSubscriberWithSubscriptionSupport} and subscribes it to {@code pub}
     * using the default timeout.
     *
     * @param pub publisher to subscribe to
     * @return the subscribed subscriber
     * @throws InterruptedException if interrupted while waiting for the subscription
     */
    public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
        BlackholeSubscriberWithSubscriptionSupport<T> blackhole = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
        subscribe(pub, blackhole, defaultTimeoutMillis());
        return blackhole;
    }

    /**
     * Creates a manual subscriber and subscribes it to {@code pub} using the default timeout.
     *
     * @param pub publisher to subscribe to
     * @return the subscribed subscriber
     * @throws InterruptedException if interrupted while waiting for the subscription
     */
    public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
        long timeoutMillis = defaultTimeoutMillis();
        return newManualSubscriber(pub, timeoutMillis);
    }

    /**
     * Creates a {@link ManualSubscriberWithSubscriptionSupport} and subscribes it to {@code pub}.
     *
     * @param pub publisher to subscribe to
     * @param timeoutMillis how long to wait for the subscription, in milliseconds
     * @return the subscribed subscriber
     * @throws InterruptedException if interrupted while waiting for the subscription
     */
    public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
        ManualSubscriberWithSubscriptionSupport<T> manual = new ManualSubscriberWithSubscriptionSupport<T>(this);
        subscribe(pub, manual, timeoutMillis);
        return manual;
    }

    /** Removes every recorded async error. */
    public void clearAsyncErrors() {
        asyncErrors.clear();
    }

    /**
     * Removes and returns the oldest recorded async error.
     *
     * @return the removed error, or {@code null} when none is recorded
     */
    public Throwable dropAsyncError() {
        Throwable dropped;
        try {
            dropped = asyncErrors.remove(0);
        }
        catch (IndexOutOfBoundsException emptyList) {
            // Nothing recorded: signalled to the caller as null.
            dropped = null;
        }
        return dropped;
    }

    /**
     * Verifies that no async errors are recorded, both immediately and again after the
     * no-signals timeout has elapsed.
     */
    public void verifyNoAsyncErrors() {
        long delay = defaultNoSignalsTimeoutMillis();
        verifyNoAsyncErrors(delay);
    }

    /**
     * Verifies that no async errors are recorded, sleeps for {@code delay} milliseconds,
     * then verifies again.
     *
     * @param delay time to sleep between the two checks, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted;
     *         the thread's interrupt flag is restored before throwing
     */
    public void verifyNoAsyncErrors(long delay) {
        verifyNoAsyncErrorsNoDelay();
        try {
            Thread.sleep(delay);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        verifyNoAsyncErrorsNoDelay();
    }

    /**
     * Fails immediately if any async error is recorded.
     * A recorded {@link AssertionError} is rethrown as is; any other throwable is reported
     * through {@code Assert.fail} with the throwable as cause.
     */
    public void verifyNoAsyncErrorsNoDelay() {
        for (Throwable asyncError : asyncErrors) {
            if (!(asyncError instanceof AssertionError)) {
                String summary = String.format("Async error during test execution: %s", asyncError.getMessage());
                Assert.fail(summary, asyncError);
            } else {
                throw (AssertionError) asyncError;
            }
        }
    }

    /**
     * Prints {@code msg} to standard output with a {@code [TCK-DEBUG]} prefix when debug output is enabled.
     *
     * @param msg message to print
     */
    public void debug(String msg) {
        if (!printlnDebug) {
            return;
        }
        System.out.printf("[TCK-DEBUG] %s%n", msg);
    }

    /**
     * Looks through the current stack trace for the first frame whose method name equals {@code method}.
     *
     * @param method method name to look for
     * @return the first matching frame, or an empty {@code Optional} when no frame matches
     */
    public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
        StackTraceElement[] frames = new Throwable().getStackTrace();
        for (int i = 0; i < frames.length; i++) {
            StackTraceElement frame = frames[i];
            if (frame.getMethodName().equals(method)) {
                return Optional.of(frame);
            }
        }
        return Optional.empty();
    }

    /**
     * Bounded buffer of received elements followed by an end-of-stream marker.
     * Elements are stored as defined {@code Optional}s and completion as an empty {@code Optional}.
     */
    public static class Receptacle<T> {
        /** Capacity of the internal queue. */
        final int QUEUE_SIZE = 32;
        private final TestEnvironment env;
        private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE);
        /** Closed once {@link #complete()} has been called. */
        private final Latch completedLatch;

        Receptacle(TestEnvironment env) {
            this.env = env;
            this.completedLatch = new Latch(env);
        }

        /**
         * Enqueues an element; records an async error if completion was already signalled.
         *
         * @throws IllegalStateException if the queue is full
         */
        public void add(T value) {
            String afterCompletionMsg = String.format("Unexpected element %s received after stream completed", value);
            completedLatch.assertOpen(afterCompletionMsg);
            abq.add(Optional.of(value));
        }

        /** Marks the stream as completed and enqueues the end-of-stream marker. */
        public void complete() {
            completedLatch.assertOpen("Unexpected additional complete signal received!");
            completedLatch.close();
            abq.add(Optional.empty());
        }

        /**
         * Waits for the next element and fails if none arrives in time or the stream has ended.
         *
         * @param timeoutMillis maximum wait in milliseconds
         * @param errorMsg prefix of the failure message used on timeout
         * @return the next element
         */
        public T next(long timeoutMillis, String errorMsg) throws InterruptedException {
            Optional<T> polled = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (polled == null) {
                return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
            }
            if (!polled.isDefined()) {
                return env.flopAndFail("Expected element but got end-of-stream");
            }
            return polled.get();
        }

        /**
         * Waits for the next signal, which may be an element or end-of-stream.
         * On timeout an async error is recorded and an empty {@code Optional} is returned.
         */
        public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
            Optional<T> polled = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (polled != null) {
                return polled;
            }
            env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
            return Optional.empty();
        }

        /**
         * Collects {@code elements} elements, all of which must arrive before a shared deadline.
         *
         * @param elements number of elements to collect
         * @param timeoutMillis total time budget in milliseconds
         * @param errorMsg prefix of the failure message used on timeout
         */
        public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
            LinkedList<T> collected = new LinkedList<T>();
            long deadline = System.currentTimeMillis() + timeoutMillis;
            long outstanding = elements;
            while (outstanding > 0L) {
                // Each poll only gets what is left of the overall budget.
                long millisLeft = deadline - System.currentTimeMillis();
                collected.add(next(millisLeft, errorMsg));
                outstanding--;
            }
            return collected;
        }

        /** Waits for end-of-stream; records an async error on timeout or when an element arrives instead. */
        public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
            Optional<T> polled = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (polled == null) {
                env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
                return;
            }
            if (polled.isDefined()) {
                env.flop(String.format("Expected end-of-stream but got element [%s]", polled.get()));
            }
        }

        /**
         * Sleeps for the whole timeout, then takes the oldest async error from the environment
         * and checks that it is an instance of {@code clazz}.
         *
         * @return the recorded error, cast to {@code E}
         */
        public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
            Thread.sleep(timeoutMillis);
            if (env.asyncErrors.isEmpty()) {
                return env.<E>flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
            }
            Throwable thrown = env.asyncErrors.remove(0);
            if (!clazz.isInstance(thrown)) {
                return env.<E>flopAndFail(String.format("%s within %d ms; Got %s but expected %s", errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
            }
            return clazz.cast(thrown);
        }

        /** Sleeps for {@code withinMillis}, then records an async error if any signal is queued. */
        public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
            Thread.sleep(withinMillis);
            Optional<T> polled = abq.poll();
            if (polled == null) {
                return;
            }
            if (polled.isDefined()) {
                env.flop(String.format("%s [%s]", errorMsgPrefix, polled.get()));
            } else {
                env.flop("Expected no element but got end-of-stream");
            }
        }
    }

    /**
     * Single-value holder that can be completed from one place and awaited from another.
     * A value passed to {@link #complete(Object)} only becomes visible through {@link #value()}
     * after {@link #expectCompletion(long, String)} has picked it up.
     */
    public static class Promise<T> {
        private final TestEnvironment env;
        /** Hands the value from {@code complete} over to {@code expectCompletion}. */
        private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
        /** The completed value; {@code null} means "not completed". */
        private volatile T _value = null;

        /** Creates a promise that is already completed with {@code value}. */
        public static <T> Promise<T> completed(TestEnvironment env, T value) {
            Promise<T> done = new Promise<T>(env);
            done.completeImmediatly(value);
            return done;
        }

        public Promise(TestEnvironment env) {
            this.env = env;
        }

        /**
         * @return the completed value, or {@code null} (after recording an async error) when not completed
         */
        public T value() {
            if (!isCompleted()) {
                env.flop("Cannot access promise value before completion");
                return null;
            }
            return _value;
        }

        /** @return {@code true} once a non-null value has been stored */
        public boolean isCompleted() {
            return _value != null;
        }

        /**
         * Enqueues the value for a later {@link #expectCompletion(long, String)}.
         *
         * @throws IllegalStateException if a value is already enqueued
         */
        public void complete(T value) {
            abq.add(value);
        }

        /** Enqueues the value and also makes it visible through {@link #value()} right away. */
        public void completeImmediatly(T value) {
            complete(value);
            _value = value;
        }

        /**
         * If not yet completed, waits up to {@code timeoutMillis} for a value and stores it;
         * records an async error on timeout.
         */
        public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
            if (isCompleted()) {
                return;
            }
            T polled = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (polled != null) {
                _value = polled;
            } else {
                env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
            }
        }
    }

    /**
     * Re-openable latch built on a {@link CountDownLatch} with a count of one.
     * The latch is "closed" once the count has reached zero.
     */
    public static class Latch {
        private final TestEnvironment env;
        /** Replaced by a fresh instance on {@link #reOpen()}. */
        private volatile CountDownLatch countDownLatch = new CountDownLatch(1);

        public Latch(TestEnvironment env) {
            this.env = env;
        }

        /** Puts the latch back into the open state by installing a fresh count-down latch. */
        public void reOpen() {
            countDownLatch = new CountDownLatch(1);
        }

        /** @return {@code true} when the current count-down latch has reached zero */
        public boolean isClosed() {
            long remaining = countDownLatch.getCount();
            return remaining == 0L;
        }

        /** Closes the latch by counting it down. */
        public void close() {
            countDownLatch.countDown();
        }

        /** Records an {@link ExpectedClosedLatchException} as async error if the latch is still open. */
        public void assertClosed(String openErrorMsg) {
            if (isClosed()) {
                return;
            }
            env.flop(new ExpectedClosedLatchException(openErrorMsg));
        }

        /** Records an {@link ExpectedOpenLatchException} as async error if the latch is already closed. */
        public void assertOpen(String closedErrorMsg) {
            if (!isClosed()) {
                return;
            }
            env.flop(new ExpectedOpenLatchException(closedErrorMsg));
        }

        /** Waits up to the environment's default timeout for the latch to close. */
        public void expectClose(String notClosedErrorMsg) throws InterruptedException {
            long timeoutMillis = env.defaultTimeoutMillis();
            expectClose(timeoutMillis, notClosedErrorMsg);
        }

        /**
         * Waits up to {@code timeoutMillis} for the latch to close and records an async error
         * if it is still open afterwards.
         */
        public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException {
            countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            boolean stillOpen = countDownLatch.getCount() > 0L;
            if (stillOpen) {
                env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis));
            }
        }

        /** Recorded when a latch was expected to be closed but is open. */
        static final class ExpectedClosedLatchException extends RuntimeException {
            public ExpectedClosedLatchException(String message) {
                super(message);
            }
        }

        /** Recorded when a latch was expected to be open but is closed. */
        static final class ExpectedOpenLatchException extends RuntimeException {
            public ExpectedOpenLatchException(String message) {
                super(message);
            }
        }
    }

    /**
     * Publisher driven by hand from a test: it accepts a single subscriber, records the
     * {@code request} and {@code cancel} calls made on the subscription, and lets the test
     * push signals to the subscriber.
     */
    public static class ManualPublisher<T>
    implements Publisher<T> {
        protected final TestEnvironment env;
        /** Sum of the request amounts consumed through {@link #expectRequest(long)}. */
        protected long pendingDemand = 0L;
        /** Completed with the subscriber on the first {@link #subscribe(Subscriber)} call. */
        protected Promise<Subscriber<? super T>> subscriber;
        /** Amounts passed to {@code Subscription.request}, in arrival order. */
        protected final Receptacle<Long> requests;
        /** Closed when the subscriber cancels its subscription. */
        protected final Latch cancelled;

        public ManualPublisher(TestEnvironment env) {
            this.env = env;
            this.requests = new Receptacle<Long>(env);
            this.cancelled = new Latch(env);
            this.subscriber = new Promise<Subscriber<? super T>>(this.env);
        }

        /**
         * Registers the first subscriber and hands it a subscription that records its calls.
         * Any further subscriber is rejected by recording an async error.
         */
        public void subscribe(Subscriber<? super T> s) {
            if (!this.subscriber.isCompleted()) {
                this.subscriber.completeImmediatly(s);
                Subscription subs = new Subscription(){

                    public void request(long elements) {
                        ManualPublisher.this.requests.add(elements);
                    }

                    public void cancel() {
                        ManualPublisher.this.cancelled.close();
                    }
                };
                s.onSubscribe(subs);
            } else {
                this.env.flop("TestPublisher doesn't support more than one Subscriber");
            }
        }

        /** Sends {@code onNext(element)} to the subscriber, or records an async error if there is none. */
        public void sendNext(T element) {
            if (this.subscriber.isCompleted()) {
                this.subscriber.value().onNext(element);
            } else {
                this.env.flop("Cannot sendNext before having a Subscriber");
            }
        }

        /** Sends {@code onComplete()} to the subscriber, or records an async error if there is none. */
        public void sendCompletion() {
            if (this.subscriber.isCompleted()) {
                this.subscriber.value().onComplete();
            } else {
                this.env.flop("Cannot sendCompletion before having a Subscriber");
            }
        }

        /** Sends {@code onError(cause)} to the subscriber, or records an async error if there is none. */
        public void sendError(Throwable cause) {
            if (this.subscriber.isCompleted()) {
                this.subscriber.value().onError(cause);
            } else {
                this.env.flop("Cannot sendError before having a Subscriber");
            }
        }

        /** Same as {@link #expectRequest(long)} with the environment's default timeout. */
        public long expectRequest() throws InterruptedException {
            return this.expectRequest(this.env.defaultTimeoutMillis());
        }

        /**
         * Waits for the next recorded {@code request} call and adds its amount to {@link #pendingDemand}.
         *
         * @param timeoutMillis maximum wait in milliseconds
         * @return the requested amount, always positive
         */
        public long expectRequest(long timeoutMillis) throws InterruptedException {
            long requested = this.requests.next(timeoutMillis, "Did not receive expected `request` call");
            if (requested <= 0L) {
                return this.env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
            }
            this.pendingDemand += requested;
            return requested;
        }

        /** Same as {@link #expectExactRequest(long, long)} with the environment's default timeout. */
        public void expectExactRequest(long expected) throws InterruptedException {
            this.expectExactRequest(expected, this.env.defaultTimeoutMillis());
        }

        /**
         * Waits for the next recorded {@code request} call and records an async error if its
         * amount differs from {@code expected}.
         *
         * @param expected the amount the subscriber is expected to request
         * @param timeoutMillis maximum wait in milliseconds
         */
        public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException {
            // expectRequest already adds the amount to pendingDemand; adding it again here
            // would count the same request twice.
            long requested = this.expectRequest(timeoutMillis);
            if (requested != expected) {
                this.env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected));
            }
        }

        /** Same as {@link #expectNoRequest(long)} with the environment's default timeout. */
        public void expectNoRequest() throws InterruptedException {
            this.expectNoRequest(this.env.defaultTimeoutMillis());
        }

        /** Sleeps for {@code timeoutMillis} and records an async error if a {@code request} call was recorded. */
        public void expectNoRequest(long timeoutMillis) throws InterruptedException {
            this.requests.expectNone(timeoutMillis, "Received an unexpected call to: request: ");
        }

        /** Same as {@link #expectCancelling(long)} with the environment's default timeout. */
        public void expectCancelling() throws InterruptedException {
            this.expectCancelling(this.env.defaultTimeoutMillis());
        }

        /** Waits up to {@code timeoutMillis} for the subscriber to cancel; records an async error otherwise. */
        public void expectCancelling(long timeoutMillis) throws InterruptedException {
            this.cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
        }

        /** @return {@code true} once the subscriber has cancelled its subscription */
        public boolean isCancelled() throws InterruptedException {
            return this.cancelled.isClosed();
        }
    }

    /**
     * Base subscriber that treats every signal as unexpected: each callback records an
     * async error in the environment. Subclasses override the signals they want to accept.
     */
    public static class TestSubscriber<T> implements Subscriber<T> {
        /** Completed with the subscription once a subclass accepts {@code onSubscribe}. */
        final Promise<Subscription> subscription;
        protected final TestEnvironment env;

        public TestSubscriber(TestEnvironment env) {
            this.env = env;
            this.subscription = new Promise<Subscription>(env);
        }

        public void onError(Throwable cause) {
            String unexpected = String.format("Unexpected Subscriber::onError(%s)", cause);
            env.flop(cause, unexpected);
        }

        public void onComplete() {
            env.flop("Unexpected Subscriber::onComplete()");
        }

        public void onNext(T element) {
            String unexpected = String.format("Unexpected Subscriber::onNext(%s)", element);
            env.flop(unexpected);
        }

        public void onSubscribe(Subscription subscription) {
            String unexpected = String.format("Unexpected Subscriber::onSubscribe(%s)", subscription);
            env.flop(unexpected);
        }

        /** Cancels the subscription, or records an async error when none has been received yet. */
        public void cancel() {
            if (!subscription.isCompleted()) {
                env.flop("Cannot cancel a subscription before having received it");
                return;
            }
            subscription.value().cancel();
        }
    }

    /**
     * Subscriber that discards every element it receives. Elements are not buffered,
     * so the element-retrieving methods always throw.
     */
    public static class BlackholeSubscriberWithSubscriptionSupport<T> extends ManualSubscriberWithSubscriptionSupport<T> {
        private static final String NO_ELEMENTS_MSG = "Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!";

        public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) {
            super(env);
        }

        /** Drops the element; records an async error if no subscription has been received yet. */
        @Override
        public void onNext(T element) {
            env.debug(String.format("%s::onNext(%s)", this, element));
            if (subscription.isCompleted()) {
                return;
            }
            env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
        }

        /** Always throws: elements are not retained by this subscriber. */
        @Override
        public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
            throw new RuntimeException(NO_ELEMENTS_MSG);
        }

        /** Always throws: elements are not retained by this subscriber. */
        @Override
        public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
            throw new RuntimeException(NO_ELEMENTS_MSG);
        }
    }

    /**
     * Manual subscriber that accepts {@code onSubscribe} and checks that every other signal
     * arrives only after a subscription has been received.
     */
    public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
        public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
            super(env);
        }

        @Override
        public void onNext(T element) {
            env.debug(String.format("%s::onNext(%s)", this, element));
            if (!subscription.isCompleted()) {
                env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
                return;
            }
            super.onNext(element);
        }

        @Override
        public void onComplete() {
            env.debug(this + "::onComplete()");
            if (!subscription.isCompleted()) {
                env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
                return;
            }
            super.onComplete();
        }

        /** Stores the subscription; a second call records an async error. */
        @Override
        public void onSubscribe(Subscription s) {
            env.debug(String.format("%s::onSubscribe(%s)", this, s));
            if (subscription.isCompleted()) {
                env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
                return;
            }
            subscription.complete(s);
        }

        @Override
        public void onError(Throwable cause) {
            env.debug(String.format("%s::onError(%s)", this, cause));
            if (!subscription.isCompleted()) {
                env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
                return;
            }
            super.onError(cause);
        }
    }

    /**
     * Subscriber that buffers received elements and completion in a {@link Receptacle} and
     * offers blocking helpers to request and inspect signals from a test.
     * Overloads without a timeout use the environment's default timeout (or the no-signals
     * timeout for {@code expectNone}).
     */
    public static class ManualSubscriber<T>
    extends TestSubscriber<T> {
        /** Buffer holding received elements and the end-of-stream marker. */
        Receptacle<T> received;

        public ManualSubscriber(TestEnvironment env) {
            super(env);
            this.received = new Receptacle<T>(this.env);
        }

        /**
         * Buffers the element.
         *
         * @throws SubscriberBufferOverflowException if the buffer is full
         */
        @Override
        public void onNext(T element) {
            try {
                this.received.add(element);
            }
            catch (IllegalStateException ex) {
                throw new SubscriberBufferOverflowException(String.format("Received more than bufferSize (%d) onNext signals. The Publisher probably emited more signals than expected!", this.received.QUEUE_SIZE), ex);
            }
        }

        /** Buffers the end-of-stream marker. */
        @Override
        public void onComplete() {
            this.received.complete();
        }

        /** Calls {@code request(elements)} on the received subscription. */
        public void request(long elements) {
            this.subscription.value().request(elements);
        }

        public T requestNextElement() throws InterruptedException {
            return this.requestNextElement(this.env.defaultTimeoutMillis());
        }

        public T requestNextElement(long timeoutMillis) throws InterruptedException {
            return this.requestNextElement(timeoutMillis, "Did not receive expected element");
        }

        public T requestNextElement(String errorMsg) throws InterruptedException {
            return this.requestNextElement(this.env.defaultTimeoutMillis(), errorMsg);
        }

        /** Requests one element and waits for it. */
        public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
            this.request(1L);
            return this.nextElement(timeoutMillis, errorMsg);
        }

        public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException {
            return this.requestNextElementOrEndOfStream(this.env.defaultTimeoutMillis(), "Did not receive expected stream completion");
        }

        public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException {
            return this.requestNextElementOrEndOfStream(this.env.defaultTimeoutMillis(), errorMsg);
        }

        public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
            return this.requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
        }

        /** Requests one element and waits for either an element or end-of-stream. */
        public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
            this.request(1L);
            return this.nextElementOrEndOfStream(timeoutMillis, errorMsg);
        }

        public void requestEndOfStream() throws InterruptedException {
            this.requestEndOfStream(this.env.defaultTimeoutMillis(), "Did not receive expected stream completion");
        }

        public void requestEndOfStream(long timeoutMillis) throws InterruptedException {
            this.requestEndOfStream(timeoutMillis, "Did not receive expected stream completion");
        }

        public void requestEndOfStream(String errorMsg) throws InterruptedException {
            this.requestEndOfStream(this.env.defaultTimeoutMillis(), errorMsg);
        }

        /** Requests one element and expects the stream to complete instead. */
        public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
            this.request(1L);
            this.expectCompletion(timeoutMillis, errorMsg);
        }

        public List<T> requestNextElements(long elements) throws InterruptedException {
            this.request(elements);
            return this.nextElements(elements, this.env.defaultTimeoutMillis());
        }

        public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException {
            this.request(elements);
            return this.nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements));
        }

        /** Requests {@code elements} elements and waits for all of them. */
        public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
            this.request(elements);
            return this.nextElements(elements, timeoutMillis, errorMsg);
        }

        public T nextElement() throws InterruptedException {
            return this.nextElement(this.env.defaultTimeoutMillis());
        }

        public T nextElement(long timeoutMillis) throws InterruptedException {
            return this.nextElement(timeoutMillis, "Did not receive expected element");
        }

        public T nextElement(String errorMsg) throws InterruptedException {
            return this.nextElement(this.env.defaultTimeoutMillis(), errorMsg);
        }

        /** Waits for the next buffered element without requesting more. */
        public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
            return this.received.next(timeoutMillis, errorMsg);
        }

        public Optional<T> nextElementOrEndOfStream() throws InterruptedException {
            return this.nextElementOrEndOfStream(this.env.defaultTimeoutMillis(), "Did not receive expected stream completion");
        }

        public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
            return this.nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
        }

        /** Waits for the next buffered signal, element or end-of-stream, without requesting more. */
        public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
            return this.received.nextOrEndOfStream(timeoutMillis, errorMsg);
        }

        public List<T> nextElements(long elements) throws InterruptedException {
            return this.nextElements(elements, this.env.defaultTimeoutMillis(), "Did not receive expected element or completion");
        }

        public List<T> nextElements(long elements, String errorMsg) throws InterruptedException {
            return this.nextElements(elements, this.env.defaultTimeoutMillis(), errorMsg);
        }

        public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException {
            return this.nextElements(elements, timeoutMillis, "Did not receive expected element or completion");
        }

        /** Waits for {@code elements} buffered elements without requesting more. */
        public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
            return this.received.nextN(elements, timeoutMillis, errorMsg);
        }

        public void expectNext(T expected) throws InterruptedException {
            this.expectNext(expected, this.env.defaultTimeoutMillis());
        }

        /** Waits for the next element and records an async error if it does not equal {@code expected}. */
        public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
            T received = this.nextElement(timeoutMillis, "Did not receive expected element on downstream");
            if (!received.equals(expected)) {
                this.env.flop(String.format("Expected element %s on downstream but received %s", expected, received));
            }
        }

        public void expectCompletion() throws InterruptedException {
            this.expectCompletion(this.env.defaultTimeoutMillis(), "Did not receive expected stream completion");
        }

        public void expectCompletion(long timeoutMillis) throws InterruptedException {
            this.expectCompletion(timeoutMillis, "Did not receive expected stream completion");
        }

        public void expectCompletion(String errorMsg) throws InterruptedException {
            this.expectCompletion(this.env.defaultTimeoutMillis(), errorMsg);
        }

        /** Waits for end-of-stream; an element or a timeout records an async error. */
        public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
            this.received.expectCompletion(timeoutMillis, errorMsg);
        }

        public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
            this.expectErrorWithMessage(expected, requiredMessagePart, this.env.defaultTimeoutMillis());
        }

        public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception {
            this.expectErrorWithMessage(expected, requiredMessagePartAlternatives, this.env.defaultTimeoutMillis());
        }

        public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
            this.expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis);
        }

        /**
         * Expects an error of the given type whose message contains at least one of the given parts.
         * An error with a {@code null} message matches nothing and therefore fails the assertion.
         *
         * @param expected required error type
         * @param requiredMessagePartAlternatives message fragments, any one of which is sufficient
         * @param timeoutMillis time to wait for the error, in milliseconds
         */
        public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception {
            E err = this.expectError(expected, timeoutMillis);
            String message = err.getMessage();
            boolean contains = false;
            // Guard against errors created without a message, which would otherwise cause an NPE here.
            if (message != null) {
                for (String requiredMessagePart : requiredMessagePartAlternatives) {
                    if (message.contains(requiredMessagePart)) {
                        contains = true;
                        break;
                    }
                }
            }
            Assert.assertTrue(contains, String.format("Got expected exception [%s] but missing message part [%s], was: %s", err.getClass(), "anyOf: " + requiredMessagePartAlternatives, message));
        }

        public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
            return this.expectError(expected, this.env.defaultTimeoutMillis());
        }

        public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
            return this.expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName()));
        }

        public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
            return this.expectError(expected, this.env.defaultTimeoutMillis(), errorMsg);
        }

        /** Delegates to {@link Receptacle#expectError(Class, long, String)}. */
        public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
            return this.received.expectError(expected, timeoutMillis, errorMsg);
        }

        public void expectNone() throws InterruptedException {
            this.expectNone(this.env.defaultNoSignalsTimeoutMillis());
        }

        public void expectNone(String errMsgPrefix) throws InterruptedException {
            this.expectNone(this.env.defaultNoSignalsTimeoutMillis(), errMsgPrefix);
        }

        public void expectNone(long withinMillis) throws InterruptedException {
            this.expectNone(withinMillis, "Did not expect an element but got element");
        }

        /** Sleeps for {@code withinMillis} and records an async error if any signal was buffered. */
        public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException {
            this.received.expectNone(withinMillis, errMsgPrefix);
        }
    }
}

