/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ChannelPromiseWithExpiryTime;
import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenChannelPromise;
import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionRntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionEvent;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionEventType;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelState;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdClientChannelHandler;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdClientChannelHealthChecker;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConnectionStateListener;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdDurableEndpointMetrics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdHealthCheckRequest;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdPollChannelEvent;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdThreadFactory;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils;
import com.azure.cosmos.implementation.faultinjection.RntbdFaultInjectionConnectionCloseEvent;
import com.azure.cosmos.implementation.faultinjection.RntbdFaultInjectionConnectionResetEvent;
import com.azure.cosmos.implementation.faultinjection.RntbdServerErrorInjector;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.guava27.Strings;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.AttributeKey;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ThrowableUtil;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonSerialize(using=JsonSerializer.class)
public final class RntbdClientChannelPool
implements ChannelPool {
    private static final TimeoutException ACQUISITION_TIMEOUT = (TimeoutException)ThrowableUtil.unknownStackTrace((Throwable)new TimeoutException("acquisition took longer than the configured maximum time"), RntbdClientChannelPool.class, (String)"<init>");
    private static final ClosedChannelException CHANNEL_CLOSED_ON_ACQUIRE = (ClosedChannelException)ThrowableUtil.unknownStackTrace((Throwable)new ClosedChannelException(), RntbdClientChannelPool.class, (String)"acquire");
    private static final IllegalStateException POOL_CLOSED_ON_ACQUIRE = (IllegalStateException)ThrowableUtil.unknownStackTrace((Throwable)new ChannelAcquisitionException("service endpoint was closed while acquiring a channel"), RntbdClientChannelPool.class, (String)"acquire");
    private static final IllegalStateException POOL_CLOSED_ON_RELEASE = (IllegalStateException)ThrowableUtil.unknownStackTrace((Throwable)new ChannelAcquisitionException("service endpoint was closed while releasing a channel"), RntbdClientChannelPool.class, (String)"release");
    private static final AttributeKey<RntbdClientChannelPool> POOL_KEY = AttributeKey.newInstance((String)RntbdClientChannelPool.class.getName());
    private static final IllegalStateException TOO_MANY_PENDING_ACQUISITIONS = (IllegalStateException)ThrowableUtil.unknownStackTrace((Throwable)new ChannelAcquisitionException("too many outstanding acquire operations"), RntbdClientChannelPool.class, (String)"acquire");
    private static final EventExecutor closer = new DefaultEventExecutor((ThreadFactory)new RntbdThreadFactory("channel-pool-closer", true, 5));
    private static final EventExecutor pendingAcquisitionExpirationExecutor = new DefaultEventExecutor((ThreadFactory)new RntbdThreadFactory("pending-acquisition-expirator", true, 5));
    private static final HashedWheelTimer acquisitionAndIdleEndpointDetectionTimer = new HashedWheelTimer((ThreadFactory)new RntbdThreadFactory("channel-acquisition-timer", true, 5));
    private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelPool.class);
    private final long acquisitionTimeoutInNanos;
    private final int connectTimeoutInMillis;
    private final Runnable acquisitionTimeoutTask;
    private final PooledByteBufAllocatorMetric allocatorMetric;
    private final Bootstrap bootstrap;
    private final EventExecutor executor;
    private final ChannelHealthChecker healthChecker;
    private final int maxChannels;
    private final int maxPendingAcquisitions;
    private final int maxRequestsPerChannel;
    private final ChannelPoolHandler poolHandler;
    private final boolean releaseHealthCheck;
    private final RntbdDurableEndpointMetrics durableEndpointMetrics;
    private final AtomicReference<Timeout> acquisitionAndIdleEndpointDetectionTimeout = new AtomicReference();
    private final ConcurrentHashMap<Channel, Channel> acquiredChannels = new ConcurrentHashMap();
    private final Deque<Channel> availableChannels = new ArrayDeque<Channel>();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicBoolean connecting = new AtomicBoolean();
    private final Queue<AcquireListener> pendingAcquisitions = new PriorityBlockingQueue<AcquireListener>(100, Comparator.comparingLong(task -> ((AcquireListener)task).originalPromise.getExpiryTimeInNanos()));
    private final ScheduledFuture<?> pendingAcquisitionExpirationFuture;
    private final ClientTelemetry clientTelemetry;
    private final RntbdServerErrorInjector serverErrorInjector;
    private final RntbdServiceEndpoint endpoint;
    private final RntbdConnectionStateListener connectionStateListener;

    RntbdClientChannelPool(RntbdServiceEndpoint endpoint, Bootstrap bootstrap, RntbdEndpoint.Config config, ClientTelemetry clientTelemetry, RntbdConnectionStateListener connectionStateListener, RntbdServerErrorInjector faultInjectionInterceptors, RntbdDurableEndpointMetrics durableEndpointMetrics) {
        this(endpoint, bootstrap, config, new RntbdClientChannelHealthChecker(config), clientTelemetry, connectionStateListener, faultInjectionInterceptors, durableEndpointMetrics);
    }

    private RntbdClientChannelPool(RntbdServiceEndpoint endpoint, Bootstrap bootstrap, RntbdEndpoint.Config config, RntbdClientChannelHealthChecker healthChecker, final ClientTelemetry clientTelemetry, RntbdConnectionStateListener connectionStateListener, RntbdServerErrorInjector serverErrorInjector, RntbdDurableEndpointMetrics durableEndpointMetrics) {
        Preconditions.checkNotNull(endpoint, "expected non-null endpoint");
        Preconditions.checkNotNull(bootstrap, "expected non-null bootstrap");
        Preconditions.checkNotNull(config, "expected non-null config");
        Preconditions.checkNotNull(healthChecker, "expected non-null healthChecker");
        Preconditions.checkNotNull(durableEndpointMetrics, "expected non-null durableEndpointMetrics");
        this.poolHandler = new RntbdClientChannelHandler(config, healthChecker, connectionStateListener, serverErrorInjector);
        this.executor = bootstrap.config().group().next();
        this.healthChecker = healthChecker;
        this.serverErrorInjector = serverErrorInjector;
        this.durableEndpointMetrics = durableEndpointMetrics;
        this.endpoint = endpoint;
        this.connectionStateListener = connectionStateListener;
        this.bootstrap = (Bootstrap)bootstrap.clone().handler((ChannelHandler)new ChannelInitializer<Channel>(){

            protected void initChannel(Channel channel) throws Exception {
                Preconditions.checkState(channel.eventLoop().inEventLoop());
                RntbdClientChannelPool.this.poolHandler.channelCreated(channel);
            }
        });
        this.acquisitionTimeoutInNanos = config.connectionAcquisitionTimeoutInNanos();
        this.connectTimeoutInMillis = config.connectTimeoutInMillis();
        this.allocatorMetric = config.allocator().metric();
        this.maxChannels = config.maxChannelsPerEndpoint();
        this.maxRequestsPerChannel = config.maxRequestsPerChannel();
        this.maxPendingAcquisitions = Integer.MAX_VALUE;
        this.releaseHealthCheck = true;
        this.acquisitionTimeoutTask = this.acquisitionTimeoutInNanos <= 0L ? null : new AcquireTimeoutTask(this){

            @Override
            public void onTimeout(AcquireListener task) {
                task.originalPromise.setFailure(ACQUISITION_TIMEOUT);
                RntbdChannelAcquisitionTimeline.startNewEvent(task.originalPromise.getChannelAcquisitionTimeline(), RntbdChannelAcquisitionEventType.PENDING_TIME_OUT, clientTelemetry);
            }
        };
        this.newTimeout(endpoint, config.idleEndpointTimeoutInNanos(), config.requestTimerResolutionInNanos());
        this.pendingAcquisitionExpirationFuture = this.acquisitionTimeoutTask != null ? pendingAcquisitionExpirationExecutor.scheduleAtFixedRate(this.acquisitionTimeoutTask, this.acquisitionTimeoutInNanos, this.acquisitionTimeoutInNanos, TimeUnit.NANOSECONDS) : null;
        this.clientTelemetry = clientTelemetry;
    }

    public int channels(boolean approximationAcceptable) {
        if (!approximationAcceptable) {
            this.ensureInEventLoop();
        }
        return this.acquiredChannels.size() + this.availableChannels.size() + (this.connecting.get() ? 1 : 0);
    }

    public int channelsAcquiredMetrics() {
        return this.acquiredChannels.size();
    }

    public int channelsAvailableMetrics() {
        return this.availableChannels.size();
    }

    public int attemptingToConnectMetrics() {
        return this.connecting.get() ? 1 : 0;
    }

    public int executorTaskQueueMetrics() {
        return RntbdUtils.tryGetExecutorTaskQueueSize(this.executor);
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public int maxChannels() {
        return this.maxChannels;
    }

    public int maxRequestsPerChannel() {
        return this.maxRequestsPerChannel;
    }

    public SocketAddress remoteAddress() {
        return this.bootstrap.config().remoteAddress();
    }

    public int requestQueueLength() {
        return this.pendingAcquisitions.size();
    }

    public long usedDirectMemory() {
        return this.allocatorMetric.usedDirectMemory();
    }

    public long usedHeapMemory() {
        return this.allocatorMetric.usedHeapMemory();
    }

    public Future<Channel> acquire() {
        return this.acquire(new ChannelPromiseWithExpiryTime(this.getNewChannelPromise(), this.getNewPromiseExpiryTime()));
    }

    public Future<Channel> acquire(RntbdRequestRecord requestRecord) {
        Preconditions.checkNotNull(requestRecord, "Argument 'requestRecord' should not be null");
        return this.acquire(new ChannelPromiseWithExpiryTime(this.getNewChannelPromise(), this.getNewPromiseExpiryTime(), requestRecord));
    }

    public Future<Channel> acquire(OpenConnectionRntbdRequestRecord requestRecord) {
        Preconditions.checkNotNull(requestRecord, "Argument 'requestRecord' should not be null");
        OpenChannelPromise openChannelPromise = new OpenChannelPromise(this.getNewChannelPromise(), this.getNewPromiseExpiryTime(), requestRecord);
        try {
            if (this.executor.inEventLoop()) {
                this.acquireChannel(openChannelPromise);
            } else {
                this.executor.execute(() -> this.acquireChannel(openChannelPromise));
            }
        }
        catch (Throwable cause) {
            openChannelPromise.setFailure(cause);
        }
        return openChannelPromise;
    }

    private long getNewPromiseExpiryTime() {
        return System.nanoTime() + this.acquisitionTimeoutInNanos;
    }

    private Promise<Channel> getNewChannelPromise() {
        return this.bootstrap.config().group().next().newPromise();
    }

    public Future<Channel> acquire(Promise<Channel> promise) {
        this.throwIfClosed();
        ChannelPromiseWithExpiryTime promiseWithExpiryTime = promise instanceof ChannelPromiseWithExpiryTime ? (ChannelPromiseWithExpiryTime)promise : new ChannelPromiseWithExpiryTime(promise, System.nanoTime() + this.acquisitionTimeoutInNanos);
        try {
            if (this.executor.inEventLoop()) {
                this.acquireChannel(promiseWithExpiryTime);
            } else if (this.pendingAcquisitions.size() > 1000) {
                this.addTaskToPendingAcquisitionQueue(promiseWithExpiryTime);
            } else {
                this.executor.execute(() -> this.acquireChannel(promiseWithExpiryTime));
            }
        }
        catch (Throwable cause) {
            promiseWithExpiryTime.setFailure(cause);
        }
        return promiseWithExpiryTime;
    }

    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (this.executor.inEventLoop()) {
                this.doClose();
            } else {
                this.executor.submit(this::doClose).awaitUninterruptibly();
            }
        }
        this.pendingAcquisitionExpirationFuture.cancel(false);
    }

    public Future<Void> release(Channel channel) {
        return this.release(channel, (Promise<Void>)channel.eventLoop().newPromise());
    }

    public Future<Void> release(Channel channel, Promise<Void> promise) {
        Preconditions.checkNotNull(channel, "expected non-null channel");
        Preconditions.checkNotNull(promise, "expected non-null promise");
        Promise anotherPromise = this.executor.newPromise();
        try {
            EventLoop loop = channel.eventLoop();
            if (loop.inEventLoop()) {
                this.releaseChannel(channel, (Promise<Void>)anotherPromise);
            } else {
                loop.execute(() -> this.releaseChannel(channel, (Promise<Void>)anotherPromise));
            }
        }
        catch (Throwable cause) {
            if (this.executor.inEventLoop()) {
                this.closeChannelAndFail(channel, cause, anotherPromise);
            }
            this.executor.submit(() -> this.closeChannelAndFail(channel, cause, anotherPromise));
        }
        anotherPromise.addListener((GenericFutureListener)((FutureListener)future -> {
            this.ensureInEventLoop();
            if (this.isClosed()) {
                promise.setFailure((Throwable)POOL_CLOSED_ON_RELEASE);
                this.closeChannel(channel);
                return;
            }
            if (future.isSuccess()) {
                this.runTasksInPendingAcquisitionQueue();
                promise.setSuccess(null);
            } else {
                Throwable cause = future.cause();
                if (!(cause instanceof IllegalArgumentException)) {
                    this.runTasksInPendingAcquisitionQueue();
                }
                promise.setFailure(cause);
            }
        }));
        return promise;
    }

    public String toString() {
        return RntbdObjectMapper.toString(this);
    }

    private void acquireChannel(ChannelPromiseWithExpiryTime promise) {
        this.ensureInEventLoop();
        RntbdReporter.reportIssueUnless(logger, promise != null, this, "Channel promise should not be null", new Object[0]);
        RntbdChannelAcquisitionTimeline channelAcquisitionTimeline = promise.getChannelAcquisitionTimeline();
        if (this.isClosed()) {
            promise.setFailure(POOL_CLOSED_ON_ACQUIRE);
            return;
        }
        try {
            Channel candidate = null;
            if (!(promise instanceof OpenChannelPromise && this.endpoint.getMinChannelsRequired() > this.channels(false) || (candidate = this.pollChannel(channelAcquisitionTimeline)) == null)) {
                this.doAcquireChannel(promise, candidate);
                return;
            }
            if (this.allowedToOpenNewChannel(this.maxChannels)) {
                if (this.connecting.compareAndSet(false, true)) {
                    ChannelFuture future;
                    ChannelPromiseWithExpiryTime anotherPromise = this.newChannelPromiseForToBeEstablishedChannel(promise);
                    RntbdChannelAcquisitionTimeline.startNewEvent(channelAcquisitionTimeline, RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL, this.clientTelemetry);
                    if (this.serverErrorInjector != null) {
                        Consumer<Duration> openConnectionConsumer = delay -> this.openNewChannelWithInjectedDelay(anotherPromise, (Duration)delay);
                        if (this.serverErrorInjector.injectRntbdServerConnectionDelay(promise.getRntbdRequestRecord(), openConnectionConsumer)) {
                            return;
                        }
                    }
                    if ((future = ((Bootstrap)this.bootstrap.clone().attr(POOL_KEY, (Object)this)).connect()).isDone()) {
                        this.safeNotifyChannelConnect(future, anotherPromise);
                    } else {
                        future.addListener(ignored -> this.safeNotifyChannelConnect(future, anotherPromise));
                    }
                    return;
                }
            } else if (this.computeLoadFactor() > 0.9) {
                long pendingRequestCountMin = Long.MAX_VALUE;
                for (Channel channel : this.availableChannels) {
                    RntbdRequestManager manager = (RntbdRequestManager)channel.pipeline().get(RntbdRequestManager.class);
                    if (manager == null) {
                        if (!logger.isDebugEnabled()) continue;
                        logger.debug("Channel({} --> {}) closed", (Object)channel, (Object)this.remoteAddress());
                        continue;
                    }
                    long pendingRequestCount = manager.pendingRequestCount();
                    if (pendingRequestCount >= pendingRequestCountMin) continue;
                    RntbdChannelState channelState = this.getChannelState(channel);
                    RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState);
                    if (!channelState.isOk()) continue;
                    pendingRequestCountMin = pendingRequestCount;
                    candidate = channel;
                }
                if (candidate != null && this.availableChannels.remove(candidate)) {
                    this.doAcquireChannel(promise, candidate);
                    return;
                }
            } else {
                for (Channel channel : this.availableChannels) {
                    RntbdChannelState channelState = this.getChannelState(channel);
                    RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState);
                    if (!channelState.isOk() || !this.availableChannels.remove(channel)) continue;
                    this.doAcquireChannel(promise, channel);
                    return;
                }
            }
            this.addTaskToPendingAcquisitionQueue(promise);
        }
        catch (Throwable cause) {
            promise.tryFailure(cause);
        }
    }

    private boolean allowedToOpenNewChannel(int channelLimit) {
        int channelCount = this.channels(false);
        return channelCount < channelLimit;
    }

    private void openNewChannelWithInjectedDelay(Promise<Channel> promise, Duration latencyDuration) {
        this.ensureInEventLoop();
        long delayInMillis = Math.min((long)this.connectTimeoutInMillis, latencyDuration.toMillis());
        final long effectiveConnectTimeoutInMillis = Math.max((long)this.connectTimeoutInMillis - delayInMillis, 10L);
        this.executor.schedule(() -> {
            ChannelFuture channelFuture = ((Bootstrap)this.bootstrap.clone().handler((ChannelHandler)new ChannelInitializer<Channel>(){

                protected void initChannel(Channel channel) throws Exception {
                    Preconditions.checkState(channel.eventLoop().inEventLoop());
                    channel.config().setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)((int)effectiveConnectTimeoutInMillis));
                    RntbdClientChannelPool.this.poolHandler.channelCreated(channel);
                }
            })).connect();
            if (channelFuture.isDone()) {
                this.safeNotifyChannelConnect(channelFuture, promise);
            } else {
                channelFuture.addListener(ignored -> this.safeNotifyChannelConnect(channelFuture, promise));
            }
        }, delayInMillis, TimeUnit.MILLISECONDS);
    }

    private void addTaskToPendingAcquisitionQueue(ChannelPromiseWithExpiryTime promise) {
        if (logger.isDebugEnabled()) {
            logger.debug("{}, {}, {}, {}, {}, {}", new Object[]{Instant.now(), this.remoteAddress(), this.channels(true), this.channelsAcquiredMetrics(), this.channelsAvailableMetrics(), this.requestQueueLength()});
        }
        if (this.pendingAcquisitions.size() >= this.maxPendingAcquisitions) {
            promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);
        } else {
            AcquireListener acquireTask = new AcquireListener(this, promise);
            if (!this.pendingAcquisitions.offer(acquireTask)) {
                promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);
            } else {
                RntbdChannelAcquisitionTimeline.startNewEvent(promise.getChannelAcquisitionTimeline(), RntbdChannelAcquisitionEventType.ADD_TO_PENDING_QUEUE, this.clientTelemetry);
            }
        }
    }

    private void closeChannel(Channel channel) {
        this.ensureInEventLoop();
        this.durableEndpointMetrics.incrementClosedChannels();
        this.acquiredChannels.remove(channel);
        this.availableChannels.remove(channel);
        channel.attr(POOL_KEY).set(null);
        channel.close().addListener(future -> {
            if (future.isDone() && !this.isClosed()) {
                this.connectionStateListener.openConnectionIfNeeded();
            }
        });
    }

    private void closeChannelAndFail(Channel channel, Throwable cause, Promise<?> promise) {
        this.ensureInEventLoop();
        this.closeChannel(channel);
        promise.tryFailure(cause);
    }

    private double computeLoadFactor() {
        long pendingRequestCount;
        RntbdRequestManager manager;
        this.ensureInEventLoop();
        long pendingRequestCountMin = Long.MAX_VALUE;
        long pendingRequestCountTotal = 0L;
        long channelCount = 0L;
        for (Channel channel : this.availableChannels) {
            manager = (RntbdRequestManager)channel.pipeline().get(RntbdRequestManager.class);
            if (manager == null) {
                logger.debug("Channel({}) connection lost", (Object)channel);
                continue;
            }
            pendingRequestCount = manager.pendingRequestCount();
            if (pendingRequestCount < pendingRequestCountMin) {
                pendingRequestCountMin = pendingRequestCount;
            }
            pendingRequestCountTotal += pendingRequestCount;
            ++channelCount;
        }
        for (Channel channel : this.acquiredChannels.values()) {
            manager = (RntbdRequestManager)channel.pipeline().get(RntbdRequestManager.class);
            if (manager != null) {
                pendingRequestCount = manager.pendingRequestCount();
                if (pendingRequestCount < pendingRequestCountMin) {
                    pendingRequestCountMin = pendingRequestCount;
                }
                pendingRequestCountTotal += pendingRequestCount;
            }
            ++channelCount;
        }
        return channelCount > 0L ? (double)pendingRequestCountTotal / (double)(channelCount * (long)this.maxRequestsPerChannel) : 1.0;
    }

    private void doAcquireChannel(ChannelPromiseWithExpiryTime promise, Channel candidate) {
        this.ensureInEventLoop();
        this.acquiredChannels.put(candidate, candidate);
        ChannelPromiseWithExpiryTime anotherPromise = this.newChannelPromiseForAvailableChannel(promise, candidate);
        EventLoop loop = candidate.eventLoop();
        if (loop.inEventLoop()) {
            this.doChannelHealthCheck(candidate, anotherPromise);
        } else {
            loop.execute(() -> this.doChannelHealthCheck(candidate, anotherPromise));
        }
    }

    private void doChannelHealthCheck(Channel channel, ChannelPromiseWithExpiryTime promise) {
        Preconditions.checkState(channel.eventLoop().inEventLoop());
        Future isHealthy = this.healthChecker.isHealthy(channel);
        if (isHealthy.isDone()) {
            this.notifyChannelHealthCheck((Future<Boolean>)isHealthy, channel, promise);
        } else {
            isHealthy.addListener((GenericFutureListener)((FutureListener)future -> this.notifyChannelHealthCheck((Future<Boolean>)future, channel, promise)));
        }
    }

    private void doChannelHealthCheckOnRelease(Channel channel, Promise<Void> promise) {
        try {
            Preconditions.checkState(channel.eventLoop().inEventLoop());
            Future future = this.healthChecker.isHealthy(channel);
            if (future.isDone()) {
                this.releaseAndOfferChannelIfHealthy(channel, promise, (Future<Boolean>)future);
            } else {
                future.addListener(ignored -> this.releaseAndOfferChannelIfHealthy(channel, promise, (Future<Boolean>)future));
            }
        }
        catch (Throwable error) {
            if (this.executor.inEventLoop()) {
                this.closeChannelAndFail(channel, error, promise);
            }
            this.executor.submit(() -> this.closeChannelAndFail(channel, error, promise));
        }
    }

    private void doClose() {
        AcquireListener task;
        this.ensureInEventLoop();
        this.acquisitionAndIdleEndpointDetectionTimeout.getAndUpdate(timeout -> {
            timeout.cancel();
            return null;
        });
        if (logger.isDebugEnabled()) {
            logger.debug("{} closing with {} pending channel acquisitions", (Object)this, (Object)this.requestQueueLength());
        }
        while ((task = this.pendingAcquisitions.poll()) != null) {
            task.originalPromise.setFailure(new ClosedChannelException());
        }
        this.executor.submit(() -> {
            Channel channel;
            this.ensureInEventLoop();
            this.availableChannels.addAll(this.acquiredChannels.values());
            this.acquiredChannels.clear();
            ArrayList<Channel> channelList = new ArrayList<Channel>();
            while ((channel = this.pollChannel(null)) != null) {
                channelList.add(channel);
            }
            assert (this.acquiredChannels.isEmpty() && this.availableChannels.isEmpty());
            closer.submit(() -> {
                for (Channel channel : channelList) {
                    channel.close().awaitUninterruptibly();
                }
            });
        }).addListener(closed -> {
            if (!closed.isSuccess()) {
                logger.error("[{}] close failed due to ", (Object)this, (Object)closed.cause());
            } else {
                logger.debug("[{}] closed", (Object)this);
            }
        });
    }

    private void ensureInEventLoop() {
        RntbdReporter.reportIssueUnless(logger, this.executor.inEventLoop(), this, "expected to be in event loop {}, not thread {}", this.executor, Thread.currentThread());
    }

    private ChannelPromiseWithExpiryTime newChannelPromiseForAvailableChannel(ChannelPromiseWithExpiryTime promise, Channel candidate) {
        return this.createNewChannelPromise(promise, (EventExecutor)candidate.eventLoop());
    }

    private ChannelPromiseWithExpiryTime newChannelPromiseForToBeEstablishedChannel(ChannelPromiseWithExpiryTime promise) {
        return this.createNewChannelPromise(promise, this.executor);
    }

    private ChannelPromiseWithExpiryTime createNewChannelPromise(ChannelPromiseWithExpiryTime promise, EventExecutor eventLoop) {
        Preconditions.checkNotNull(promise, "expected non-null promise");
        AcquireListener listener = new AcquireListener(this, promise);
        Promise anotherPromise = eventLoop.newPromise();
        listener.acquired();
        anotherPromise.addListener((GenericFutureListener)listener);
        return new ChannelPromiseWithExpiryTime((Promise<Channel>)anotherPromise, promise.getExpiryTimeInNanos(), promise.getRntbdRequestRecord());
    }

    private void newTimeout(RntbdServiceEndpoint endpoint, long idleEndpointTimeoutInNanos, long requestTimerResolutionInNanos) {
        this.acquisitionAndIdleEndpointDetectionTimeout.set(acquisitionAndIdleEndpointDetectionTimer.newTimeout(timeout -> {
            if (idleEndpointTimeoutInNanos == 0L) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Idle endpoint check is disabled");
                }
            } else {
                long elapsedTimeInNanos = System.nanoTime() - endpoint.lastRequestNanoTime();
                if (idleEndpointTimeoutInNanos - elapsedTimeInNanos <= 0L) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("{} closing endpoint due to inactivity (elapsedTime: {} > idleEndpointTimeout: {})", new Object[]{endpoint, Duration.ofNanos(elapsedTimeInNanos), Duration.ofNanos(idleEndpointTimeoutInNanos)});
                    }
                    endpoint.close();
                    return;
                }
            }
            if (this.requestQueueLength() <= 0) {
                this.newTimeout(endpoint, idleEndpointTimeoutInNanos, requestTimerResolutionInNanos);
                return;
            }
            this.executor.submit(this::runTasksInPendingAcquisitionQueue).addListener(future -> {
                RntbdReporter.reportIssueUnless(logger, future.isSuccess(), this, "failed due to ", future.cause());
                this.newTimeout(endpoint, idleEndpointTimeoutInNanos, requestTimerResolutionInNanos);
            });
        }, requestTimerResolutionInNanos, TimeUnit.NANOSECONDS));
    }

    private void safeNotifyChannelConnect(ChannelFuture future, Promise<Channel> promise) {
        if (this.executor.inEventLoop()) {
            this.notifyChannelConnect(future, promise);
        } else {
            this.executor.submit(() -> this.notifyChannelConnect(future, promise));
        }
    }

    private void safeCloseChannel(Channel channel) {
        if (this.executor.inEventLoop()) {
            this.closeChannel(channel);
        } else {
            this.executor.submit(() -> this.closeChannel(channel));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyChannelConnect(ChannelFuture future, Promise<Channel> promise) {
        block14: {
            this.ensureInEventLoop();
            RntbdReporter.reportIssueUnless(logger, this.connecting.get(), this, "connecting: false", new Object[0]);
            try {
                if (future.isSuccess()) {
                    Channel channel = future.channel();
                    channel.closeFuture().addListener(f -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Channel to endpoint {} is closed. isOnChannelEventLoop={}, isActive={}, isOpen={}, isRegistered={}, isWritable={}, threadName={}", new Object[]{channel.remoteAddress(), channel.eventLoop().inEventLoop(), channel.isActive(), channel.isOpen(), channel.isRegistered(), channel.isWritable(), Thread.currentThread().getName()});
                        }
                        this.safeCloseChannel(channel);
                    });
                    try {
                        this.poolHandler.channelAcquired(channel);
                    }
                    catch (Throwable error) {
                        this.closeChannelAndFail(channel, error, promise);
                        if (promise instanceof ChannelPromiseWithExpiryTime) {
                            RntbdChannelAcquisitionTimeline.startNewEvent(((ChannelPromiseWithExpiryTime)promise).getChannelAcquisitionTimeline(), RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE, this.clientTelemetry);
                        }
                        this.connecting.set(false);
                        return;
                    }
                    if (promise.trySuccess((Object)channel)) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("established a channel local {}, remote {}", (Object)channel.localAddress(), (Object)channel.remoteAddress());
                        }
                        this.durableEndpointMetrics.incrementAcquiredChannels();
                        this.acquiredChannels.compute(channel, (ignored, acquiredChannel) -> {
                            RntbdReporter.reportIssueUnless(logger, acquiredChannel == null, this, "Channel({}) to be acquired has already been acquired", channel);
                            RntbdReporter.reportIssueUnless(logger, !this.availableChannels.remove(channel), this, "Channel({}) to be acquired is still in the list of available channels", channel);
                            return channel;
                        });
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("notifyChannelConnect promise.trySuccess(channel)=false");
                        }
                        this.closeChannel(channel);
                    }
                    break block14;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("notifyChannelConnect future was not successful");
                }
                promise.tryFailure(future.cause());
            }
            finally {
                if (promise instanceof ChannelPromiseWithExpiryTime) {
                    RntbdChannelAcquisitionTimeline.startNewEvent(((ChannelPromiseWithExpiryTime)promise).getChannelAcquisitionTimeline(), RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE, this.clientTelemetry);
                }
                this.connecting.set(false);
            }
        }
    }

    private void notifyChannelHealthCheck(Future<Boolean> future, Channel channel, ChannelPromiseWithExpiryTime promise) {
        boolean isHealthy;
        Preconditions.checkState(channel.eventLoop().inEventLoop());
        if (future.isSuccess() && (isHealthy = ((Boolean)future.getNow()).booleanValue())) {
            try {
                channel.attr(POOL_KEY).set((Object)this);
                this.poolHandler.channelAcquired(channel);
                promise.setSuccess(channel);
            }
            catch (Throwable cause) {
                if (this.executor.inEventLoop()) {
                    this.closeChannelAndFail(channel, cause, promise);
                }
                this.executor.submit(() -> this.closeChannelAndFail(channel, cause, promise));
            }
            return;
        }
        if (this.executor.inEventLoop()) {
            this.closeChannel(channel);
            this.acquireChannel(promise);
        } else {
            this.executor.submit(() -> {
                this.closeChannel(channel);
                this.acquireChannel(promise);
            });
        }
    }

    private boolean offerChannel(Channel channel) {
        this.ensureInEventLoop();
        return this.availableChannels.offer(channel);
    }

    private RntbdChannelState getChannelState(Channel channel) {
        Preconditions.checkNotNull(channel, "Channel cannot be null");
        RntbdRequestManager manager = (RntbdRequestManager)channel.pipeline().get(RntbdRequestManager.class);
        if (manager == null) {
            return RntbdChannelState.NULL_REQUEST_MANAGER;
        }
        if (!channel.isOpen()) {
            return RntbdChannelState.CLOSED;
        }
        return manager.getChannelState(this.maxPendingAcquisitions);
    }

    private Channel pollChannel(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
        this.ensureInEventLoop();
        RntbdPollChannelEvent event = RntbdChannelAcquisitionTimeline.startNewPollEvent(channelAcquisitionTimeline, this.availableChannels.size(), this.acquiredChannels.size(), this.clientTelemetry);
        Channel first = this.availableChannels.pollFirst();
        if (first == null) {
            return null;
        }
        if (this.isClosed()) {
            return first;
        }
        RntbdChannelState channelState = this.getChannelState(first);
        RntbdChannelAcquisitionEvent.addDetail(event, channelState);
        if (channelState.isOk()) {
            return first;
        }
        this.availableChannels.offer(first);
        Channel next = this.availableChannels.pollFirst();
        while (next != first) {
            assert (next != null) : "impossible";
            if (next.isActive()) {
                RntbdChannelState state = this.getChannelState(next);
                RntbdChannelAcquisitionEvent.addDetail(event, state);
                if (state.isOk()) {
                    return next;
                }
                this.availableChannels.offer(next);
            }
            next = this.availableChannels.pollFirst();
        }
        this.availableChannels.offer(first);
        return null;
    }

    public void injectConnectionErrors(String faultInjectionRuleId, double threshold, Class<?> eventType) {
        if (this.executor.inEventLoop()) {
            this.injectConnectionErrorsInternal(faultInjectionRuleId, threshold, eventType);
        } else {
            this.executor.submit(() -> this.injectConnectionErrorsInternal(faultInjectionRuleId, threshold, eventType)).awaitUninterruptibly();
        }
    }

    private void injectConnectionErrorsInternal(String faultInjectionRuleId, double threshold, Class<?> eventType) {
        int channelsToBeClosed = (int)Math.ceil((double)this.channels(false) * threshold);
        List channelsToBeClosedList = this.acquiredChannels.values().stream().limit(channelsToBeClosed).collect(Collectors.toList());
        if (channelsToBeClosedList.size() < channelsToBeClosed) {
            channelsToBeClosedList.addAll(this.availableChannels.stream().limit(channelsToBeClosed - channelsToBeClosedList.size()).collect(Collectors.toList()));
        }
        for (Channel channel : channelsToBeClosedList) {
            if (eventType == RntbdFaultInjectionConnectionCloseEvent.class) {
                channel.pipeline().firstContext().fireUserEventTriggered((Object)new RntbdFaultInjectionConnectionCloseEvent(faultInjectionRuleId));
                continue;
            }
            if (eventType == RntbdFaultInjectionConnectionResetEvent.class) {
                channel.pipeline().firstContext().fireUserEventTriggered((Object)new RntbdFaultInjectionConnectionResetEvent(faultInjectionRuleId));
                continue;
            }
            throw new IllegalStateException("ConnectionEventType " + eventType + " is not supported");
        }
    }

    private void releaseAndOfferChannel(Channel channel, Promise<Void> promise) {
        this.ensureInEventLoop();
        try {
            if (this.acquiredChannels.remove(channel) == null) {
                logger.warn("Unexpected race condition - releaseChannel called twice for the same channel [{} -> {}]", (Object)channel.id(), (Object)this.remoteAddress());
                promise.setSuccess(null);
                return;
            }
            if (this.offerChannel(channel)) {
                this.poolHandler.channelReleased(channel);
                promise.setSuccess(null);
            } else {
                ChannelAcquisitionException error = new ChannelAcquisitionException(Strings.lenientFormat("cannot offer channel back to pool because the pool is at capacity (%s)\n  %s\n  %s", this.maxChannels, this, channel));
                this.closeChannelAndFail(channel, error, promise);
            }
        }
        catch (Throwable error) {
            this.closeChannelAndFail(channel, error, promise);
        }
    }

    private void releaseAndOfferChannelIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) throws Exception {
        boolean isHealthy = (Boolean)future.getNow();
        if (isHealthy) {
            if (this.executor.inEventLoop()) {
                this.releaseAndOfferChannel(channel, promise);
            } else {
                this.executor.submit(() -> this.releaseAndOfferChannel(channel, promise));
            }
        } else {
            this.poolHandler.channelReleased(channel);
            if (this.executor.inEventLoop()) {
                this.closeChannel(channel);
            } else {
                this.executor.submit(() -> this.closeChannel(channel));
            }
            promise.setSuccess(null);
        }
    }

    private void releaseChannel(Channel channel, Promise<Void> promise) {
        block9: {
            boolean acquired;
            Preconditions.checkState(channel.eventLoop().inEventLoop());
            ChannelPool pool = (ChannelPool)channel.attr(POOL_KEY).getAndSet(null);
            boolean bl = acquired = this.acquiredChannels.get(channel) != null;
            if (acquired && pool == this) {
                try {
                    if (this.releaseHealthCheck) {
                        this.doChannelHealthCheckOnRelease(channel, promise);
                        break block9;
                    }
                    if (this.executor.inEventLoop()) {
                        this.releaseAndOfferChannel(channel, promise);
                        break block9;
                    }
                    this.executor.submit(() -> this.releaseAndOfferChannel(channel, promise));
                }
                catch (Throwable cause) {
                    if (this.executor.inEventLoop()) {
                        this.closeChannelAndFail(channel, cause, promise);
                        break block9;
                    }
                    this.executor.submit(() -> this.closeChannelAndFail(channel, cause, promise));
                }
            } else {
                IllegalStateException error = new IllegalStateException(Strings.lenientFormat("%s cannot be released because it was not acquired by this pool: %s", RntbdObjectMapper.toJson(channel), this));
                if (this.executor.inEventLoop()) {
                    this.closeChannelAndFail(channel, error, promise);
                } else {
                    this.executor.submit(() -> this.closeChannelAndFail(channel, error, promise));
                }
            }
        }
    }

    private void runTasksInPendingAcquisitionQueue() {
        AcquireListener task;
        this.ensureInEventLoop();
        int channelsAvailable = this.availableChannels.size();
        while ((task = this.pendingAcquisitions.poll()) != null) {
            task.acquired();
            this.acquire(task.originalPromise);
            if (--channelsAvailable > 0) continue;
        }
    }

    private void throwIfClosed() {
        Preconditions.checkState(!this.isClosed(), "%s is closed", (Object)this);
    }

    private static class AcquireListener
    implements FutureListener<Channel> {
        private final ChannelPromiseWithExpiryTime originalPromise;
        private final RntbdClientChannelPool pool;
        private boolean acquired;

        AcquireListener(RntbdClientChannelPool pool, ChannelPromiseWithExpiryTime originalPromise) {
            this.originalPromise = originalPromise;
            this.pool = pool;
        }

        public final boolean isAcquired() {
            return this.acquired;
        }

        public final AcquireListener acquired() {
            if (this.acquired) {
                return this;
            }
            this.acquired = true;
            return this;
        }

        private void doOperationComplete(Channel channel) {
            Preconditions.checkState(channel.eventLoop().inEventLoop());
            if (!channel.isActive()) {
                this.fail(CHANNEL_CLOSED_ON_ACQUIRE);
                return;
            }
            ChannelPipeline pipeline = channel.pipeline();
            Preconditions.checkState(pipeline != null, "expected non-null channel pipeline");
            RntbdRequestManager requestManager = (RntbdRequestManager)pipeline.get(RntbdRequestManager.class);
            Preconditions.checkState(requestManager != null, "expected non-null request manager");
            if (requestManager.hasRequestedRntbdContext()) {
                this.originalPromise.setSuccess(channel);
            } else {
                channel.writeAndFlush((Object)RntbdHealthCheckRequest.MESSAGE).addListener(completed -> {
                    if (completed.isSuccess()) {
                        RntbdReporter.reportIssueUnless(logger, this.acquired && requestManager.hasRntbdContext(), this, "acquired: {}, rntbdContext: {}", this.acquired, requestManager.rntbdContext());
                        this.originalPromise.setSuccess(channel);
                    } else {
                        Throwable cause = completed.cause();
                        logger.warn("Channel({}) health check request failed due to:", (Object)channel, (Object)cause);
                        this.fail(cause);
                    }
                });
            }
        }

        public final void operationComplete(Future<Channel> future) {
            if (this.pool.isClosed()) {
                if (future.isSuccess()) {
                    ((Channel)future.getNow()).close();
                }
                this.originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE);
                return;
            }
            if (future.isSuccess()) {
                Channel channel = (Channel)future.getNow();
                if (channel.eventLoop().inEventLoop()) {
                    this.doOperationComplete(channel);
                } else {
                    channel.eventLoop().execute(() -> this.doOperationComplete(channel));
                }
            } else {
                logger.warn("channel acquisition failed due to ", future.cause());
                this.fail(future.cause());
            }
        }

        public long getAcquisitionTimeoutInNanos() {
            return this.originalPromise.getExpiryTimeInNanos();
        }

        private void fail(Throwable cause) {
            this.originalPromise.setFailure(cause);
            if (this.pool.executor.inEventLoop()) {
                this.pool.runTasksInPendingAcquisitionQueue();
            } else {
                this.pool.executor.submit(() -> this.pool.runTasksInPendingAcquisitionQueue());
            }
        }
    }

    private static class ChannelAcquisitionException
    extends IllegalStateException {
        private static final long serialVersionUID = -6011782222645074949L;

        public ChannelAcquisitionException(String message) {
            super(message);
        }

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    static final class JsonSerializer
    extends StdSerializer<RntbdClientChannelPool> {
        private static final long serialVersionUID = -8688539496437151693L;

        JsonSerializer() {
            super(RntbdClientChannelPool.class);
        }

        public void serialize(RntbdClientChannelPool value, JsonGenerator generator, SerializerProvider provider) throws IOException {
            RntbdClientChannelHealthChecker healthChecker = (RntbdClientChannelHealthChecker)value.healthChecker;
            generator.writeStartObject();
            generator.writeStringField("remoteAddress", value.remoteAddress().toString());
            generator.writeBooleanField("isClosed", value.isClosed());
            generator.writeObjectFieldStart("configuration");
            generator.writeNumberField("maxChannels", value.maxChannels());
            generator.writeNumberField("maxRequestsPerChannel", value.maxRequestsPerChannel());
            generator.writeNumberField("idleConnectionTimeout", healthChecker.idleConnectionTimeoutInNanos());
            generator.writeNumberField("readDelayLimit", healthChecker.readDelayLimitInNanos());
            generator.writeNumberField("writeDelayLimit", healthChecker.writeDelayLimitInNanos());
            generator.writeEndObject();
            generator.writeObjectFieldStart("state");
            generator.writeNumberField("channelsAcquired", value.channelsAcquiredMetrics());
            generator.writeNumberField("channelsAvailable", value.channelsAvailableMetrics());
            generator.writeNumberField("requestQueueLength", value.requestQueueLength());
            generator.writeEndObject();
            generator.writeEndObject();
        }
    }

    private static abstract class AcquireTimeoutTask
    implements Runnable {
        private final RntbdClientChannelPool pool;

        public AcquireTimeoutTask(RntbdClientChannelPool pool) {
            this.pool = pool;
        }

        public abstract void onTimeout(AcquireListener var1);

        @Override
        public final void run() {
            AcquireListener removedTask;
            if (logger.isDebugEnabled()) {
                logger.debug("Starting the AcquireTimeoutTask to clean for endpoint [{}].", (Object)this.pool.remoteAddress());
            }
            long currentNanoTime = System.nanoTime();
            while ((removedTask = (AcquireListener)this.pool.pendingAcquisitions.poll()) != null) {
                long expiryTime = removedTask.getAcquisitionTimeoutInNanos();
                if (expiryTime - currentNanoTime <= 0L) {
                    this.onTimeout(removedTask);
                    continue;
                }
                if (this.pool.pendingAcquisitions.offer(removedTask)) break;
                logger.error("Unexpected failure when returning the removed task to pending acquisition queue. current size [{}]", (Object)this.pool.pendingAcquisitions.size());
                break;
            }
        }
    }
}

