/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.client.pool;

import com.linecorp.armeria.client.pool.KeyedChannelPool;
import com.linecorp.armeria.client.pool.KeyedChannelPoolHandler;
import com.linecorp.armeria.client.pool.KeyedChannelPoolUtil;
import com.linecorp.armeria.client.pool.SafeKeyedChannelPoolHandler;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.util.Exceptions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;

public class DefaultKeyedChannelPool<K>
implements KeyedChannelPool<K> {
    private static final IllegalStateException FULL_EXCEPTION = Exceptions.clearTrace(new IllegalStateException("ChannelPool full"));
    private static final IllegalStateException UNHEALTHY_NON_OFFERED_TO_POOL = Exceptions.clearTrace(new IllegalStateException("Channel is unhealthy; not offering it back to pool"));
    private final EventLoop eventLoop;
    private final Function<K, Future<Channel>> channelFactory;
    private final Predicate<Channel> healthChecker;
    private final KeyedChannelPoolHandler<K> channelPoolHandler;
    private final boolean healthCheckOnRelease;
    private final Map<K, Deque<Channel>> pool;
    private final Map<K, Future<Channel>> pendingConnections;
    private final Set<Channel> allChannels;
    private boolean closed;

    public DefaultKeyedChannelPool(EventLoop eventLoop, Function<K, Future<Channel>> channelFactory, Predicate<Channel> healthChecker, KeyedChannelPoolHandler<K> channelPoolHandler, boolean healthCheckOnRelease) {
        this.eventLoop = Objects.requireNonNull(eventLoop, "eventLoop");
        this.channelFactory = Objects.requireNonNull(channelFactory, "channelFactory");
        this.healthChecker = Objects.requireNonNull(healthChecker, "healthChecker");
        this.channelPoolHandler = new SafeKeyedChannelPoolHandler<K>(Objects.requireNonNull(channelPoolHandler, "channelPoolHandler"));
        this.healthCheckOnRelease = healthCheckOnRelease;
        this.pool = new HashMap<K, Deque<Channel>>();
        this.pendingConnections = new HashMap<K, Future<Channel>>();
        this.allChannels = Collections.newSetFromMap(new IdentityHashMap());
    }

    @Override
    public Future<Channel> acquire(K key) {
        return this.acquire(key, (Promise<Channel>)this.eventLoop.newPromise());
    }

    @Override
    public Future<Channel> acquire(K key, Promise<Channel> promise) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(promise, "promise");
        if (this.eventLoop.inEventLoop()) {
            this.acquireHealthyFromPoolOrNew(key, promise);
        } else {
            this.eventLoop.execute(() -> this.acquireHealthyFromPoolOrNew(key, promise));
        }
        return promise;
    }

    private Future<Channel> acquireHealthyFromPoolOrNew(K key, Promise<Channel> promise) {
        assert (this.eventLoop.inEventLoop());
        if (this.closed) {
            promise.setFailure((Throwable)ClosedSessionException.get());
            return promise;
        }
        Channel ch = this.pollHealthy(key);
        if (ch == null) {
            Future<Channel> pendingChannel = this.pendingConnections.get(key);
            if (pendingChannel != null) {
                pendingChannel.addListener(unused -> this.acquireHealthyFromPoolOrNew(key, promise));
            } else {
                Future<Channel> f = this.channelFactory.apply(key);
                this.pendingConnections.put(key, f);
                if (f.isDone()) {
                    this.notifyConnect(key, f, promise);
                } else {
                    f.addListener(future -> this.notifyConnect(key, (Future<Channel>)future, promise));
                }
            }
        } else {
            try {
                ch.attr(KeyedChannelPoolUtil.POOL).set((Object)this);
                this.channelPoolHandler.channelAcquired(key, ch);
                promise.setSuccess((Object)ch);
            }
            catch (Throwable cause) {
                DefaultKeyedChannelPool.closeAndFail(ch, cause, promise);
            }
        }
        return promise;
    }

    @Nullable
    private Channel pollHealthy(K key) {
        Deque<Channel> queue = this.pool.get(key);
        if (queue == null) {
            return null;
        }
        Channel ch;
        while ((ch = queue.pollLast()) != null) {
            if (this.healthChecker.test(ch)) {
                this.removeUnhealthy(queue);
                return ch;
            }
            DefaultKeyedChannelPool.closeChannel(ch);
        }
        return null;
    }

    void removeUnhealthy(Deque<Channel> queue) {
        if (!queue.isEmpty()) {
            Channel ch;
            Iterator<Channel> i = queue.iterator();
            while (i.hasNext() && !this.healthChecker.test(ch = i.next())) {
                i.remove();
                DefaultKeyedChannelPool.closeChannel(ch);
            }
        }
    }

    private void notifyConnect(K key, Future<Channel> future, Promise<Channel> promise) {
        assert (future.isDone());
        this.pendingConnections.remove(key);
        try {
            if (future.isSuccess()) {
                Channel channel = (Channel)future.getNow();
                if (this.closed) {
                    channel.close();
                    promise.setFailure((Throwable)ClosedSessionException.get());
                    return;
                }
                channel.attr(KeyedChannelPoolUtil.POOL).set((Object)this);
                this.channelPoolHandler.channelCreated(key, channel);
                this.allChannels.add(channel);
                channel.closeFuture().addListener(f -> {
                    this.channelPoolHandler.channelClosed(key, channel);
                    this.allChannels.remove(channel);
                    Deque<Channel> queue = this.pool.get(key);
                    if (queue != null) {
                        this.removeUnhealthy(queue);
                        if (queue.isEmpty()) {
                            this.pool.remove(key);
                        }
                    }
                });
                promise.setSuccess((Object)channel);
            } else {
                promise.setFailure(future.cause());
            }
        }
        catch (Exception e) {
            promise.setFailure((Throwable)e);
        }
    }

    private static void closeChannel(Channel channel) {
        channel.attr(KeyedChannelPoolUtil.POOL).set(null);
        if (channel.isOpen()) {
            channel.close();
        }
    }

    private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
        DefaultKeyedChannelPool.closeChannel(channel);
        promise.setFailure(cause);
    }

    @Override
    public Future<Void> release(K key, Channel channel) {
        return this.release(key, channel, (Promise<Void>)this.eventLoop.newPromise());
    }

    @Override
    public Future<Void> release(K key, Channel channel, Promise<Void> promise) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(channel, "channel");
        Objects.requireNonNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            if (loop.inEventLoop()) {
                this.doRelease(key, channel, promise);
            } else {
                loop.execute(() -> this.doRelease(key, channel, promise));
            }
        }
        catch (Throwable cause) {
            DefaultKeyedChannelPool.closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    private void doRelease(K key, Channel channel, Promise<Void> promise) {
        assert (channel.eventLoop().inEventLoop());
        if (channel.attr(KeyedChannelPoolUtil.POOL).getAndSet(null) != this) {
            DefaultKeyedChannelPool.closeAndFail(channel, new IllegalArgumentException("Channel " + channel + " was not acquired from this ChannelPool"), promise);
        } else {
            try {
                if (this.healthCheckOnRelease) {
                    this.healthCheckOnRelease(key, channel, promise);
                } else {
                    this.releaseAndOffer(key, channel, promise);
                }
            }
            catch (Throwable cause) {
                DefaultKeyedChannelPool.closeAndFail(channel, cause, promise);
            }
        }
    }

    private void healthCheckOnRelease(K key, Channel channel, Promise<Void> promise) throws Exception {
        if (this.healthChecker.test(channel)) {
            this.releaseAndOffer(key, channel, promise);
        } else {
            this.channelPoolHandler.channelReleased(key, channel);
            DefaultKeyedChannelPool.closeAndFail(channel, UNHEALTHY_NON_OFFERED_TO_POOL, promise);
        }
    }

    private void releaseAndOffer(K key, Channel channel, Promise<Void> promise) throws Exception {
        if (this.offerChannel(key, channel)) {
            this.channelPoolHandler.channelReleased(key, channel);
            promise.setSuccess(null);
        } else {
            DefaultKeyedChannelPool.closeAndFail(channel, FULL_EXCEPTION, promise);
        }
    }

    protected boolean offerChannel(K key, Channel channel) {
        return this.pool.computeIfAbsent(key, k -> new ConcurrentLinkedDeque()).offerLast(channel);
    }

    @Override
    public void close() {
        this.closed = true;
        if (this.eventLoop.inEventLoop()) {
            this.doCloseAsync();
        } else {
            this.doCloseSync();
        }
    }

    private void doCloseAsync() {
        if (this.allChannels.isEmpty()) {
            return;
        }
        ArrayList<ChannelFuture> closeFutures = new ArrayList<ChannelFuture>(this.allChannels.size());
        for (Channel ch : this.allChannels) {
            closeFutures.add(ch.closeFuture());
        }
        closeFutures.forEach(f -> f.channel().close());
    }

    private void doCloseSync() {
        CountDownLatch outerLatch = (CountDownLatch)this.eventLoop.submit(() -> {
            if (this.allChannels.isEmpty()) {
                return null;
            }
            int numChannels = this.allChannels.size();
            CountDownLatch latch = new CountDownLatch(numChannels);
            if (numChannels == 0) {
                return latch;
            }
            ArrayList<ChannelFuture> closeFutures = new ArrayList<ChannelFuture>(numChannels);
            for (Channel ch : this.allChannels) {
                ChannelFuture f2 = ch.closeFuture();
                closeFutures.add(f2);
                f2.addListener((GenericFutureListener)((ChannelFutureListener)future -> latch.countDown()));
            }
            closeFutures.forEach(f -> f.channel().close());
            return latch;
        }).syncUninterruptibly().getNow();
        if (outerLatch != null) {
            boolean interrupted = false;
            while (outerLatch.getCount() != 0L) {
                try {
                    outerLatch.await();
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

