/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.lang.rx.test;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.lang.rx.test.ReadStreamAdapterTestBase;
import io.vertx.lang.rx.test.TestSubscriber;
import io.vertx.test.fakestream.FakeStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.Test;

public abstract class ReadStreamAdapterBackPressureTest<O>
extends ReadStreamAdapterTestBase<Buffer, O> {
    protected abstract O toObservable(ReadStream<Buffer> var1, int var2);

    protected abstract O flatMap(O var1, Function<Buffer, O> var2);

    @Override
    protected Buffer buffer(String s) {
        return Buffer.buffer((String)s);
    }

    @Override
    protected String string(Buffer buffer) {
        return buffer.toString("UTF-8");
    }

    protected abstract long defaultMaxBufferSize();

    @Test
    public void testPause() {
        FakeStream stream = new FakeStream();
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        this.subscribe(observable, subscriber);
        subscriber.assertEmpty();
        int i = 0;
        while ((long)i < this.defaultMaxBufferSize()) {
            stream.emit((Object)this.buffer("" + i));
            ++i;
        }
        subscriber.assertEmpty();
        subscriber.request(1L);
        subscriber.assertItem(this.buffer("0")).assertEmpty();
    }

    @Test
    public void testNoPauseWhenRequestingOne() {
        FakeStream stream = new FakeStream();
        TestSubscriber subscriber = new TestSubscriber<Buffer>(){

            @Override
            public void onNext(Buffer buffer) {
                super.onNext(buffer);
                this.request(1L);
            }
        }.prefetch(1L);
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        this.subscribe(observable, subscriber);
        stream.emit(Stream.of(this.buffer("0"), this.buffer("1"), this.buffer("2")));
    }

    @Test
    public void testUnsubscribeOnFirstItemFromBufferedDeliveredWhileRequesting() {
        for (int i = 1; i <= 3; ++i) {
            FakeStream stream = new FakeStream();
            TestSubscriber<Buffer> subscriber = new TestSubscriber<Buffer>(){

                @Override
                public void onNext(Buffer buffer) {
                    super.onNext(buffer);
                    this.unsubscribe();
                }
            }.prefetch(0L);
            O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
            this.subscribe(observable, subscriber);
            stream.emit(Stream.of(this.buffer("0"), this.buffer("1")));
            subscriber.request(i);
            subscriber.assertItem(Buffer.buffer((String)"0")).assertEmpty();
        }
    }

    @Test
    public void testEndWithoutRequest() {
        this.testEndOrFailWithoutRequest(null);
    }

    @Test
    public void testFailWithoutRequest() {
        this.testEndOrFailWithoutRequest(new RuntimeException());
    }

    private void testEndOrFailWithoutRequest(Throwable err) {
        FakeStream stream = new FakeStream();
        TestSubscriber subscriber = new TestSubscriber().prefetch(0L);
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        if (err == null) {
            stream.end();
            subscriber.assertEmpty();
            subscriber.request(1L);
            subscriber.assertCompleted();
        } else {
            stream.fail(err);
            subscriber.assertError(err);
        }
        subscriber.assertEmpty();
    }

    @Test
    public void testNoResumeWhenRequestingBuffered() {
        AtomicBoolean resumed = new AtomicBoolean();
        FakeStream stream = new FakeStream();
        TestSubscriber subscriber = new TestSubscriber().prefetch(0L);
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        stream.emit(Stream.of(this.buffer("0"), this.buffer("1")));
        subscriber.request(1L);
        this.assertEquals(false, resumed.get());
    }

    @Test
    public void testEndDuringRequestResume() {
        FakeStream stream = new FakeStream();
        stream.drainHandler(v -> stream.end());
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        O observable = this.toObservable((ReadStream<Buffer>)stream, 10);
        this.subscribe(observable, subscriber);
        int count = 0;
        while (stream.emit((Object)Buffer.buffer((String)("" + count++)))) {
        }
        subscriber.request(count);
        for (int i = 0; i < count; ++i) {
            subscriber.assertItem(Buffer.buffer((String)("" + i)));
        }
        subscriber.assertEmpty();
        subscriber.request(1L);
        subscriber.assertCompleted();
    }

    @Test
    public void testDeliverEndWhenPaused() {
        this.testDeliverEndOrFailWhenPaused(null);
    }

    @Test
    public void testDeliverFailWhenPaused() {
        this.testDeliverEndOrFailWhenPaused(new RuntimeException());
    }

    private void testDeliverEndOrFailWhenPaused(Throwable err) {
        FakeStream stream = new FakeStream();
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        stream.emit(Stream.of(this.buffer("0"), this.buffer("1")));
        if (err == null) {
            stream.end();
        } else {
            stream.fail(err);
        }
        subscriber.request(3L);
        if (err == null) {
            subscriber.assertItems(this.buffer("0"), this.buffer("1"));
            subscriber.assertCompleted();
        } else {
            subscriber.assertError(err);
        }
        subscriber.assertEmpty();
    }

    @Test
    public void testEndWhenPaused() {
        this.testEndOrFailWhenPaused(null);
    }

    @Test
    public void testFailWhenPaused() {
        this.testEndOrFailWhenPaused(new RuntimeException());
    }

    private void testEndOrFailWhenPaused(Throwable err) {
        FakeStream stream = new FakeStream();
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        stream.emit(Stream.of(this.buffer("0"), this.buffer("1")));
        if (err == null) {
            stream.end();
        } else {
            stream.fail(err);
        }
        subscriber.request(3L);
        if (err == null) {
            subscriber.assertItems(this.buffer("0"), this.buffer("1"));
            subscriber.assertCompleted();
        } else {
            subscriber.assertError(err);
        }
        subscriber.assertEmpty();
    }

    @Test
    public void testRequestDuringOnNext() {
        FakeStream stream = new FakeStream();
        TestSubscriber<Buffer> subscriber = new TestSubscriber<Buffer>(){

            @Override
            public void onNext(Buffer buffer) {
                super.onNext(buffer);
                this.request(1L);
            }
        }.prefetch(1L);
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        this.subscribe(observable, subscriber);
        stream.emit((Object)this.buffer("0"));
        subscriber.assertItem(this.buffer("0")).assertEmpty();
        stream.emit((Object)this.buffer("1"));
        subscriber.assertItem(this.buffer("1")).assertEmpty();
        stream.emit((Object)this.buffer("2"));
        subscriber.assertItem(this.buffer("2")).assertEmpty();
        stream.end();
        subscriber.assertCompleted().assertEmpty();
    }

    @Test
    public void testDeliverDuringResume() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        FakeStream stream = new FakeStream();
        stream.drainHandler(v -> stream.emit((Object)this.buffer("2")));
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        stream.emit((Object)Buffer.buffer((String)"0"));
        stream.emit((Object)Buffer.buffer((String)"1"));
        subscriber.request(2L);
        subscriber.assertItems(this.buffer("0"), this.buffer("1")).assertEmpty();
    }

    @Test
    public void testEndDuringResume() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        FakeStream stream = new FakeStream();
        stream.drainHandler(v -> stream.end());
        O observable = this.toObservable((ReadStream<Buffer>)stream, 4);
        this.subscribe(observable, subscriber);
        int count = 0;
        while (stream.emit((Object)Buffer.buffer((String)("" + count++)))) {
        }
        subscriber.request(count);
        for (int i = 0; i < count; ++i) {
            subscriber.assertItem(Buffer.buffer((String)("" + i)));
        }
        subscriber.assertEmpty();
        subscriber.request(1L);
        subscriber.assertCompleted();
    }

    @Test
    public void testBufferDuringResume() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        FakeStream stream = new FakeStream();
        stream.drainHandler(v -> stream.emit(Stream.of(this.buffer("2"), this.buffer("3"))));
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        stream.emit(Stream.of(this.buffer("0"), this.buffer("1")));
        subscriber.request(2L);
        subscriber.assertItem(this.buffer("0")).assertItem(this.buffer("1")).assertEmpty();
    }

    @Test
    public void testFoo() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        FakeStream stream = new FakeStream();
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        this.subscribe(observable, subscriber);
        stream.emit((Object)this.buffer("0"));
        stream.end();
        subscriber.request(1L);
        subscriber.assertItem(this.buffer("0"));
        subscriber.request(1L);
        subscriber.assertCompleted().assertEmpty();
    }

    @Test
    public void testBar() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        FakeStream stream = new FakeStream();
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        this.subscribe(observable, subscriber);
        int i = 0;
        while ((long)i < this.defaultMaxBufferSize()) {
            stream.emit((Object)this.buffer("" + i));
            ++i;
        }
        stream.end();
        subscriber.request(1L);
        subscriber.assertItem(this.buffer("0")).assertEmpty();
    }

    @Test
    public void testUnsubscribeDuringOnNext() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber<Buffer>(){

            @Override
            public void onNext(Buffer buffer) {
                super.onNext(buffer);
                this.unsubscribe();
            }
        };
        FakeStream stream = new FakeStream();
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        this.subscribe(observable, subscriber);
        stream.emit((Object)this.buffer("0"));
    }

    @Test
    public void testResubscribe() {
        TestSubscriber<Buffer> subscriber = new TestSubscriber().prefetch(0L);
        FakeStream stream = new FakeStream();
        O observable = this.toObservable((ReadStream<Buffer>)stream, 2);
        this.subscribe(observable, subscriber);
        stream.emit(Stream.of(this.buffer("0"), this.buffer("1")));
        subscriber.unsubscribe();
        subscriber = new TestSubscriber().prefetch(0L);
        this.subscribe(observable, subscriber);
        stream.emit((Object)this.buffer("2"));
        stream.emit((Object)this.buffer("3"));
        subscriber.assertEmpty();
        subscriber.request(2L);
        subscriber.assertItems(this.buffer("2"), this.buffer("3"));
        RuntimeException cause = new RuntimeException();
        stream.fail((Throwable)cause);
        subscriber.assertError(cause);
        this.assertTrue(subscriber.isUnsubscribed());
        subscriber = new TestSubscriber();
        this.subscribe(observable, subscriber);
        stream.end();
        subscriber.assertCompleted();
    }

    @Test
    public void testBackPressureBuffer() {
        int i;
        FakeStream stream = new FakeStream();
        O observable = this.toObservable((ReadStream<Buffer>)stream, 20);
        TestSubscriber<Buffer> subscriber = new TestSubscriber<Buffer>(){

            @Override
            public void onSubscribe(TestSubscriber.Subscription sub) {
                super.onSubscribe(sub);
                this.request(5L);
            }
        }.prefetch(0L);
        this.subscribe(observable, subscriber);
        ReadStreamAdapterBackPressureTest.waitUntil(subscriber::isSubscribed);
        AtomicInteger count = new AtomicInteger();
        while (!stream.isPaused()) {
            stream.emit((Object)this.buffer("" + count.get()));
            count.incrementAndGet();
        }
        for (i = 0; i < 5; ++i) {
            subscriber.assertItem(this.buffer("" + i));
            stream.emit((Object)Buffer.buffer((String)("" + count)));
            count.incrementAndGet();
        }
        subscriber.assertEmpty();
        subscriber.request(count.get() - 5);
        for (i = 5; i < count.get(); ++i) {
            subscriber.assertItem(this.buffer("" + i));
        }
        subscriber.assertEmpty();
        stream.end();
        subscriber.assertEmpty();
        subscriber.request(1L);
        subscriber.assertCompleted();
    }

    @Test
    public void testChained() throws Exception {
        FakeStream stream = new FakeStream();
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        TestSubscriber<Buffer> subscriber = new TestSubscriber<Buffer>();
        subscriber.prefetch(1L);
        this.subscribe(observable, subscriber);
        ReadStreamAdapterBackPressureTest.waitUntil(subscriber::isSubscribed);
        stream.emit((Object)this.buffer("foo"));
        stream.end();
        subscriber.assertItem(this.buffer("foo"));
        subscriber.assertEmpty();
        subscriber.request(1L);
        subscriber.assertCompleted();
    }

    @Test
    public void testFlatMap() {
        FakeStream stream1 = new FakeStream();
        Object obs1 = this.toObservable((ReadStream<Buffer>)stream1);
        FakeStream stream2 = new FakeStream();
        Object obs2 = this.toObservable((ReadStream<Buffer>)stream2);
        Object obs3 = this.flatMap(obs1, s -> obs2);
        TestSubscriber<Buffer> sub = new TestSubscriber<Buffer>();
        sub.prefetch(1L);
        this.subscribe(obs3, sub);
        stream1.emit((Object)this.buffer("foo"));
        stream1.end();
        stream2.emit((Object)this.buffer("bar"));
        stream2.end();
        sub.assertItem(this.buffer("bar"));
        sub.assertCompleted();
    }

    @Test
    public void testCancelWhenSubscribedPropagatesToStream() {
        final Buffer expected = this.buffer("something");
        final FakeStream stream = new FakeStream();
        Object observable = this.toObservable((ReadStream<Buffer>)stream);
        TestSubscriber<Buffer> sub = new TestSubscriber<Buffer>(){

            @Override
            public void onNext(Buffer b) {
                ReadStreamAdapterBackPressureTest.this.assertSame(b, expected);
                super.onNext(b);
                this.unsubscribe();
                ReadStreamAdapterBackPressureTest.this.assertNull((Object)stream.handler());
            }
        };
        sub.prefetch(1L);
        this.subscribe(observable, sub);
        sub.assertEmpty();
        stream.emit((Object)expected);
        sub.assertItem(expected);
        sub.assertEmpty();
        this.assertNull(stream.handler());
    }
}

