/*
 * Decompiled with CFR 0.152.
 */
package com.ning.http.client.async;

import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.async.AbstractBasicTest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Scalability test: issues {@code C10K} GET requests through a single
 * {@link AsyncHttpClient}, spread round-robin over {@code SRV_COUNT} local
 * Jetty servers, and verifies that every response echoes the index of the
 * request that produced it.
 *
 * <p>Each server replies to {@code GET /<n>} with {@code <n>} both in the
 * {@code Arg} response header and as the response body.
 */
public abstract class RC10KTest extends AbstractBasicTest {
    /** Number of requests issued by {@link #rc10kProblem()} (and the per-host connection cap). */
    private static final int C10K = 1000;
    /** Response header in which the server echoes the requested path argument. */
    private static final String ARG_HEADER = "Arg";
    /** Number of local servers the requests are distributed over. */
    private static final int SRV_COUNT = 10;

    /** Every server created by {@link #createServer()}, kept so teardown can stop them. */
    protected List<Server> servers = new ArrayList<>(SRV_COUNT);
    /** Listening port of each server, indexed in creation order. */
    private int[] ports;

    /**
     * Starts {@code SRV_COUNT} local servers and records their ports.
     *
     * @throws Exception if a server cannot be created or started
     */
    @Override
    @BeforeClass(alwaysRun=true)
    public void setUpGlobal() throws Exception {
        this.ports = new int[SRV_COUNT];
        for (int i = 0; i < SRV_COUNT; ++i) {
            this.ports[i] = this.createServer();
        }
        this.log.info("Local HTTP servers started successfully");
    }

    /**
     * Stops every registered server. A failure to stop one server does not
     * prevent the others from being stopped; the first failure is rethrown
     * once all servers have been attempted.
     *
     * @throws Exception the first exception raised while stopping a server
     */
    @Override
    @AfterClass(alwaysRun=true)
    public void tearDownGlobal() throws Exception {
        Exception firstFailure = null;
        for (Server srv : this.servers) {
            try {
                srv.stop();
            } catch (Exception e) {
                this.log.warn("Failed to stop local HTTP server", e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        this.servers.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates and starts one Jetty server bound to 127.0.0.1 on a free port,
     * using the handler from {@link #configureHandler()}.
     *
     * @return the port the new server was configured to listen on
     * @throws Exception if the server fails to start
     */
    private int createServer() throws Exception {
        Server srv = new Server();
        ServerConnector listener = new ServerConnector(srv);
        listener.setHost("127.0.0.1");
        int port = this.findFreePort();
        listener.setPort(port);
        srv.addConnector((Connector)listener);
        srv.setHandler((Handler)this.configureHandler());
        // Register before starting so teardown still stops a server whose start() failed part-way.
        this.servers.add(srv);
        srv.start();
        return port;
    }

    /**
     * Builds the echo handler: the request target minus its leading character
     * is returned in the {@code Arg} header and as the plain-text body.
     *
     * @return a new handler instance
     */
    @Override
    public AbstractHandler configureHandler() throws Exception {
        return new AbstractHandler(){

            @Override
            public void handle(String s, Request r, HttpServletRequest req, HttpServletResponse resp) throws IOException, ServletException {
                resp.setContentType("text/plain");
                // Drop the first character of the target (the leading '/' for the URLs this test sends).
                String arg = s.substring(1);
                resp.setHeader(ARG_HEADER, arg);
                resp.setStatus(200);
                // Tell Jetty this handler produced the response.
                r.setHandled(true);
                resp.getOutputStream().print(arg);
                resp.getOutputStream().flush();
                resp.getOutputStream().close();
            }
        };
    }

    /**
     * Fires {@code C10K} GET requests without waiting in between, then waits
     * for each future in submission order and asserts that its result equals
     * the index of the request.
     */
    @Test(timeOut=600000L, groups={"scalability"})
    public void rc10kProblem() throws IOException, ExecutionException, TimeoutException, InterruptedException {
        AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder()
                .setMaxConnectionsPerHost(C10K)
                .setAllowPoolingConnections(true)
                .build();
        try (AsyncHttpClient client = this.getAsyncHttpClient(config)) {
            List<Future<Integer>> resps = new ArrayList<>(C10K);
            for (int i = 0; i < C10K; ++i) {
                // Plain concatenation keeps the digits ASCII regardless of the default locale.
                String url = "http://127.0.0.1:" + this.ports[i % SRV_COUNT] + "/" + i;
                resps.add(client.prepareGet(url).execute(new MyAsyncHandler(i)));
            }
            int expected = 0;
            for (Future<Integer> future : resps) {
                Integer resp = future.get();
                Assert.assertNotNull(resp);
                Assert.assertEquals(resp.intValue(), expected++);
            }
        }
    }

    /**
     * Handler for a single request. Checks the status code and the echoed
     * {@code Arg} header, and yields the integer parsed from the body, or
     * {@code -1} if no non-blank body part was received.
     */
    private class MyAsyncHandler implements AsyncHandler<Integer> {
        /** Decimal form of the request index; expected value of the {@code Arg} header. */
        private final String arg;
        /** Parsed body value; stays at -1 until a non-blank body part arrives. */
        private final AtomicInteger result = new AtomicInteger(-1);

        public MyAsyncHandler(int i) {
            this.arg = Integer.toString(i);
        }

        @Override
        public void onThrowable(Throwable t) {
            RC10KTest.this.log.warn("onThrowable called.", t);
        }

        @Override
        public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart event) throws Exception {
            String body = new String(event.getBodyPartBytes(), StandardCharsets.UTF_8).trim();
            if (!body.isEmpty()) {
                // Only the first non-blank part is recorded; later parts leave the value untouched.
                this.result.compareAndSet(-1, Integer.parseInt(body));
            }
            return AsyncHandler.STATE.CONTINUE;
        }

        @Override
        public AsyncHandler.STATE onStatusReceived(HttpResponseStatus event) throws Exception {
            Assert.assertEquals(event.getStatusCode(), 200);
            return AsyncHandler.STATE.CONTINUE;
        }

        @Override
        public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders event) throws Exception {
            Assert.assertEquals(event.getHeaders().getJoinedValue(ARG_HEADER, ", "), this.arg);
            return AsyncHandler.STATE.CONTINUE;
        }

        @Override
        public Integer onCompleted() throws Exception {
            return this.result.get();
        }
    }
}

