/*
 * Decompiled with CFR 0.152.
 */
package org.asynchttpclient.reactivestreams;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.reactivex.Flowable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.Servlet;
import javax.servlet.ServletInputStream;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.catalina.Context;
import org.apache.catalina.Wrapper;
import org.apache.catalina.startup.Tomcat;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration tests that send and receive HTTP bodies through Reactive Streams publishers and
 * subscribers, using an embedded Tomcat echo servlet as the remote peer.
 */
public class ReactiveStreamsTest {
    /** Logger used by the embedded echo servlet. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsTest.class);
    /** Embedded Tomcat instance; assigned in setUpGlobal and stopped in tearDownGlobal. */
    private Tomcat tomcat;
    /** Local port of the Tomcat connector, read back after the server has started. */
    private int port1;
    /** Executor handed to the publisher built by createAsyncPublisher. */
    private ExecutorService executor;

    /**
     * Wraps {@code bytes} in an RxJava {@link Flowable} that emits the payload as consecutive
     * buffers of at most {@code chunkSize} bytes.
     *
     * @param bytes payload to publish
     * @param chunkSize maximum size of each emitted buffer
     * @return a publisher over the chunked payload
     */
    private static Publisher<ByteBuf> createPublisher(byte[] bytes, int chunkSize) {
        Iterable<ByteBuf> chunks = new ByteBufIterable(bytes, chunkSize);
        return Flowable.fromIterable(chunks);
    }

    /**
     * Builds an {@link AsyncIterablePublisher} over {@code bytes}, split into buffers of at most
     * {@code chunkSize} bytes, that runs on this test's executor.
     *
     * @param bytes payload to publish
     * @param chunkSize maximum size of each emitted buffer
     * @return a publisher over the chunked payload
     */
    private Publisher<ByteBuf> createAsyncPublisher(byte[] bytes, int chunkSize) {
        Iterable<ByteBuf> chunks = new ByteBufIterable(bytes, chunkSize);
        return new AsyncIterablePublisher<>(chunks, this.executor);
    }

    /**
     * Concatenates the bytes of every body part, in list order, into a single array.
     *
     * @param bodyParts response body parts to join
     * @return the joined body bytes
     * @throws IOException declared by the underlying stream write
     */
    private static byte[] getBytes(List<HttpResponseBodyPart> bodyParts) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        Iterator<HttpResponseBodyPart> remaining = bodyParts.iterator();
        while (remaining.hasNext()) {
            byte[] chunk = remaining.next().getBodyPartBytes();
            collected.write(chunk);
        }
        return collected.toByteArray();
    }

    /**
     * Starts an embedded Tomcat with a single asynchronous echo servlet mapped to every path,
     * records the port it is listening on, and creates the single-thread executor used by
     * {@code createAsyncPublisher}.
     *
     * <p>The servlet copies request headers, path info, query string, cookies and parameters
     * into the response (headers are prefixed with {@code X-}), then reads the request body
     * with a non-blocking {@link ReadListener} and writes it back together with its length and
     * MD5 digest.
     *
     * @throws Exception if the server cannot be configured or started
     */
    @BeforeClass(alwaysRun=true)
    public void setUpGlobal() throws Exception {
        String path = new File(".").getAbsolutePath() + "/target";
        this.tomcat = new Tomcat();
        this.tomcat.setHostname("localhost");
        // Port 0 is passed here; the actual port is read back from the connector after start().
        this.tomcat.setPort(0);
        this.tomcat.setBaseDir(path);
        Context ctx = this.tomcat.addContext("", path);
        Wrapper wrapper = Tomcat.addServlet(ctx, "webdav", new HttpServlet(){

            public void service(HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException {
                LOGGER.debug("Echo received request {} on path {}", httpRequest, httpRequest.getServletContext().getContextPath());
                if (httpRequest.getHeader("X-HEAD") != null) {
                    httpResponse.setContentLength(1);
                }
                if (httpRequest.getHeader("X-ISO") != null) {
                    httpResponse.setContentType("text/html;charset=ISO-8859-1");
                } else {
                    httpResponse.setContentType("text/html;charset=UTF-8");
                }
                if (httpRequest.getMethod().equalsIgnoreCase("OPTIONS")) {
                    httpResponse.addHeader("Allow", "GET,HEAD,POST,OPTIONS,TRACE");
                }

                // Echo every request header back, honouring the LockThread and X-redirect controls.
                Enumeration<String> headerNames = httpRequest.getHeaderNames();
                while (headerNames.hasMoreElements()) {
                    String headerName = headerNames.nextElement();
                    if (headerName.startsWith("LockThread")) {
                        int sleepTime = httpRequest.getIntHeader(headerName);
                        try {
                            // Multiply as long so a large header value cannot overflow int.
                            Thread.sleep(sleepTime == -1 ? 40L : sleepTime * 1000L);
                        }
                        catch (InterruptedException interrupted) {
                            // Stop sleeping but keep the interrupt visible to the container.
                            Thread.currentThread().interrupt();
                        }
                    }
                    if (headerName.startsWith("X-redirect")) {
                        httpResponse.sendRedirect(httpRequest.getHeader("X-redirect"));
                        return;
                    }
                    httpResponse.addHeader("X-" + headerName, httpRequest.getHeader(headerName));
                }

                String pathInfo = httpRequest.getPathInfo();
                if (pathInfo != null) {
                    httpResponse.addHeader("X-pathInfo", pathInfo);
                }
                String queryString = httpRequest.getQueryString();
                if (queryString != null) {
                    httpResponse.addHeader("X-queryString", queryString);
                }
                httpResponse.addHeader("X-KEEP-ALIVE", httpRequest.getRemoteAddr() + ":" + httpRequest.getRemotePort());

                Cookie[] cs = httpRequest.getCookies();
                if (cs != null) {
                    for (Cookie c : cs) {
                        httpResponse.addCookie(c);
                    }
                }

                // Echo parameters as headers and write their names, "_"-terminated, to the body.
                Enumeration<String> parameterNames = httpRequest.getParameterNames();
                if (parameterNames.hasMoreElements()) {
                    StringBuilder requestBody = new StringBuilder();
                    while (parameterNames.hasMoreElements()) {
                        String parameterName = parameterNames.nextElement();
                        httpResponse.addHeader("X-" + parameterName, httpRequest.getParameter(parameterName));
                        requestBody.append(parameterName);
                        requestBody.append("_");
                    }
                    if (requestBody.length() > 0) {
                        String body = requestBody.toString();
                        httpResponse.getOutputStream().write(body.getBytes());
                    }
                }

                // Switch to async mode and echo the request body once it has been read fully.
                final AsyncContext context = httpRequest.startAsync();
                final ServletInputStream input = httpRequest.getInputStream();
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                input.setReadListener(new ReadListener(){
                    byte[] buffer = new byte[5120];

                    public void onError(Throwable t) {
                        LOGGER.error("Echo servlet failed while reading the request body", t);
                        httpResponse.setStatus(io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
                        context.complete();
                    }

                    public void onDataAvailable() throws IOException {
                        int len;
                        // Only read while the container reports data is available without blocking.
                        while (input.isReady() && (len = input.read(this.buffer)) != -1) {
                            baos.write(this.buffer, 0, len);
                        }
                    }

                    public void onAllDataRead() throws IOException {
                        byte[] requestBodyBytes = baos.toByteArray();
                        int total = requestBodyBytes.length;
                        httpResponse.addIntHeader("X-" + HttpHeaderNames.CONTENT_LENGTH, total);
                        String md5 = TestUtils.md5(requestBodyBytes, 0, total);
                        httpResponse.addHeader(HttpHeaderNames.CONTENT_MD5.toString(), md5);
                        httpResponse.getOutputStream().write(requestBodyBytes, 0, total);
                        context.complete();
                    }
                });
            }
        });
        wrapper.setAsyncSupported(true);
        ctx.addServletMappingDecoded("/*", "webdav");
        this.tomcat.start();
        this.port1 = this.tomcat.getConnector().getLocalPort();
        this.executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Stops the embedded Tomcat and shuts down the executor.
     *
     * <p>This method runs even when setup failed, so either field may still be {@code null}.
     * The executor is shut down in a {@code finally} block so that a failure while stopping
     * Tomcat does not leave its thread running.
     *
     * @throws Exception if Tomcat fails to stop
     */
    @AfterClass(alwaysRun=true)
    public void tearDownGlobal() throws Exception {
        try {
            if (this.tomcat != null) {
                this.tomcat.stop();
            }
        } finally {
            if (this.executor != null) {
                this.executor.shutdown();
            }
        }
    }

    /**
     * Returns the URL of the echo servlet on the embedded server.
     *
     * <p>The port is appended by plain concatenation rather than {@code String.format("%d")},
     * because the latter localizes digits according to the default locale and could produce a
     * URL with non-ASCII digits.
     *
     * @return {@code http://localhost:<port1>/foo/test}
     */
    private String getTargetUrl() {
        return "http://localhost:" + this.port1 + "/foo/test";
    }

    /**
     * PUTs the large test image through the executor-backed publisher and expects the server
     * to echo it back unchanged with status 200.
     */
    @Test
    public void testStreamingPutImage() throws Exception {
        DefaultAsyncHttpClientConfig.Builder config = Dsl.config().setRequestTimeout(600000);
        try (AsyncHttpClient client = Dsl.asyncHttpClient(config)) {
            Publisher<ByteBuf> body = this.createAsyncPublisher(TestUtils.LARGE_IMAGE_BYTES, 2342);
            ListenableFuture<Response> pending = client.preparePut(this.getTargetUrl()).setBody(body).execute();
            Response response = pending.get();
            Assert.assertEquals(response.getStatusCode(), 200);
            Assert.assertEquals(response.getResponseBodyAsBytes(), TestUtils.LARGE_IMAGE_BYTES);
        }
    }

    /**
     * PUTs the large test image through the Flowable-backed publisher and expects the server
     * to echo it back unchanged with status 200.
     */
    @Test
    public void testAsyncStreamingPutImage() throws Exception {
        DefaultAsyncHttpClientConfig.Builder config = Dsl.config().setRequestTimeout(600000);
        try (AsyncHttpClient client = Dsl.asyncHttpClient(config)) {
            Publisher<ByteBuf> body = ReactiveStreamsTest.createPublisher(TestUtils.LARGE_IMAGE_BYTES, 2342);
            ListenableFuture<Response> pending = client.preparePut(this.getTargetUrl()).setBody(body).execute();
            Response response = pending.get();
            Assert.assertEquals(response.getStatusCode(), 200);
            Assert.assertEquals(response.getResponseBodyAsBytes(), TestUtils.LARGE_IMAGE_BYTES);
        }
    }

    /**
     * Executes the same streamed PUT twice with one client and checks, after each round trip,
     * that the length, MD5 and bytes seen by the server and by the client match the image.
     * If the second round's content checks fail, the first differing byte position is reported
     * before the original assertion error is rethrown.
     */
    @Test
    public void testConnectionDoesNotGetClosed() throws Exception {
        final byte[] expected = TestUtils.LARGE_IMAGE_BYTES;
        final String expectedMd5 = TestUtils.LARGE_IMAGE_BYTES_MD5;
        final String lengthHeader = "X-" + HttpHeaderNames.CONTENT_LENGTH;
        final String md5Header = "X-" + HttpHeaderNames.CONTENT_MD5;

        try (AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(600000))) {
            BoundRequestBuilder requestBuilder = client.preparePut(this.getTargetUrl())
                    .setBody(ReactiveStreamsTest.createPublisher(expected, 1000))
                    .setHeader(lengthHeader, (Object)expected.length)
                    .setHeader(md5Header, expectedMd5);

            // First round trip.
            Response response = requestBuilder.execute().get();
            Assert.assertEquals(response.getStatusCode(), 200, "HTTP response was invalid on first request.");
            byte[] responseBody = response.getResponseBodyAsBytes();
            Assert.assertEquals(Integer.parseInt(response.getHeader(lengthHeader)), expected.length, "Server side payload length invalid");
            Assert.assertEquals(responseBody.length, expected.length, "Client side payload length invalid");
            Assert.assertEquals(response.getHeader(HttpHeaderNames.CONTENT_MD5), expectedMd5, "Server side payload MD5 invalid");
            Assert.assertEquals(TestUtils.md5(responseBody), expectedMd5, "Client side payload MD5 invalid");
            Assert.assertEquals(responseBody, expected, "Image bytes are not equal on first attempt");

            // Second round trip with the very same request builder.
            response = requestBuilder.execute().get();
            Assert.assertEquals(response.getStatusCode(), 200);
            responseBody = response.getResponseBodyAsBytes();
            Assert.assertEquals(Integer.parseInt(response.getHeader(lengthHeader)), expected.length, "Server side payload length invalid");
            Assert.assertEquals(responseBody.length, expected.length, "Client side payload length invalid");
            try {
                Assert.assertEquals(response.getHeader(HttpHeaderNames.CONTENT_MD5), expectedMd5, "Server side payload MD5 invalid");
                Assert.assertEquals(TestUtils.md5(responseBody), expectedMd5, "Client side payload MD5 invalid");
                Assert.assertEquals(responseBody, expected, "Image bytes weren't equal on subsequent test");
            }
            catch (AssertionError e) {
                e.printStackTrace();
                // Lengths were already asserted equal above, so indexing both arrays is safe.
                int position = 0;
                while (position < expected.length) {
                    Assert.assertEquals(responseBody[position], expected[position], "Invalid response byte at position " + position);
                    ++position;
                }
                throw e;
            }
        }
    }

    /**
     * Sends a request whose body publisher fails immediately with {@code FailedStream}; the
     * test passes when waiting on the result throws {@link ExecutionException}.
     */
    @Test(expectedExceptions={ExecutionException.class})
    public void testFailingStream() throws Exception {
        DefaultAsyncHttpClientConfig.Builder config = Dsl.config().setRequestTimeout(600000);
        try (AsyncHttpClient client = Dsl.asyncHttpClient(config)) {
            Publisher<ByteBuf> failingPublisher = Flowable.error(new FailedStream());
            BoundRequestBuilder request = client.preparePut(this.getTargetUrl()).setBody(failingPublisher);
            request.execute().get();
        }
    }

    /**
     * POSTs the large image twice over the same client, consuming each response through a
     * {@code SimpleSubscriber}, and checks the streamed parts add up to the image. A final
     * plain request then confirms the client is still usable.
     *
     * <p>The subscriber is typed as {@code SimpleSubscriber<HttpResponseBodyPart>}; the earlier
     * {@code SimpleSubscriber<Object>} declaration was not assignable from the created instance
     * and could not be passed to {@code SimpleStreamedAsyncHandler} or {@code getBytes}.
     *
     * @throws Throwable any error reported by the subscriber or the request future
     */
    @Test
    public void streamedResponseTest() throws Throwable {
        try (AsyncHttpClient c = Dsl.asyncHttpClient()) {
            // Two identical streamed round trips, each with a fresh subscriber.
            for (int attempt = 0; attempt < 2; attempt++) {
                SimpleSubscriber<HttpResponseBodyPart> subscriber = new SimpleSubscriber<>();
                ListenableFuture<Void> future = c.preparePost(this.getTargetUrl())
                        .setBody(TestUtils.LARGE_IMAGE_BYTES)
                        .execute(new SimpleStreamedAsyncHandler(subscriber));
                future.get();
                Assert.assertEquals(ReactiveStreamsTest.getBytes(subscriber.getElements()), TestUtils.LARGE_IMAGE_BYTES);
            }

            // A non-streamed request must still work on the same client afterwards.
            Response echoed = c.preparePost(this.getTargetUrl()).setBody("Hello").execute().get();
            Assert.assertEquals(echoed.getResponseBody(), "Hello");
        }
    }

    /**
     * Issues three streamed POSTs whose subscribers cancel after 0, 1 and 10 received parts,
     * then checks that a plain request on the same client still echoes its body.
     */
    @Test
    public void cancelStreamedResponseTest() throws Throwable {
        try (AsyncHttpClient c = Dsl.asyncHttpClient()) {
            int[] cancelThresholds = {0, 1, 10};
            for (int cancelAfter : cancelThresholds) {
                BoundRequestBuilder request = c.preparePost(this.getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES);
                request.execute(new CancellingStreamedAsyncProvider(cancelAfter)).get();
            }

            Response echoed = c.preparePost(this.getTargetUrl()).setBody("Hello").execute().get();
            Assert.assertEquals(echoed.getResponseBody(), "Hello");
        }
    }

    private class FailedStream
    extends RuntimeException {
        private FailedStream() {
        }
    }

    /**
     * Presents a byte array as a sequence of {@link ByteBuf} views of at most {@code chunkSize}
     * bytes each. Every call to {@link #iterator()} starts again from the beginning of the
     * payload. The buffers wrap the payload array rather than copying it.
     */
    static class ByteBufIterable
    implements Iterable<ByteBuf> {
        private final byte[] payload;
        private final int chunkSize;

        /**
         * @param payload bytes to expose; must not be {@code null}
         * @param chunkSize maximum size of each chunk; must be positive, otherwise iteration
         *     could never advance
         * @throws NullPointerException if {@code payload} is {@code null}
         * @throws IllegalArgumentException if {@code chunkSize} is zero or negative
         */
        ByteBufIterable(byte[] payload, int chunkSize) {
            if (chunkSize <= 0) {
                throw new IllegalArgumentException("chunkSize must be positive: " + chunkSize);
            }
            this.payload = java.util.Objects.requireNonNull(payload, "payload");
            this.chunkSize = chunkSize;
        }

        @Override
        public Iterator<ByteBuf> iterator() {
            return new Iterator<ByteBuf>(){
                /** Offset of the first byte that has not been handed out yet. */
                private int currentIndex = 0;

                @Override
                public boolean hasNext() {
                    return this.currentIndex < payload.length;
                }

                @Override
                public ByteBuf next() {
                    if (!this.hasNext()) {
                        // Honour the Iterator contract instead of returning empty buffers forever.
                        throw new java.util.NoSuchElementException("No more chunks in payload of length " + payload.length);
                    }
                    int thisCurrentIndex = this.currentIndex;
                    // The final chunk may be shorter than chunkSize.
                    int length = Math.min(chunkSize, payload.length - thisCurrentIndex);
                    this.currentIndex += length;
                    return Unpooled.wrappedBuffer(payload, thisCurrentIndex, length);
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException("ByteBufIterable's iterator does not support remove.");
                }
            };
        }
    }

    /**
     * Subscriber that requests elements one at a time and cancels its subscription once
     * {@code cancelAfter} elements have been received; with {@code cancelAfter == 0} it
     * cancels as soon as it is subscribed. Errors and completion are ignored.
     */
    static class CancellingSubscriber<T>
    implements Subscriber<T> {
        private final int cancelAfter;
        private volatile Subscription subscription;
        private final AtomicInteger count = new AtomicInteger(0);

        CancellingSubscriber(int cancelAfter) {
            this.cancelAfter = cancelAfter;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (this.cancelAfter != 0) {
                subscription.request(1L);
                return;
            }
            // Threshold of zero: never ask for any element.
            subscription.cancel();
        }

        public void onNext(T t) {
            int received = this.count.incrementAndGet();
            Subscription active = this.subscription;
            if (received != this.cancelAfter) {
                active.request(1L);
                return;
            }
            active.cancel();
        }

        public void onError(Throwable error) {
            // Intentionally ignored.
        }

        public void onComplete() {
            // Intentionally ignored.
        }
    }

    /**
     * Streamed handler that subscribes a {@code CancellingSubscriber} to the response body
     * publisher. Receiving a body part through the non-streamed callback, or any throwable,
     * is turned into an {@link AssertionError}.
     */
    static class CancellingStreamedAsyncProvider
    implements StreamedAsyncHandler<CancellingStreamedAsyncProvider> {
        private final int cancelAfter;

        CancellingStreamedAsyncProvider(int cancelAfter) {
            this.cancelAfter = cancelAfter;
        }

        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            Subscriber<HttpResponseBodyPart> canceller = new CancellingSubscriber<HttpResponseBodyPart>(this.cancelAfter);
            publisher.subscribe(canceller);
            return AsyncHandler.State.CONTINUE;
        }

        public void onThrowable(Throwable t) {
            // AssertionError(Object) keeps a Throwable argument as the cause.
            throw new AssertionError(t);
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart bodyPart) {
            throw new AssertionError("Should not have received body part");
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus responseStatus) {
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpHeaders headers) {
            return AsyncHandler.State.CONTINUE;
        }

        /** Returns this handler itself as the request result. */
        public CancellingStreamedAsyncProvider onCompleted() {
            return this;
        }
    }

    /**
     * Subscriber that requests elements one at a time and collects them in a synchronized
     * list. A latch is released on completion or error so that {@link #getElements()} can
     * block until the stream has terminated.
     */
    static class SimpleSubscriber<T>
    implements Subscriber<T> {
        private final List<T> elements = Collections.synchronizedList(new ArrayList<T>());
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Subscription subscription;
        private volatile Throwable error;

        SimpleSubscriber() {
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        public void onNext(T t) {
            this.elements.add(t);
            Subscription active = this.subscription;
            active.request(1L);
        }

        public void onError(Throwable error) {
            // Record the failure before releasing anyone waiting in getElements().
            this.error = error;
            this.latch.countDown();
        }

        public void onComplete() {
            this.latch.countDown();
        }

        /**
         * Waits for the stream to terminate and returns the collected elements.
         *
         * @return the elements received, in arrival order
         * @throws Throwable the error passed to {@code onError}, if any, or an
         *     {@link InterruptedException} if the wait is interrupted
         */
        List<T> getElements() throws Throwable {
            this.latch.await();
            Throwable failure = this.error;
            if (failure == null) {
                return this.elements;
            }
            throw failure;
        }
    }

    /**
     * Streamed handler that forwards the response body publisher to the subscriber supplied
     * at construction. Receiving a body part through the non-streamed callback, or any
     * throwable, is turned into an {@link AssertionError}.
     */
    static class SimpleStreamedAsyncHandler
    implements StreamedAsyncHandler<Void> {
        private final Subscriber<HttpResponseBodyPart> subscriber;

        SimpleStreamedAsyncHandler(Subscriber<HttpResponseBodyPart> subscriber) {
            this.subscriber = subscriber;
        }

        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            Subscriber<HttpResponseBodyPart> target = this.subscriber;
            publisher.subscribe(target);
            return AsyncHandler.State.CONTINUE;
        }

        public void onThrowable(Throwable t) {
            // AssertionError(Object) keeps a Throwable argument as the cause.
            throw new AssertionError(t);
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart bodyPart) {
            throw new AssertionError("Should not have received body part");
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus responseStatus) {
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpHeaders headers) {
            return AsyncHandler.State.CONTINUE;
        }

        /** Produces no result value. */
        public Void onCompleted() {
            return null;
        }
    }
}

