/*
 * Decompiled with CFR 0.152.
 */
package karate.com.linecorp.armeria.client.limit;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import karate.com.linecorp.armeria.client.Client;
import karate.com.linecorp.armeria.client.ClientRequestContext;
import karate.com.linecorp.armeria.client.SimpleDecoratingClient;
import karate.com.linecorp.armeria.client.UnprocessedRequestException;
import karate.com.linecorp.armeria.client.limit.ConcurrencyLimit;
import karate.com.linecorp.armeria.common.Request;
import karate.com.linecorp.armeria.common.Response;
import karate.com.linecorp.armeria.common.util.SafeCloseable;

public abstract class AbstractConcurrencyLimitingClient<I extends Request, O extends Response>
extends SimpleDecoratingClient<I, O> {
    private final ConcurrencyLimit concurrencyLimit;
    private final AtomicInteger numActiveRequests = new AtomicInteger();

    protected AbstractConcurrencyLimitingClient(Client<I, O> delegate, int maxConcurrency) {
        this(delegate, ConcurrencyLimit.builder(maxConcurrency).build());
    }

    protected AbstractConcurrencyLimitingClient(Client<I, O> delegate, int maxConcurrency, long timeout, TimeUnit unit) {
        this(delegate, ConcurrencyLimit.builder(maxConcurrency).timeoutMillis(unit.toMillis(timeout)).build());
    }

    protected AbstractConcurrencyLimitingClient(Client<I, O> delegate, ConcurrencyLimit concurrencyLimit) {
        super(delegate);
        this.concurrencyLimit = concurrencyLimit;
    }

    public final int numActiveRequests() {
        return this.numActiveRequests.get();
    }

    @Override
    public final O execute(ClientRequestContext ctx, I req) throws Exception {
        CompletableFuture resFuture = new CompletableFuture();
        Object deferred = this.newDeferredResponse(ctx, resFuture);
        this.concurrencyLimit.acquire(ctx).handleAsync((permit, throwable) -> {
            if (throwable != null) {
                UnprocessedRequestException t = UnprocessedRequestException.of(throwable);
                AbstractConcurrencyLimitingClient.completeExceptionally(ctx, resFuture, t);
                return null;
            }
            this.numActiveRequests.incrementAndGet();
            try (SafeCloseable ignored = ctx.replace();){
                try {
                    Object actualRes = ((Client)this.unwrap()).execute(ctx, req);
                    actualRes.whenComplete().handle((unused, cause) -> {
                        permit.close();
                        this.numActiveRequests.decrementAndGet();
                        return null;
                    });
                    resFuture.complete(actualRes);
                }
                catch (Throwable t) {
                    permit.close();
                    this.numActiveRequests.decrementAndGet();
                    AbstractConcurrencyLimitingClient.completeExceptionally(ctx, resFuture, t);
                }
            }
            return null;
        }, (Executor)ctx.eventLoop().withoutContext());
        return (O)deferred;
    }

    private static void completeExceptionally(ClientRequestContext ctx, CompletableFuture<?> resFuture, Throwable t) {
        resFuture.completeExceptionally(t);
        ctx.logBuilder().endRequest(t);
        ctx.logBuilder().endResponse(t);
    }

    protected abstract O newDeferredResponse(ClientRequestContext var1, CompletionStage<O> var2) throws Exception;
}

