/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.streams;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.streams.ReadStreamIterator;
import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakestream.FakeStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Assume;
import org.junit.Test;

public class IteratorTest
extends VertxTestBase {
    /**
     * Fills the stream until it reports a full write queue, then drains the iterator and
     * checks that the stream no longer reports a full write queue.
     */
    @Test
    public void testIteratorResuming() {
        final int buffered = 16;
        FakeStream<Integer> stream = new FakeStream<>();
        stream.setWriteQueueMaxSize(0);
        Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
        for (int i = 0; i < buffered; i++) {
            // The first 16 writes are expected to be accepted without back-pressure.
            assertFalse(stream.writeQueueFull());
            stream.write(i);
        }
        // One extra write: from here on the stream is expected to report a full queue.
        stream.write(17);
        assertTrue(stream.writeQueueFull());
        for (int i = 0; i < buffered; i++) {
            iterator.next();
        }
        assertFalse(stream.writeQueueFull());
    }

    /**
     * Writes 15 items, ends the stream, and checks that the iterator yields exactly those
     * 15 items, then reports no more elements and throws {@link NoSuchElementException}
     * from {@code next()}.
     */
    @Test
    public void testEnd() {
        final int count = 15;
        FakeStream<Integer> stream = new FakeStream<>();
        Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
        for (int i = 0; i < count; i++) {
            stream.write(i);
        }
        stream.end();
        for (int i = 0; i < count; i++) {
            assertTrue(iterator.hasNext());
            iterator.next();
        }
        assertFalse(iterator.hasNext());
        try {
            iterator.next();
            fail();
        } catch (NoSuchElementException ignored) {
            // Expected: the stream has ended and every item was consumed.
        }
    }

    /**
     * Writes 15 items, fails the stream, and checks that the iterator first yields the
     * 15 items and then rethrows the very same failure instance from {@code next()}.
     */
    @Test
    public void testFail() {
        final int count = 15;
        FakeStream<Integer> stream = new FakeStream<>();
        Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
        for (int i = 0; i < count; i++) {
            stream.write(i);
        }
        Throwable cause = new Throwable();
        stream.fail(cause);
        for (int i = 0; i < count; i++) {
            assertTrue(iterator.hasNext());
            iterator.next();
        }
        // The pending failure still counts as "something to read".
        assertTrue(iterator.hasNext());
        // Capture whatever next() throws and assert outside the try block, so that an
        // assertion error is never swallowed by the catch-all below. If nothing is thrown,
        // 'thrown' stays null and the assertSame fails.
        Throwable thrown = null;
        try {
            iterator.next();
        } catch (Throwable t) {
            thrown = t;
        }
        assertSame(cause, thrown);
    }

    /**
     * Starts four threads that each call {@code hasNext()} on an empty stream, waits for
     * each of them to reach the {@code WAITING} state, then ends the stream and checks
     * that all of them terminate.
     *
     * @throws Exception if joining a consumer thread is interrupted
     */
    @Test
    public void testHasNextSignal() throws Exception {
        FakeStream<Integer> stream = new FakeStream<>();
        Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
        int numThreads = 4;
        List<Thread> consumers = new ArrayList<>(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread consumer = new Thread(iterator::hasNext);
            consumers.add(consumer);
            consumer.start();
            // The thread only runs hasNext(), so WAITING means it is blocked inside that call.
            assertWaitUntil(() -> consumer.getState() == Thread.State.WAITING);
        }
        stream.end();
        // join() has no timeout: a consumer that is never woken up hangs the test.
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }

    /**
     * Stress test: eight threads call {@code next()} on the same iterator while the test
     * thread pushes 16384 integers through a lock-guarded stream and then ends it. The test
     * passes when every consumer terminates within one second of being joined and the
     * total number of consumed elements equals the number produced.
     *
     * @throws Exception if the barrier breaks or the test thread is interrupted
     */
    @Repeat(times=100)
    @Test
    public void testConcurrentReads() throws Exception {
        // Named DemandStream (not Stream) so it does not shadow java.util.stream.Stream.
        class DemandStream implements ReadStream<Integer> {
            private Handler<Integer> handler;
            private Handler<Void> endHandler;
            // Long.MAX_VALUE is treated as "unbounded": write() does not decrement it.
            private long demand = Long.MAX_VALUE;
            private final Lock lock = new ReentrantLock();
            private final Condition producerSignal = lock.newCondition();

            @Override
            public ReadStream<Integer> exceptionHandler(Handler<Throwable> handler) {
                // Failures are never emitted by this stream, so the handler is not kept.
                return this;
            }

            /**
             * Blocks while demand is zero, consumes one unit of demand (unless unbounded)
             * and hands the element to the current handler outside the lock.
             */
            public void write(Integer element) throws InterruptedException {
                Handler<Integer> h;
                lock.lock();
                try {
                    while (demand <= 0L) {
                        producerSignal.await();
                    }
                    if (demand != Long.MAX_VALUE) {
                        demand--;
                    }
                    h = handler;
                } finally {
                    lock.unlock();
                }
                if (h != null) {
                    h.handle(element);
                }
            }

            /**
             * Blocks while demand is zero, then invokes the end handler outside the lock.
             * Demand is not consumed.
             */
            public void end() throws InterruptedException {
                Handler<Void> h;
                lock.lock();
                try {
                    while (demand <= 0L) {
                        producerSignal.await();
                    }
                    h = endHandler;
                } finally {
                    lock.unlock();
                }
                if (h != null) {
                    h.handle(null);
                }
            }

            @Override
            public ReadStream<Integer> handler(Handler<Integer> handler) {
                lock.lock();
                try {
                    this.handler = handler;
                } finally {
                    lock.unlock();
                }
                return this;
            }

            @Override
            public ReadStream<Integer> endHandler(Handler<Void> endHandler) {
                lock.lock();
                try {
                    this.endHandler = endHandler;
                } finally {
                    lock.unlock();
                }
                return this;
            }

            @Override
            public ReadStream<Integer> pause() {
                lock.lock();
                try {
                    demand = 0L;
                } finally {
                    lock.unlock();
                }
                return this;
            }

            @Override
            public ReadStream<Integer> resume() {
                return fetch(Long.MAX_VALUE);
            }

            @Override
            public ReadStream<Integer> fetch(long amount) {
                if (amount < 0L) {
                    throw new IllegalArgumentException();
                }
                if (amount == 0L) {
                    return this;
                }
                lock.lock();
                try {
                    long updated = demand + amount;
                    if (updated < 0L) {
                        // Overflow: saturate at the "unbounded" marker.
                        updated = Long.MAX_VALUE;
                    }
                    demand = updated;
                    // Wakes one waiter; this test has a single producer thread.
                    producerSignal.signal();
                } finally {
                    lock.unlock();
                }
                return this;
            }
        }

        DemandStream stream = new DemandStream();
        final Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
        int numThreads = 8;
        int numElements = 16384;
        // +1 party for the test thread, so producer and consumers start together.
        final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);

        class Consumer extends Thread {
            // Only touched by this thread; read by the test thread after join().
            final List<Integer> consumed = new ArrayList<>();

            @Override
            public void run() {
                try {
                    barrier.await();
                } catch (Exception e) {
                    // Barrier broken or interrupted: give up without consuming anything.
                    return;
                }
                try {
                    while (true) {
                        consumed.add(iterator.next());
                    }
                } catch (NoSuchElementException e) {
                    // End of stream reached: normal termination.
                }
            }
        }

        Consumer[] consumers = new Consumer[numThreads];
        for (int i = 0; i < numThreads; i++) {
            Consumer consumer = new Consumer();
            consumer.start();
            consumers[i] = consumer;
        }
        barrier.await();
        for (int i = 0; i < numElements; i++) {
            stream.write(i);
        }
        stream.end();
        List<Integer> list = new ArrayList<>();
        for (Consumer consumer : consumers) {
            consumer.join(1000L);
            if (consumer.getState() != Thread.State.TERMINATED) {
                // Dump where the stuck consumer is blocked before failing the test.
                System.out.println("Could not join timely " + consumer + ":");
                Exception where = new Exception();
                where.setStackTrace(consumer.getStackTrace());
                where.printStackTrace(System.out);
                fail();
            }
            list.addAll(consumer.consumed);
        }
        assertEquals(numElements, list.size());
    }

    /**
     * Runs {@code doTestVirtualThread} when virtual threads are available; otherwise the
     * JUnit assumption fails and the test body is not executed.
     */
    @Test
    public void testVirtualThread() {
        VertxInternal internal = (VertxInternal) vertx;
        boolean virtualThreads = internal.isVirtualThreadAvailable();
        Assume.assumeTrue(virtualThreads);
        doTestVirtualThread(internal);
    }

    /**
     * On a virtual thread context, schedules a second task that writes one item, then
     * calls {@code iterator.next()} from the first task. The sequence counter verifies
     * that the writer task ran (counter 0 to 1) before {@code next()} returned.
     *
     * @param vertx the Vert.x instance used to create the virtual thread context
     */
    private void doTestVirtualThread(VertxInternal vertx) {
        FakeStream<Integer> stream = new FakeStream<>();
        Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
        ContextInternal ctx = vertx.createVirtualThreadContext();
        AtomicInteger seq = new AtomicInteger();
        ctx.runOnContext(outer -> {
            // Queued behind the current task on the same context.
            ctx.runOnContext(inner -> {
                assertEquals(0L, seq.getAndIncrement());
                stream.write(0);
            });
            // The writer task has not run yet at this point.
            assertEquals(0L, seq.get());
            iterator.next();
            // next() returned, so the writer task must have incremented the counter.
            assertEquals(1L, seq.getAndIncrement());
            testComplete();
        });
        await();
    }

    /**
     * Runs the shared blocking-stream scenario with the consuming task dispatched on a
     * virtual thread context. Skipped when virtual threads are not available.
     */
    @Test
    public void testBlockingStreamFromVirtualThread() {
        VertxInternal internal = (VertxInternal) vertx;
        Assume.assumeTrue(internal.isVirtualThreadAvailable());
        ContextInternal context = internal.createVirtualThreadContext();
        Executor onContext = task -> context.runOnContext(ignored -> task.run());
        testBlockingStream(onContext);
    }

    /**
     * Runs the shared blocking-stream scenario with the consuming task started on a
     * freshly created plain {@link Thread}.
     */
    @Test
    public void testBlockingStreamFromVanillaThread() {
        Executor newThreadPerTask = task -> new Thread(task).start();
        testBlockingStream(newThreadPerTask);
    }

    /**
     * Consumes a blocking stream from an event-loop context and checks that exactly one
     * error reaches the context exception handler and that it is an
     * {@link IllegalStateException}.
     */
    @Test
    public void testBlockingStreamFromVertxThread() {
        VertxInternal internal = (VertxInternal) vertx;
        FakeStream<Integer> readStream = new FakeStream<>();
        // Filled from the context's exception handler, polled from the test thread.
        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
        Stream<Integer> blockingStream = readStream.blockingStream();
        ContextInternal context = internal.createEventLoopContext();
        context.exceptionHandler(errors::add);
        // No element is ever written, so the per-element callback must not be reached.
        context.runOnContext(v -> blockingStream.forEach(elt -> fail()));
        assertWaitUntil(() -> errors.size() == 1);
        assertEquals(IllegalStateException.class, errors.get(0).getClass());
    }

    /**
     * Shared scenario: the consumer, dispatched through {@code runner}, collects the whole
     * blocking stream into a list while a separate producer thread writes four items
     * (sleeping 10 ms after each) and then ends the stream. Passes once the collected
     * items equal the written ones.
     *
     * @param runner executes the consuming task on the thread or context under test
     */
    private void testBlockingStream(Executor runner) {
        int numItems = 4;
        FakeStream<Integer> stream = new FakeStream<>();
        Stream<Integer> blockingStream = stream.blockingStream();
        List<Integer> expected = new ArrayList<>(numItems);
        for (int i = 0; i < numItems; i++) {
            expected.add(i);
        }
        // Written by the consumer task, polled by the test thread.
        List<Integer> items = Collections.synchronizedList(new ArrayList<>());
        runner.execute(() -> items.addAll(blockingStream.collect(Collectors.toList())));
        Thread producer = new Thread(() -> {
            try {
                for (Integer item : expected) {
                    stream.write(item);
                    Thread.sleep(10L);
                }
                stream.end();
            } catch (InterruptedException e) {
                // Stop producing, but keep the interrupt visible instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        assertWaitUntil(() -> items.equals(expected));
    }

    /**
     * On a virtual thread context, starts a 5 ms periodic timer that writes 0..31 and then
     * ends the stream, while the same task consumes the blocking stream with
     * {@code forEach}. Each timer tick asserts that it runs on the same context, and the
     * test completes once all 32 items were collected. Skipped when virtual threads are
     * not available.
     */
    @Test
    public void testBlockingStreamInterleavingFromVirtualThread() {
        VertxInternal internal = (VertxInternal) vertx;
        Assume.assumeTrue(internal.isVirtualThreadAvailable());
        ContextInternal context = internal.createVirtualThreadContext();
        FakeStream<Integer> stream = new FakeStream<>();
        Stream<Integer> blockingStream = stream.blockingStream();
        int num = 32;
        context.runOnContext(v -> {
            AtomicInteger count = new AtomicInteger();
            internal.setPeriodic(5L, id -> {
                assertSame(context, ((ContextInternal) Vertx.currentContext()).unwrap());
                int i = count.getAndIncrement();
                if (i == num) {
                    // All items written: stop the timer and terminate the stream.
                    internal.cancelTimer(id.longValue());
                    stream.end();
                } else {
                    stream.write(i);
                }
            });
            List<Integer> collected = new ArrayList<>();
            // Returns only after the timer callback has ended the stream.
            blockingStream.forEach(collected::add);
            assertEquals(num, collected.size());
            testComplete();
        });
        await();
    }

    /**
     * Fails the stream before it is consumed and checks that collecting the blocking
     * stream throws that same exception instance.
     */
    @Test
    public void testStreamFailure() {
        RuntimeException expected = new RuntimeException();
        FakeStream<Integer> stream = new FakeStream<>();
        Stream<Integer> blockingStream = stream.blockingStream();
        stream.fail(expected);
        try {
            blockingStream.collect(Collectors.toList());
            // Without this the test would pass vacuously when nothing is thrown. fail()
            // raises an Error, so the catch (Exception) below does not intercept it.
            fail();
        } catch (Exception e) {
            assertSame(expected, e);
        }
    }

    /**
     * Registers a 1 ms periodic timer on a virtual thread context that writes 0..31 and
     * then ends the stream, and schedules a task on the same context that collects the
     * blocking stream. The test completes only if the collected list equals 0..31 in
     * order. Skipped when virtual threads are not available.
     * NOTE(review): by its name this presumably guards against the collector blocking the
     * timer callbacks on the shared context; confirm against the issue that introduced it.
     */
    @Test
    public void testDeadlockFromVirtualStream() {
        VertxInternal internal = (VertxInternal) vertx;
        Assume.assumeTrue(internal.isVirtualThreadAvailable());
        ContextInternal context = internal.createVirtualThreadContext();
        FakeStream<Integer> stream = new FakeStream<>();
        Stream<Integer> blockingStream = stream.blockingStream();
        int num = 32;
        List<Integer> expected = IntStream.range(0, num).boxed().collect(Collectors.toList());
        AtomicInteger count = new AtomicInteger();
        context.setPeriodic(1L, id -> {
            int val = count.incrementAndGet();
            // val counts from 1, the written items from 0.
            stream.write(val - 1);
            if (val == num) {
                internal.cancelTimer(id.longValue());
                stream.end();
            }
        });
        context.runOnContext(v -> {
            List<Integer> result = blockingStream.collect(Collectors.toList());
            assertEquals(expected, result);
            testComplete();
        });
        await();
    }
}

