/*
 * Decompiled with CFR 0.152.
 */
package org.glassfish.grizzly;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import junit.framework.Assert;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.EmptyCompletionHandler;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.GrizzlyFuture;
import org.glassfish.grizzly.GrizzlyTestCase;
import org.glassfish.grizzly.PendingWriteQueueLimitExceededException;
import org.glassfish.grizzly.Processor;
import org.glassfish.grizzly.StandaloneProcessor;
import org.glassfish.grizzly.Transport;
import org.glassfish.grizzly.WriteResult;
import org.glassfish.grizzly.Writer;
import org.glassfish.grizzly.asyncqueue.AsyncQueueWriter;
import org.glassfish.grizzly.asyncqueue.TaskQueue;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.filterchain.TransportFilter;
import org.glassfish.grizzly.impl.FutureImpl;
import org.glassfish.grizzly.impl.SafeFutureImpl;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.nio.NIOConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.streams.StreamReader;
import org.glassfish.grizzly.utils.EchoFilter;

public class AsyncWriteQueueTest
extends GrizzlyTestCase {
    public static final int PORT = 7781;
    private static final Logger LOGGER = Grizzly.logger(AsyncWriteQueueTest.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testAsyncWriteQueueEcho() throws Exception {
        Connection connection = null;
        StreamReader reader = null;
        Object writer = null;
        int packetNumber = 127;
        int packetSize = 128000;
        final AtomicInteger serverRcvdBytes = new AtomicInteger();
        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
        filterChainBuilder.add((Filter)new TransportFilter());
        filterChainBuilder.add((Filter)new EchoFilter(){

            public NextAction handleRead(FilterChainContext ctx) throws IOException {
                serverRcvdBytes.addAndGet(((Buffer)ctx.getMessage()).remaining());
                return super.handleRead(ctx);
            }
        });
        TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build();
        transport.setProcessor((Processor)filterChainBuilder.build());
        try {
            transport.bind(7781);
            transport.start();
            GrizzlyFuture future = transport.connect("localhost", 7781);
            connection = (Connection)future.get(10L, TimeUnit.SECONDS);
            AsyncWriteQueueTest.assertTrue((connection != null ? 1 : 0) != 0);
            connection.configureStandalone(true);
            reader = ((StandaloneProcessor)connection.getProcessor()).getStreamReader(connection);
            AsyncQueueWriter asyncQueueWriter = transport.getAsyncQueueIO().getWriter();
            final MemoryManager mm = transport.getMemoryManager();
            Connection con = connection;
            final CountDownLatch latch = new CountDownLatch(127);
            EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>> completionHandler = new EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>>(){

                public void completed(WriteResult<Buffer, SocketAddress> result) {
                    latch.countDown();
                }
            };
            ArrayList<3> sendTasks = new ArrayList<3>(128);
            for (int i = 0; i < 127; ++i) {
                final byte b = (byte)i;
                sendTasks.add(new Callable<Object>((Writer)asyncQueueWriter, con, (CompletionHandler)completionHandler){
                    final /* synthetic */ Writer val$asyncQueueWriter;
                    final /* synthetic */ Connection val$con;
                    final /* synthetic */ CompletionHandler val$completionHandler;
                    {
                        this.val$asyncQueueWriter = writer;
                        this.val$con = connection;
                        this.val$completionHandler = completionHandler;
                    }

                    @Override
                    public Object call() throws Exception {
                        byte[] originalMessage = new byte[128000];
                        Arrays.fill(originalMessage, b);
                        Buffer buffer = Buffers.wrap((MemoryManager)mm, (byte[])originalMessage);
                        try {
                            this.val$asyncQueueWriter.write(this.val$con, buffer, this.val$completionHandler);
                        }
                        catch (IOException e) {
                            Assert.assertTrue((String)"IOException occurred", (boolean)false);
                        }
                        return null;
                    }
                });
            }
            ExecutorService executorService = Executors.newFixedThreadPool(12);
            try {
                executorService.invokeAll(sendTasks);
                if (!latch.await(10L, TimeUnit.SECONDS)) {
                    AsyncWriteQueueTest.assertTrue((String)"Send timeout!", (boolean)false);
                }
            }
            finally {
                executorService.shutdown();
            }
            int responseSize = 16256000;
            GrizzlyFuture readFuture = reader.notifyAvailable(responseSize);
            Integer available = null;
            try {
                available = (Integer)readFuture.get(10L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, "read error", e);
            }
            AsyncWriteQueueTest.assertTrue((String)("Read timeout. Server received: " + serverRcvdBytes.get() + " bytes. Expected: " + 16256000), (available != null ? 1 : 0) != 0);
            byte[] echoMessage = new byte[responseSize];
            reader.readByteArray(echoMessage);
            boolean[] isByteUsed = new boolean[127];
            int offset = 0;
            for (int i = 0; i < 127; ++i) {
                byte pattern = echoMessage[offset];
                AsyncWriteQueueTest.assertEquals((String)("Pattern: " + pattern + " was already used"), (boolean)false, (boolean)isByteUsed[pattern]);
                isByteUsed[pattern] = true;
                for (int j = 0; j < 128000; ++j) {
                    byte check = echoMessage[offset++];
                    AsyncWriteQueueTest.assertEquals((String)("Echo doesn't match. Offset: " + offset + " pattern: " + pattern + " found: " + check), (byte)pattern, (byte)check);
                }
            }
        }
        finally {
            if (connection != null) {
                connection.close();
            }
            transport.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a write which does not fit under the configured per-connection
     * pending-bytes limit is reported through the completion handler as a
     * {@link PendingWriteQueueLimitExceededException}.
     *
     * <p>With the transport paused, packets of {@code packetSize} bytes are queued
     * for as long as {@code canWrite} accepts them. On each of the first three
     * refusals the transport is resumed for five seconds and paused again. On a
     * refusal with {@code loopCount == 3} the packet is written anyway with a
     * handler that records the failure; the test requires that failure to be a
     * limit-exceeded exception raised on that pass.
     *
     * @throws Exception if the transport cannot be started or a wait is interrupted
     */
    public void testAsyncWriteQueueLimits() throws Exception {
        final int packetSize = 256000;
        final int queueLimit = 2 * packetSize + 1;

        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
        filterChainBuilder.add(new TransportFilter());

        final TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build();
        transport.setProcessor(filterChainBuilder.build());

        Connection connection = null;
        try {
            transport.bind(PORT);
            transport.start();

            GrizzlyFuture future = transport.connect("localhost", PORT);
            connection = (Connection) future.get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);

            final AsyncQueueWriter asyncQueueWriter = transport.getAsyncQueueIO().getWriter();
            asyncQueueWriter.setMaxPendingBytesPerConnection(queueLimit);
            final MemoryManager mm = transport.getMemoryManager();
            final Connection con = connection;

            final AtomicBoolean failed = new AtomicBoolean(false);
            final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
            final AtomicInteger exceptionAtLoopCount = new AtomicInteger();

            transport.pause();

            int i = 0;
            int loopCount = 0;
            while (!failed.get() && loopCount < 4) {
                final int lc = loopCount;
                byte[] originalMessage = new byte[packetSize];
                Arrays.fill(originalMessage, (byte) i);
                Buffer buffer = Buffers.wrap(mm, originalMessage);
                try {
                    if (asyncQueueWriter.canWrite(con, buffer.remaining())) {
                        asyncQueueWriter.write(con, buffer);
                    } else if (loopCount == 3) {
                        // Final pass: push past the limit on purpose and capture
                        // how the writer reports it.
                        asyncQueueWriter.write(con, buffer,
                                new EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>>() {

                            @Override
                            public void failed(Throwable throwable) {
                                if (throwable instanceof PendingWriteQueueLimitExceededException) {
                                    exceptionThrown.set(true);
                                    exceptionAtLoopCount.set(lc);
                                    // The rejected packet must really not have fit.
                                    Assert.assertTrue(((NIOConnection) con).getAsyncWriteQueue()
                                            .spaceInBytes() + packetSize > queueLimit);
                                }
                                // Any failure ends the loop.
                                failed.set(true);
                            }
                        });
                    } else {
                        ++loopCount;
                        transport.resume();
                        Thread.sleep(5000L);
                        transport.pause();
                    }
                } catch (IOException e) {
                    LOGGER.log(Level.WARNING, "IOException occurred", e);
                    fail("IOException occurred: " + e.toString());
                }
                ++i;
            }

            if (!exceptionThrown.get()) {
                fail("No Exception thrown when queue write limit exceeded");
            }
            if (exceptionAtLoopCount.get() != 3) {
                fail("Expected exception to occur at 4th iteration of test loop.  Occurred at: "
                        + exceptionAtLoopCount);
            }
        } finally {
            // Each cleanup step runs even if the previous one throws, so the
            // transport is always stopped and the port released.
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                try {
                    if (transport.isPaused()) {
                        transport.resume();
                    }
                } finally {
                    transport.stop();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fills the connection's async write queue while the transport is paused,
     * registers a {@link WriteQueueFreeSpaceMonitor} asking for 1024000 free
     * bytes, resumes the transport and then sleeps for up to ten seconds.
     *
     * <p>The monitor signals by interrupting this thread, so the sleep ending in
     * an {@link InterruptedException} is the success path; a sleep that runs to
     * completion fails the test. Afterwards the free space in the queue must be
     * at least the requested amount.
     *
     * @throws Exception if the transport cannot be started or connected
     */
    public void testQueueNotification() throws Exception {
        final int packetSize = 256000;
        final int maxPendingBytes = packetSize * 10;
        final int requestedFreeSpace = packetSize * 4;

        final FilterChainBuilder chain = FilterChainBuilder.stateless();
        chain.add(new TransportFilter());

        final TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build();
        transport.setProcessor(chain.build());

        Connection connection = null;
        try {
            transport.bind(PORT);
            transport.start();

            final GrizzlyFuture connectFuture = transport.connect("localhost", PORT);
            connection = (Connection) connectFuture.get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);

            final AsyncQueueWriter queueWriter = transport.getAsyncQueueIO().getWriter();
            queueWriter.setMaxPendingBytesPerConnection(maxPendingBytes);
            System.out.println("Max Space: " + queueWriter.getMaxPendingBytesPerConnection());

            final MemoryManager memoryManager = transport.getMemoryManager();
            final Connection target = connection;

            transport.pause();
            final TaskQueue writeQueue = ((NIOConnection) connection).getAsyncWriteQueue();

            // Queue packets until the writer refuses another one.
            do {
                final byte[] payload = new byte[packetSize];
                Arrays.fill(payload, (byte) 1);
                final Buffer packet = Buffers.wrap(memoryManager, payload);
                try {
                    if (queueWriter.canWrite(target, packetSize)) {
                        queueWriter.write(target, packet);
                    }
                } catch (IOException e) {
                    assertTrue("IOException occurred: " + e.toString(), false);
                }
            } while (queueWriter.canWrite(target, packetSize));

            writeQueue.addQueueMonitor(new WriteQueueFreeSpaceMonitor(target, requestedFreeSpace));
            transport.resume();

            long startedAt = 0L;
            try {
                System.out.println("Waiting for free space notification.  Max wait time is 10000ms.");
                startedAt = System.currentTimeMillis();
                Thread.sleep(10000L);
                fail("Thread not interrupted within 10 seconds.");
            } catch (InterruptedException ie) {
                // The interrupt is the monitor's notification, not an error.
                final long elapsed = System.currentTimeMillis() - startedAt;
                System.out.println("Notified in " + elapsed + "ms");
            }

            assertTrue(queueWriter.getMaxPendingBytesPerConnection() - writeQueue.spaceInBytes()
                    >= requestedFreeSpace);
            System.out.println("Queue Space: " + writeQueue.spaceInBytes());
        } finally {
            if (connection != null) {
                connection.close();
            }
            if (transport.isPaused()) {
                transport.resume();
            }
            transport.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises the writer's re-entrancy limit by issuing a new write from inside
     * the completion callback of the previous one.
     *
     * <p>The limit is set to {@code maxReenterants}. The calling thread is
     * recorded before the first write, and each callback that issues a further
     * write records the thread it runs on. Once the chain finishes, every recorded
     * thread except the last must be the test thread, and the last must be a
     * different thread.
     * NOTE(review): presumably the writer stops completing writes on the caller's
     * thread once the limit is reached — confirm against AsyncQueueWriter.
     *
     * @throws Exception if the transport cannot be started or the write chain does
     *         not complete within ten seconds
     */
    public void testAsyncWriteQueueReenterants() throws Exception {
        final int maxReenterants = 10;
        final AtomicInteger serverRcvdBytes = new AtomicInteger();

        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
        filterChainBuilder.add(new TransportFilter());
        filterChainBuilder.add(new BaseFilter() {

            @Override
            public NextAction handleRead(FilterChainContext ctx) throws IOException {
                serverRcvdBytes.addAndGet(((Buffer) ctx.getMessage()).remaining());
                return ctx.getStopAction();
            }
        });

        TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build();
        transport.setProcessor(filterChainBuilder.build());

        Connection connection = null;
        try {
            transport.bind(PORT);
            transport.start();

            GrizzlyFuture future = transport.connect("localhost", PORT);
            connection = (Connection) future.get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);

            final AsyncQueueWriter asyncQueueWriter = transport.getAsyncQueueIO().getWriter();
            final MemoryManager mm = transport.getMemoryManager();
            final Connection con = connection;

            asyncQueueWriter.setMaxWriteReenterants(maxReenterants);

            final AtomicInteger packetCounter = new AtomicInteger();
            final FutureImpl<Boolean> resultFuture = SafeFutureImpl.<Boolean>create();
            final ConcurrentLinkedQueue<Thread> threadsHistory = new ConcurrentLinkedQueue<Thread>();

            final Thread currentThread = Thread.currentThread();
            threadsHistory.add(currentThread);

            Buffer buffer = Buffers.wrap(mm, "" + (char) ('A' + packetCounter.getAndIncrement()));
            asyncQueueWriter.write(con, buffer,
                    new EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>>() {

                @Override
                public void completed(WriteResult<Buffer, SocketAddress> result) {
                    int packetNum = packetCounter.incrementAndGet();
                    if (packetNum <= maxReenterants + 1) {
                        threadsHistory.add(Thread.currentThread());
                        Buffer bufferInner = Buffers.wrap(mm, "" + (char) ('A' + packetNum));
                        try {
                            // Re-enter the writer with this same handler.
                            asyncQueueWriter.write(con, bufferInner, this);
                        } catch (IOException e) {
                            resultFuture.failure(e);
                        }
                    } else {
                        resultFuture.result(Boolean.TRUE);
                    }
                }
            });

            assertTrue(resultFuture.get(10L, TimeUnit.SECONDS));

            while (!threadsHistory.isEmpty()) {
                Thread t = threadsHistory.poll();
                if (threadsHistory.isEmpty()) {
                    // Last recorded entry: must not be the test thread.
                    assertNotSame(currentThread, t);
                } else {
                    assertSame(currentThread, t);
                }
            }
        } finally {
            // Always stop the transport, even if closing the connection fails,
            // so the port is released for the other tests in this class.
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                transport.stop();
            }
        }
    }

    private static class WriteQueueFreeSpaceMonitor
    extends TaskQueue.QueueMonitor {
        private final TaskQueue writeQueue;
        private final int freeSpaceAvailable;
        private final int maxSpace;
        private final Transport transport;
        private final Thread current;

        public WriteQueueFreeSpaceMonitor(Connection c, int freeSpaceAvailable) {
            this.freeSpaceAvailable = freeSpaceAvailable;
            this.writeQueue = ((NIOConnection)c).getAsyncWriteQueue();
            this.transport = c.getTransport();
            this.maxSpace = ((TCPNIOTransport)this.transport).getAsyncQueueIO().getWriter().getMaxPendingBytesPerConnection();
            this.current = Thread.currentThread();
        }

        public boolean shouldNotify() {
            return this.maxSpace - this.writeQueue.spaceInBytes() > this.freeSpaceAvailable;
        }

        public void onNotify() {
            try {
                this.transport.pause();
            }
            catch (IOException ioe) {
                ioe.printStackTrace();
            }
            this.current.interrupt();
        }
    }
}

