/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.lang.rx.test;

import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.VertxTestBase;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

public abstract class ReadStreamSubscriberTestBase
extends VertxTestBase {
    /**
     * Buffer size used by the assertions in this class, captured once from {@link #bufferSize()}.
     * NOTE(review): the initializer invokes an abstract method while the instance is still being
     * constructed, so implementations of {@code bufferSize()} should not rely on subclass fields.
     */
    private final long BUFFER_SIZE = this.bufferSize();

    /**
     * @return the buffer size the tests should expect; read once to initialise {@code BUFFER_SIZE}
     */
    public abstract long bufferSize();

    /**
     * @return the {@link Sender} a test drives; every test in this class obtains its sender here
     */
    protected abstract Sender sender();

    /**
     * Checks how the sender's {@code requested} counter evolves: zero before a handler is
     * installed, {@code BUFFER_SIZE} once the receiver subscribes and while fewer than half a
     * buffer of items has been emitted, then raised by the emission that follows.
     */
    @Test
    public void testInitial() throws Exception {
        Sender source = sender();
        source.assertRequested(0L);
        new Receiver().subscribe(source.stream);
        source.assertRequested(BUFFER_SIZE);

        long half = BUFFER_SIZE / 2L;
        while (source.seq < half) {
            source.emit();
            source.assertRequested(BUFFER_SIZE);
        }

        // Expected top-up, computed before the emission that is asserted to trigger it.
        long refill = BUFFER_SIZE - (source.seq - 1);
        source.emit();
        source.assertRequested(BUFFER_SIZE + refill);
    }

    /**
     * Verifies that items emitted while the stream is paused are withheld from the receiver and
     * are delivered, in emission order, once the stream is resumed.
     *
     * <p>The expected items are derived from {@code BUFFER_SIZE} instead of being spelled out for
     * one particular buffer size, so the expectation agrees with the emission loop for any value
     * returned by {@link #bufferSize()}.
     */
    @Test
    public void testPause() {
        Sender sender = this.sender();
        sender.stream.resume();
        sender.stream.pause();
        Receiver receiver = new Receiver();
        receiver.subscribe(sender.stream);

        // Emit exactly one buffer's worth of items; the requested count must not move meanwhile.
        int i = 0;
        while ((long)i < this.BUFFER_SIZE) {
            sender.emit();
            this.assertEquals(this.BUFFER_SIZE, sender.requested);
            ++i;
        }
        this.assertEquals(0L, sender.available());
        receiver.assertEmpty();

        sender.stream.resume();
        this.assertEquals(this.BUFFER_SIZE, sender.available());

        // One expected item per emission above: "0", "1", ..., BUFFER_SIZE - 1.
        String[] expected = new String[(int)this.BUFFER_SIZE];
        for (int n = 0; n < expected.length; ++n) {
            expected[n] = String.valueOf(n);
        }
        receiver.assertItems(expected);
        receiver.assertEmpty();
    }

    /**
     * Verifies item-by-item delivery through {@code fetch} on a paused stream: nothing is
     * delivered without a fetch, an outstanding fetch is satisfied by the next emission, and the
     * end event is only observed after the remaining buffered items have been fetched.
     *
     * <p>The initial requested count is compared against {@code BUFFER_SIZE} (not a literal) so
     * the test agrees with the other tests for any value returned by {@link #bufferSize()}.
     */
    @Test
    public void testFetch() {
        Sender sender = this.sender();
        sender.stream.pause();
        Receiver receiver = new Receiver();
        receiver.subscribe(sender.stream);
        this.assertEquals(this.BUFFER_SIZE, sender.requested);
        receiver.assertEmpty();

        // An item emitted while paused stays buffered until it is fetched.
        sender.emit();
        receiver.assertEmpty();
        sender.stream.fetch(1L);
        receiver.assertItems("0");

        // A fetch with nothing buffered delivers nothing now...
        sender.stream.fetch(1L);
        receiver.assertItems();
        // ...and is satisfied by the first of the next emissions.
        sender.emit(4);
        receiver.assertItems("1");

        // Completion is not reported while items are still buffered.
        sender.complete();
        receiver.assertEmpty();
        sender.stream.fetch(3L);
        receiver.assertItems("2", "3", "4");
        receiver.assertEnded();
    }

    /**
     * Completing the sender right after subscription must produce exactly one end event and
     * nothing else on the receiver.
     */
    @Test
    public void testCompletion() {
        Sender source = sender();
        Receiver sink = new Receiver();
        sink.subscribe(source.stream);

        source.complete();

        sink.assertEnded();
    }

    /**
     * Emits three items and completes while the stream is paused, then resumes and expects the
     * three items followed by the end event.
     */
    @Test
    public void testCompletionWhenPaused() {
        Sender source = sender();
        source.stream.pause();
        Receiver sink = new Receiver();
        sink.subscribe(source.stream);

        source.emit(3);
        source.complete();
        source.stream.resume();

        Receiver afterItems = sink.assertItems("0", "1", "2");
        afterItems.assertEnded();
    }

    /**
     * Clears all three handlers from inside the end handler and checks that the end handler was
     * invoked exactly once.
     */
    @Test
    public void testSetNullHandlersInEndHandler() {
        Sender source = sender();
        AtomicInteger endCalls = new AtomicInteger();

        source.stream.endHandler(ignored -> {
            endCalls.incrementAndGet();
            // Resetting handlers re-entrantly, from within the end callback itself.
            source.stream.handler(null);
            source.stream.endHandler(null);
            source.stream.exceptionHandler(null);
        });
        source.stream.handler(ignored -> {});

        source.complete();

        assertEquals(1L, endCalls.get());
    }

    /**
     * After the stream has completed, installing a non-null end or exception handler must throw
     * {@link IllegalStateException}, while clearing either handler with {@code null} must not.
     */
    @Test
    public void testSetHandlersAfterCompletion() {
        Sender source = sender();
        source.stream.handler(ignored -> {});
        source.complete();

        try {
            source.stream.endHandler(ignored -> {});
            fail();
        } catch (IllegalStateException expected) {
            // Expected: a non-null end handler is rejected at this point.
        }
        source.stream.endHandler(null);

        try {
            source.stream.exceptionHandler(ignored -> {});
            fail();
        } catch (IllegalStateException expected) {
            // Expected: a non-null exception handler is rejected at this point.
        }
        source.stream.exceptionHandler(null);
    }

    /**
     * After the sender has failed, installing a non-null end or exception handler must throw
     * {@link IllegalStateException}, while clearing either handler with {@code null} must not.
     */
    @Test
    public void testSetHandlersAfterError() {
        Sender source = sender();
        source.stream.handler(ignored -> {});
        source.fail(new Throwable());

        try {
            source.stream.endHandler(ignored -> {});
            fail();
        } catch (IllegalStateException expected) {
            // Expected: a non-null end handler is rejected at this point.
        }
        source.stream.endHandler(null);

        try {
            source.stream.exceptionHandler(ignored -> {});
            fail();
        } catch (IllegalStateException expected) {
            // Expected: a non-null exception handler is rejected at this point.
        }
        source.stream.exceptionHandler(null);
    }

    /**
     * With one item buffered on a paused stream, completion must not reach the end handler until
     * the stream is resumed; the exception handler must never be invoked.
     */
    @Test
    public void testDontDeliverCompletionWhenPausedWithPendingBuffers() {
        Sender source = sender();
        AtomicInteger errorCalls = new AtomicInteger();
        AtomicInteger endCalls = new AtomicInteger();
        source.stream.endHandler(ignored -> endCalls.incrementAndGet());
        source.stream.exceptionHandler(ignored -> errorCalls.incrementAndGet());
        source.stream.handler(ignored -> {});

        source.stream.pause();
        source.emit();
        source.complete();
        // Still paused with an undelivered item: no end event yet.
        assertEquals(0L, endCalls.get());

        source.stream.resume();
        assertEquals(1L, endCalls.get());
        assertEquals(0L, errorCalls.get());
    }

    /**
     * With one item buffered on a paused stream, a failure must reach neither the exception
     * handler nor the end handler until the stream is resumed; after resuming, each of the two
     * handlers is expected to have been invoked once.
     */
    @Test
    public void testDontDeliverErrorWhenPausedWithPendingBuffers() {
        Sender source = sender();
        AtomicInteger errorCalls = new AtomicInteger();
        AtomicInteger endCalls = new AtomicInteger();
        source.stream.endHandler(ignored -> endCalls.incrementAndGet());
        source.stream.exceptionHandler(ignored -> errorCalls.incrementAndGet());
        source.stream.handler(ignored -> {});

        source.stream.pause();
        source.emit();
        source.fail(new RuntimeException());
        // Still paused with an undelivered item: nothing is reported yet.
        assertEquals(0L, endCalls.get());
        assertEquals(0L, errorCalls.get());

        source.stream.resume();
        assertEquals(1L, endCalls.get());
        assertEquals(1L, errorCalls.get());
    }

    /**
     * While the stream is paused with an item emitted and completion already signalled, setting
     * and clearing the exception and end handlers must not throw. The test has no explicit
     * assertions; it passes when none of the calls raises.
     */
    @Test
    public void testSetHandlersAfterCompletionButPending() {
        Sender source = sender();
        source.stream.handler(ignored -> {});
        source.stream.pause();
        source.emit();
        source.complete();

        // Exception handler: install, then clear.
        source.stream.exceptionHandler(ignored -> {});
        source.stream.exceptionHandler(null);
        // End handler: install, then clear.
        source.stream.endHandler(ignored -> {});
        source.stream.endHandler(null);
    }

    /**
     * While the stream is paused with an item emitted and a failure already signalled, setting
     * and clearing the exception and end handlers must not throw. The test has no explicit
     * assertions; it passes when none of the calls raises.
     */
    @Test
    public void testSetHandlersAfterErrorButPending() {
        Sender source = sender();
        source.stream.handler(ignored -> {});
        source.stream.pause();
        source.emit();
        source.fail(new Throwable());

        // Exception handler: install, then clear.
        source.stream.exceptionHandler(ignored -> {});
        source.stream.exceptionHandler(null);
        // End handler: install, then clear.
        source.stream.endHandler(ignored -> {});
        source.stream.endHandler(null);
    }

    /**
     * Replacing the item handler with {@code null} after an emission must leave the sender
     * reporting itself as unsubscribed.
     */
    @Test
    public void testSetNullHandlerUnsubscribes() {
        Sender source = sender();
        source.stream.handler(ignored -> {});
        source.emit();

        source.stream.handler(null);

        boolean unsubscribed = source.isUnsubscribed();
        assertTrue(unsubscribed);
    }

    /**
     * Verifies that item delivery is serialized even when an emission happens on another thread
     * while the handler is running.
     *
     * <p>While handling the first item, the handler triggers a second emission from a separate
     * thread and waits for that thread to finish. The test then asserts that both items were
     * recorded in order and that the handler ran on the same thread for both of them.
     */
    @Test
    public void testReadStreamElementsMustBeSerialized() {
        Sender sender = this.sender();
        List<String> receivedElements = Collections.synchronizedList(new ArrayList<String>());
        AtomicReference<Thread> threadThatReceivedFirstItem = new AtomicReference<>();
        AtomicReference<Thread> threadThatReceivedSecondItem = new AtomicReference<>();
        sender.stream.handler(item -> {
            if (threadThatReceivedFirstItem.get() != null) {
                threadThatReceivedSecondItem.set(Thread.currentThread());
            } else {
                threadThatReceivedFirstItem.set(Thread.currentThread());
                // Emit the second item from a different thread while still inside this handler.
                ReadStreamSubscriberTestBase.publishItemFromAnotherThread(sender);
            }
            // Recorded last, so a re-entrant delivery of "1" would show up ahead of "0".
            receivedElements.add(item);
        });
        sender.stream.pause();
        sender.emit();
        sender.stream.resume();
        this.assertEquals(Arrays.asList("0", "1"), receivedElements);
        this.assertEquals(threadThatReceivedFirstItem.get(), threadThatReceivedSecondItem.get());
    }

    /**
     * Calls {@code readStream.emit()} on a newly started thread and blocks until that thread has
     * terminated.
     *
     * @param readStream the sender whose {@code emit()} is invoked on the helper thread
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *         interrupted while waiting; the interrupt status is restored before throwing
     */
    private static void publishItemFromAnotherThread(Sender readStream) {
        Thread t = new Thread(readStream::emit);
        t.start();
        try {
            t.join();
        }
        catch (InterruptedException e) {
            // join() cleared the interrupt flag; set it again so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Queue that records, in arrival order, everything its handlers are given: items as
     * {@link String}s, failures as {@link Throwable}s and the end event as the {@link #DONE}
     * marker. The {@code assert*} methods consume or inspect that queue.
     */
    private class Receiver
    extends ArrayDeque<Object> {
        /** Marker enqueued when the end handler fires; compared by the assertions below. */
        final Object DONE = new Object();

        private Receiver() {
        }

        /** Item handler: appends the item to the queue. */
        void handle(String item) {
            this.add(item);
        }

        /** Exception handler: appends the throwable to the queue. */
        void handleException(Throwable t) {
            this.add(t);
        }

        /** End handler: appends the {@link #DONE} marker to the queue. */
        void handleEnd(Void v) {
            this.add(this.DONE);
        }

        /**
         * Installs this receiver's exception, end and item handlers on the stream, in that order.
         *
         * @param sender the stream to observe
         */
        void subscribe(ReadStream<String> sender) {
            sender.exceptionHandler(this::handleException);
            sender.endHandler(this::handleEnd);
            sender.handler(this::handle);
        }

        /**
         * Asserts that nothing is currently queued.
         *
         * @return this receiver, for chaining
         */
        Receiver assertEmpty() {
            ReadStreamSubscriberTestBase.this.assertEquals(Collections.emptyList(), new ArrayList<Object>(this));
            return this;
        }

        /**
         * Removes up to {@code items.length} entries from the head of the queue and asserts that
         * they equal {@code items}. Entries beyond that count are left queued.
         *
         * @param items the expected entries, in order
         * @return this receiver, for chaining
         */
        Receiver assertItems(String ... items) {
            List<Object> actual = new ArrayList<>();
            while (!this.isEmpty() && actual.size() < items.length) {
                actual.add(this.remove());
            }
            ReadStreamSubscriberTestBase.this.assertEquals(Arrays.asList(items), actual);
            return this;
        }

        /**
         * Asserts that the next queued entry is the {@link #DONE} marker and that nothing
         * follows it.
         */
        void assertEnded() {
            // poll() returns null on an empty queue, so a missing end event is reported as a
            // failed assertion rather than as a NoSuchElementException from remove().
            ReadStreamSubscriberTestBase.this.assertEquals(this.DONE, this.poll());
            this.assertEmpty();
        }
    }

    /**
     * Upstream side of a test: exposes the {@link ReadStream} under test together with the
     * counters the assertions read, and lets a test emit items, complete or fail. Concrete
     * behaviour is supplied by the subclass returned from {@code sender()}.
     */
    public abstract class Sender {
        /** The stream the tests install handlers on and pause/resume/fetch from. */
        protected ReadStream<String> stream;
        /** Compared against expectations by the tests; presumably the demand signalled so far (set by implementations). */
        protected long requested;
        /** Presumably the number of items emitted so far (set by implementations) - TODO confirm. */
        protected int seq;

        /** Emits a single item. */
        protected abstract void emit();

        /**
         * Invokes {@link #emit()} {@code times} times; does nothing when {@code times} is not
         * positive.
         */
        void emit(int times) {
            for (int left = times; left > 0; --left) {
                emit();
            }
        }

        /** Signals completion. */
        protected abstract void complete();

        /** Signals a failure with the given throwable. */
        protected abstract void fail(Throwable var1);

        /** Asserts that {@link #requested} equals {@code expected}. */
        void assertRequested(long expected) {
            long actual = requested;
            ReadStreamSubscriberTestBase.this.assertEquals(expected, actual);
        }

        /** @return {@code requested - seq} */
        long available() {
            return requested - seq;
        }

        /** @return whether this sender considers itself unsubscribed */
        protected abstract boolean isUnsubscribed();
    }
}

