/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.http;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Deployable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.PoolOptions;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.test.core.VertxTestBase;
import io.vertx.tests.http.HttpTest;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Test;

public class SharedHttpClientTest
extends VertxTestBase {
    private static final int SHARED_POOL_SIZE = 7;
    private static final int CLIENT_VERTICLE_INSTANCES = 8;
    private static final int NUM_REQUESTS_PER_VERTICLE = 350;
    private static final int TOTAL_REQUESTS = 2800;

    /**
     * Deploys one server verticle and {@code CLIENT_VERTICLE_INSTANCES} client verticles
     * that all create an HTTP client under the same shared name, then checks that the
     * server never saw more than {@code SHARED_POOL_SIZE} connections.
     *
     * <p>Every response callback also asserts that the current Vert.x context has the same
     * deployment ID as the client verticle that issued the request.
     *
     * @throws Exception if waiting on the response latch fails
     */
    @Test
    public void testVerticlesUseSamePool() throws Exception {
        CountDownLatch receivedLatch = new CountDownLatch(TOTAL_REQUESTS);
        ServerVerticle serverVerticle = new ServerVerticle();

        vertx.deployVerticle(serverVerticle).onComplete(onSuccess(serverId -> {
            DeploymentOptions clientDeployment = deploymentOptions(
                CLIENT_VERTICLE_INSTANCES,
                new HttpClientOptions().setDefaultPort(HttpTest.DEFAULT_HTTP_PORT),
                new PoolOptions().setHttp1MaxSize(SHARED_POOL_SIZE));
            Supplier<Verticle> verticleSupplier = () -> new ClientVerticle(clientVerticle -> {
                assertEquals(clientVerticle.context.deploymentID(), Vertx.currentContext().deploymentID());
                receivedLatch.countDown();
            });
            // Requests are only triggered once every client instance has been deployed.
            vertx.deployVerticle(verticleSupplier, clientDeployment).onComplete(onSuccess(clientId ->
                vertx.eventBus().publish(ClientVerticle.TRIGGER_ADDRESS, NUM_REQUESTS_PER_VERTICLE)));
        }));

        // The server holds all responses until replyLatch completes, so wait for the pool
        // to open all of its connections before releasing them.
        waitUntil(() -> serverVerticle.connections.size() == SHARED_POOL_SIZE);
        serverVerticle.replyLatch.complete();
        awaitLatch(receivedLatch);

        // Expected value first so a failure message reports the right "expected"/"actual".
        assertEquals(SHARED_POOL_SIZE, serverVerticle.maxConnections);
    }

    /**
     * Runs the shared-client workload to completion, undeploys the client verticles and
     * then requires the server's set of open connections to become empty.
     *
     * <p>The client keep-alive timeout is set to 3600 seconds — presumably so that idle
     * expiry cannot be what closes the connections within the test's lifetime.
     *
     * @throws Exception if waiting on one of the latches fails
     */
    @Test
    public void testSharedPoolClosedAutomatically() throws Exception {
        CountDownLatch receivedLatch = new CountDownLatch(TOTAL_REQUESTS);
        ServerVerticle serverVerticle = new ServerVerticle();
        AtomicReference<String> clientDeploymentId = new AtomicReference<>();

        vertx.deployVerticle(serverVerticle).onComplete(onSuccess(serverId -> {
            HttpClientOptions clientOptions = new HttpClientOptions()
                .setDefaultPort(HttpTest.DEFAULT_HTTP_PORT)
                .setKeepAliveTimeout(3600);
            DeploymentOptions clientDeployment = deploymentOptions(
                CLIENT_VERTICLE_INSTANCES,
                clientOptions,
                new PoolOptions().setHttp1MaxSize(SHARED_POOL_SIZE));
            Supplier<Verticle> verticleSupplier =
                () -> new ClientVerticle(clientVerticle -> receivedLatch.countDown());
            vertx.deployVerticle(verticleSupplier, clientDeployment).onComplete(onSuccess(clientId -> {
                // Remember the deployment so it can be undeployed after the workload.
                clientDeploymentId.set(clientId);
                vertx.eventBus().publish(ClientVerticle.TRIGGER_ADDRESS, NUM_REQUESTS_PER_VERTICLE);
            }));
        }));

        // Responses are parked on the server until replyLatch completes.
        waitUntil(() -> serverVerticle.connections.size() == SHARED_POOL_SIZE);
        serverVerticle.replyLatch.complete();
        awaitLatch(receivedLatch);

        CountDownLatch undeployLatch = new CountDownLatch(1);
        vertx.undeploy(clientDeploymentId.get()).onComplete(onSuccess(v -> undeployLatch.countDown()));
        awaitLatch(undeployLatch);

        assertWaitUntil(() -> serverVerticle.connections.isEmpty());
    }

    /**
     * Checks what happens to the server's connections when the client verticles are
     * undeployed while a second deployment has created a client with the same shared name.
     *
     * <p>After the undeploy, two timers inspect the server's open connections: at half the
     * keep-alive timeout at least one connection must still be open, and at twice the
     * keep-alive timeout none may remain (presumably closed by keep-alive expiry).
     *
     * @throws Exception if waiting on one of the latches fails
     */
    @Test
    public void testSharedPoolRetainedByOtherDeployment() throws Exception {
        final int keepAliveTimeoutSeconds = 3;
        CountDownLatch receivedLatch = new CountDownLatch(TOTAL_REQUESTS);
        ServerVerticle serverVerticle = new ServerVerticle();
        AtomicReference<String> clientDeploymentId = new AtomicReference<>();

        vertx.deployVerticle(serverVerticle).onComplete(onSuccess(serverId -> {
            HttpClientOptions clientOptions = new HttpClientOptions()
                .setDefaultPort(HttpTest.DEFAULT_HTTP_PORT)
                .setKeepAliveTimeout(keepAliveTimeoutSeconds);
            DeploymentOptions clientDeployment = deploymentOptions(
                CLIENT_VERTICLE_INSTANCES,
                clientOptions,
                new PoolOptions().setHttp1MaxSize(SHARED_POOL_SIZE));
            Supplier<Verticle> verticleSupplier =
                () -> new ClientVerticle(clientVerticle -> receivedLatch.countDown());
            vertx.deployVerticle(verticleSupplier, clientDeployment).onComplete(onSuccess(clientId -> {
                clientDeploymentId.set(clientId);
                vertx.eventBus().publish(ClientVerticle.TRIGGER_ADDRESS, NUM_REQUESTS_PER_VERTICLE);
            }));
        }));
        waitUntil(() -> serverVerticle.connections.size() == SHARED_POOL_SIZE);

        // Second, independent deployment that creates a client under the same shared name.
        CountDownLatch deployLatch = new CountDownLatch(1);
        vertx.deployVerticle(new AbstractVerticle() {
            private HttpClient client;

            @Override
            public void start() {
                client = vertx.createHttpClient(
                    new HttpClientOptions().setShared(true).setName(ClientVerticle.SHARED_CLIENT_NAME));
            }
        }).onComplete(onSuccess(v -> deployLatch.countDown()));
        awaitLatch(deployLatch);

        // Release the parked responses and let the whole workload finish.
        serverVerticle.replyLatch.complete();
        awaitLatch(receivedLatch);

        CountDownLatch undeployLatch = new CountDownLatch(1);
        vertx.undeploy(clientDeploymentId.get()).onComplete(onSuccess(v -> undeployLatch.countDown()));
        awaitLatch(undeployLatch);

        // Two timer checks must both call complete() before await() returns.
        waitFor(2);
        vertx.setTimer(1000L * keepAliveTimeoutSeconds / 2, l -> {
            assertTrue(serverVerticle.connections.size() > 0);
            complete();
        });
        vertx.setTimer(2000L * keepAliveTimeoutSeconds, l -> {
            assertEquals(0, serverVerticle.connections.size());
            complete();
        });
        await();
    }

    /**
     * Builds the deployment options for the client verticles.
     *
     * <p>The HTTP client and pool options are serialized to JSON and stored in the
     * deployment config under the keys {@code "httpClientOptions"} and
     * {@code "poolOptions"}.
     *
     * @param instances   number of verticle instances to deploy
     * @param options     HTTP client options to place in the config
     * @param poolOptions pool options to place in the config
     * @return deployment options carrying the instance count and the JSON config
     */
    private static DeploymentOptions deploymentOptions(int instances, HttpClientOptions options, PoolOptions poolOptions) {
        DeploymentOptions result = new DeploymentOptions().setInstances(instances);
        JsonObject verticleConfig = new JsonObject();
        verticleConfig.put("httpClientOptions", options.toJson());
        verticleConfig.put("poolOptions", poolOptions.toJson());
        return result.setConfig(verticleConfig);
    }

    private static class ServerVerticle
    extends AbstractVerticle
    implements Handler<HttpServerRequest> {
        volatile Promise<Void> replyLatch;
        Set<HttpConnection> connections = Collections.synchronizedSet(new HashSet());
        volatile int maxConnections;

        private ServerVerticle() {
        }

        public void start(Promise<Void> startPromise) throws Exception {
            this.replyLatch = ((VertxInternal)this.vertx).promise();
            this.vertx.createHttpServer().connectionHandler(conn -> {
                this.connections.add((HttpConnection)conn);
                conn.closeHandler(v -> this.connections.remove(conn));
                this.maxConnections = Math.max(this.maxConnections, this.connections.size());
            }).requestHandler((Handler)this).listen(HttpTest.DEFAULT_HTTP_PORT).mapEmpty().onComplete(startPromise);
        }

        public void handle(HttpServerRequest req) {
            this.replyLatch.future().onComplete(ar -> req.response().end());
        }
    }

    private static class ClientVerticle
    extends AbstractVerticle
    implements Handler<Message<Integer>> {
        static final String TRIGGER_ADDRESS = UUID.randomUUID().toString();
        static final String SHARED_CLIENT_NAME = UUID.randomUUID().toString();
        final Consumer<ClientVerticle> onResponseReceived;
        volatile Context context;
        HttpClient client;

        ClientVerticle(Consumer<ClientVerticle> onResponseReceived) {
            this.onResponseReceived = onResponseReceived;
        }

        public void start(Promise<Void> startPromise) throws Exception {
            this.context = ((AbstractVerticle)this).context;
            HttpClientOptions options = new HttpClientOptions(this.config().getJsonObject("httpClientOptions")).setShared(true).setName(SHARED_CLIENT_NAME);
            PoolOptions poolOptions = new PoolOptions(this.config().getJsonObject("poolOptions"));
            this.client = this.vertx.createHttpClient(options, poolOptions);
            this.vertx.eventBus().consumer(TRIGGER_ADDRESS, (Handler)this).completion().onComplete(startPromise);
        }

        public void handle(Message<Integer> message) {
            for (int i = 0; i < (Integer)message.body(); ++i) {
                this.client.request(HttpMethod.GET, "/").compose(HttpClientRequest::send).onComplete(ar -> this.onResponseReceived.accept(this));
            }
        }
    }
}

