/*
 * Decompiled with CFR 0.152.
 */
package com.mule.extensions.amqp.internal.connection.channel;

import com.mule.extensions.amqp.api.config.QualityOfService;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.connection.channel.ChannelPoolKey;
import com.mule.extensions.amqp.internal.connection.channel.ChannelProvider;
import com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pool of {@link MuleAmqpChannel}s bound to a single {@link AmqpTransactionalConnection}.
 *
 * <p>Idle channels are kept in one queue per {@link ChannelPoolKey}; channels that have been
 * handed out are tracked in a weak-keyed map so that the pool can count how many channels exist
 * in total. When the total reaches the configured maximum the idle channels are purged, and a
 * background task periodically closes channels that are closed or have been idle for longer than
 * {@link #IDLE_TIMEOUT}.
 *
 * <p>Locking: channel acquisition, purging and the idle sweep of the pooled queues run under this
 * pool's monitor. Every access to {@link #inUseChannels} runs under that map's own monitor. When
 * both are needed the pool monitor is always taken first.
 *
 * <p>NOTE(review): pools are cached in a static map keyed by connection and nothing in this class
 * removes an entry or stops the cleanup scheduler - confirm that the owner of the connection
 * handles that lifecycle elsewhere.
 */
public class ChannelPool implements ChannelProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelPool.class);
    /** Idle time, in milliseconds, after which a pooled channel is closed by the cleanup task. */
    private static final long IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(5L);
    /** Fraction of {@code maxChannels} at which a proactive purge of idle channels is scheduled. */
    private static final double PURGE_THRESHOLD = 0.95;
    /** One pool per connection, created lazily by {@link #getInstance}. */
    private static final Map<AmqpTransactionalConnection, ChannelPool> poolInstances = new ConcurrentHashMap<>();
    /** Channel limit used when the connection reports a channel maximum of {@code 0}. */
    public static final int DEFAULT_MAX_CHANNEL = 100;
    /** Idle channels, grouped by the key they were requested with. */
    private final Map<ChannelPoolKey, BlockingQueue<MuleAmqpChannel>> channelQueues = new ConcurrentHashMap<>();
    /** Channels currently handed out; not thread-safe, always accessed under its own monitor. */
    private final Map<MuleAmqpChannel, Object> inUseChannels = new WeakHashMap<>();
    /** Placeholder value stored for every key of {@link #inUseChannels}. */
    private final Object dummyValue = new Object();
    private final AmqpTransactionalConnection connection;
    private final int maxChannels;
    private final Scheduler scheduler;

    /**
     * Creates the pool, resolves the channel limit and starts the periodic idle-channel cleanup.
     *
     * @param connection       connection the pooled channels are created on
     * @param schedulerService service used to create the single-task cleanup scheduler
     */
    private ChannelPool(AmqpTransactionalConnection connection, SchedulerService schedulerService) {
        this.connection = connection;
        int configuredMax = connection.getChannelMax();
        this.maxChannels = configuredMax == 0 ? DEFAULT_MAX_CHANNEL : configuredMax;
        this.scheduler = schedulerService.customScheduler(SchedulerConfig.config().withName("ChannelPool-Cleanup").withMaxConcurrentTasks(1));
        this.startIdleChannelCleanupTask();
    }

    /**
     * Returns the pool associated with {@code connection}, creating it on first use.
     *
     * @param connection       connection whose pool is requested
     * @param schedulerService scheduler service used only when a new pool has to be created
     * @return the shared pool for the given connection
     */
    public static ChannelPool getInstance(AmqpTransactionalConnection connection, SchedulerService schedulerService) {
        return poolInstances.computeIfAbsent(connection, conn -> new ChannelPool(conn, schedulerService));
    }

    /**
     * Hands out a channel matching the given settings, reusing an idle one when possible.
     *
     * <p>The {@code connection} argument is not used: the pool key and any new channel are built
     * from the connection this pool was created for.
     *
     * @throws IOException if the channel limit is reached and no idle channel could be purged, or
     *                     if creating a new channel fails
     */
    @Override
    public MuleAmqpChannel getChannel(AmqpTransactionalConnection connection, AmqpChannelManager channelManager, OperationTransactionalAction transactionalAction, QualityOfService qualityOfService, boolean singleMessageChannel) throws IOException {
        ChannelPoolKey key = new ChannelPoolKey(this.connection, channelManager, transactionalAction, qualityOfService, singleMessageChannel);
        return this.acquirePooledChannel(key, channelManager, qualityOfService, singleMessageChannel);
    }

    /**
     * Checks capacity, then takes or creates a channel for {@code key} and records it as in use.
     * The whole sequence runs under the pool monitor so the capacity check and the hand-out are
     * not interleaved with another acquisition or a purge.
     */
    private MuleAmqpChannel acquirePooledChannel(ChannelPoolKey key, AmqpChannelManager channelManager, QualityOfService qualityOfService, boolean singleMessageChannel) throws IOException {
        BlockingQueue<MuleAmqpChannel> queue = this.channelQueues.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
        synchronized (this) {
            this.ensureCapacityAvailable();
            MuleAmqpChannel channel = this.retrieveOrCreateChannel(queue, key, channelManager, qualityOfService, singleMessageChannel);
            this.trackChannelInUse(channel);
            return channel;
        }
    }

    /**
     * Purges idle channels when the limit is reached, or schedules a purge when the total is at
     * or above {@link #PURGE_THRESHOLD} of the limit.
     *
     * @throws IOException if the limit is still reached after purging
     */
    private void ensureCapacityAvailable() throws IOException {
        int totalChannels = this.totalChannels();
        if (totalChannels >= this.maxChannels) {
            this.handleMaximumCapacityReached();
        } else if ((double) totalChannels >= (double) this.maxChannels * PURGE_THRESHOLD) {
            this.scheduleProactiveCleanup();
        }
    }

    /**
     * Closes every idle channel and fails if that did not bring the total below the limit.
     *
     * @throws IOException if the number of channels is still at or above the limit
     */
    private void handleMaximumCapacityReached() throws IOException {
        LOGGER.warn("Maximum number of channels reached. Purging available channels to make room.");
        this.purgeAllAvailableChannels();
        if (this.totalChannels() >= this.maxChannels) {
            throw new IOException("Maximum number of channels reached and cannot create more.");
        }
    }

    /** Submits an immediate purge of idle channels to the cleanup scheduler. */
    private void scheduleProactiveCleanup() {
        LOGGER.info("Approaching maximum capacity. Scheduling proactive cleanup.");
        this.scheduler.schedule(this::purgeAllAvailableChannels, 0L, TimeUnit.SECONDS);
    }

    /**
     * Returns the first open channel found in {@code queue}, or creates a new one.
     *
     * <p>Closed channels encountered at the head of the queue are discarded so that a single
     * stale entry does not force a new channel while open ones are still idle behind it.
     */
    private MuleAmqpChannel retrieveOrCreateChannel(BlockingQueue<MuleAmqpChannel> queue, ChannelPoolKey key, AmqpChannelManager channelManager, QualityOfService qualityOfService, boolean singleMessageChannel) throws IOException {
        MuleAmqpChannel pooled;
        while ((pooled = queue.poll()) != null) {
            if (pooled.isOpen()) {
                return pooled;
            }
            LOGGER.debug("Discarding closed pooled channel for key: {}", key);
        }
        MuleAmqpChannel created = this.createChannel(channelManager, qualityOfService, singleMessageChannel);
        this.attachShutdownListener(created);
        LOGGER.debug("Created new muleAmpqChannel for key: {}", key);
        return created;
    }

    /** Asks the channel manager for a brand-new channel on this pool's connection. */
    private MuleAmqpChannel createChannel(AmqpChannelManager channelManager, QualityOfService qualityOfService, boolean singleMessageChannel) throws IOException {
        return channelManager.createNewChannel(this.connection, qualityOfService, singleMessageChannel);
    }

    /**
     * Records {@code channel} as handed out.
     *
     * <p>The map's own monitor is taken here because every other reader and writer of
     * {@link #inUseChannels} uses it; the pool monitor held by the caller does not exclude them.
     */
    private void trackChannelInUse(MuleAmqpChannel channel) {
        synchronized (this.inUseChannels) {
            this.inUseChannels.put(channel, this.dummyValue);
        }
    }

    /**
     * Returns a channel to the pool.
     *
     * <p>An open channel is offered to the queue for its key; if there is no such queue or the
     * offer is rejected, the channel is closed instead. A {@code null} channel is ignored and a
     * closed channel is only removed from the in-use tracking.
     *
     * @param channelManager       manager the channel was requested with (part of the pool key)
     * @param transactionalAction  transactional action the channel was requested with
     * @param qualityOfService     quality of service the channel was requested with
     * @param singleMessageChannel single-message flag the channel was requested with
     * @param channel              channel being released; may be {@code null}
     */
    public void releaseChannel(AmqpChannelManager channelManager, OperationTransactionalAction transactionalAction, QualityOfService qualityOfService, boolean singleMessageChannel, MuleAmqpChannel channel) {
        if (channel == null) {
            return;
        }
        synchronized (this.inUseChannels) {
            this.inUseChannels.remove(channel);
        }
        if (!channel.isOpen()) {
            LOGGER.warn("Attempted to release a closed or null channel.");
            return;
        }
        ChannelPoolKey key = new ChannelPoolKey(this.connection, channelManager, transactionalAction, qualityOfService, singleMessageChannel);
        BlockingQueue<MuleAmqpChannel> queue = this.channelQueues.get(key);
        if (queue == null) {
            LOGGER.warn("Attempted to release channel for non-existent key: {}", key);
        } else if (queue.offer(channel)) {
            LOGGER.debug("Channel released back to pool for key: {}", key);
            return;
        } else {
            LOGGER.warn("Attempted to release channel but the pool was full");
        }
        // The channel could not be pooled, so close it rather than leave it open and untracked.
        this.closeChannelQuietly(channel);
    }

    /**
     * Closes {@code channel}, logging instead of propagating any failure.
     *
     * <p>Unchecked exceptions are caught as well: this method is called on channels that are
     * already known to be closed (idle cleanup, shutdown listener), and letting an exception
     * escape would abort the surrounding sweep.
     * NOTE(review): presumably {@code MuleAmqpChannel.close()} delegates to the RabbitMQ client,
     * which throws an unchecked AlreadyClosedException for a closed channel - confirm.
     */
    private void closeChannelQuietly(MuleAmqpChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
            LOGGER.debug("Closed channel: {}", channel);
        } catch (IOException | TimeoutException | RuntimeException e) {
            LOGGER.warn("Error closing channel: {}", channel, e);
        }
    }

    /** Schedules {@link #cleanupIdleChannels()} to run every minute, starting after one minute. */
    private void startIdleChannelCleanupTask() {
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.cleanupIdleChannels();
            } catch (Exception e) {
                // Keep the periodic task alive: a failed run must not cancel later runs.
                LOGGER.error("Error during idle channel cleanup.", e);
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Closes and removes pooled channels that are closed or idle for longer than
     * {@link #IDLE_TIMEOUT}, and drops closed channels from the in-use tracking.
     */
    private void cleanupIdleChannels() {
        long currentTime = System.currentTimeMillis();
        // Sweep the pooled queues under the pool monitor so a channel cannot be handed out by
        // acquirePooledChannel and closed by this sweep at the same time.
        synchronized (this) {
            for (BlockingQueue<MuleAmqpChannel> queue : this.channelQueues.values()) {
                queue.removeIf(channel -> this.closeIfStale(channel, currentTime));
            }
        }
        synchronized (this.inUseChannels) {
            this.inUseChannels.keySet().removeIf(channel -> {
                if (channel.isOpen()) {
                    return false;
                }
                this.closeChannelQuietly(channel);
                return true;
            });
        }
        LOGGER.debug("Idle and in-use channels cleaned up.");
    }

    /**
     * Closes {@code channel} when it is closed or has been idle for longer than
     * {@link #IDLE_TIMEOUT}.
     *
     * @param channel     pooled channel to inspect
     * @param currentTime reference time in milliseconds, as returned by
     *                    {@link System#currentTimeMillis()}
     * @return {@code true} if the channel was stale and must be removed from its queue
     */
    private boolean closeIfStale(MuleAmqpChannel channel, long currentTime) {
        boolean stale = !channel.isOpen() || currentTime - channel.getLastUsedTimestamp() > IDLE_TIMEOUT;
        if (stale) {
            this.closeChannelQuietly(channel);
        }
        return stale;
    }

    /**
     * Closes every idle channel in every queue.
     *
     * <p>Queues are drained with {@code poll()} and stay registered. Closing via iteration and
     * then clearing the map could strand a channel that a concurrent {@link #releaseChannel}
     * offered in between, and would de-register the queue an in-flight acquisition already holds.
     */
    private void purgeAllAvailableChannels() {
        synchronized (this) {
            LOGGER.info("Purging all available channels to manage capacity.");
            for (BlockingQueue<MuleAmqpChannel> queue : this.channelQueues.values()) {
                MuleAmqpChannel idle;
                while ((idle = queue.poll()) != null) {
                    this.closeChannelQuietly(idle);
                }
            }
        }
    }

    /**
     * Registers a listener that, when {@code channel} shuts down, removes it from the in-use
     * tracking and from every pooled queue.
     */
    private void attachShutdownListener(MuleAmqpChannel channel) {
        channel.addShutdownListener(cause -> {
            LOGGER.debug("Channel shutdown detected: {}", channel);
            LOGGER.debug("Exception on shutdown detected: {}", cause.getMessage());
            synchronized (this.inUseChannels) {
                this.inUseChannels.remove(channel);
            }
            this.removeChannelFromPool(channel);
        });
    }

    /** Removes {@code channel} from every pooled queue and closes it. */
    private void removeChannelFromPool(MuleAmqpChannel channel) {
        for (BlockingQueue<MuleAmqpChannel> queue : this.channelQueues.values()) {
            queue.removeIf(queuedChannel -> queuedChannel.equals(channel));
        }
        this.closeChannelQuietly(channel);
    }

    /**
     * Counts the channels known to this pool.
     *
     * @return number of idle channels across all queues plus the number of channels in use
     */
    private int totalChannels() {
        int totalInPools = this.channelQueues.values().stream().mapToInt(Collection::size).sum();
        int totalInUse;
        synchronized (this.inUseChannels) {
            totalInUse = this.inUseChannels.size();
        }
        LOGGER.debug("Total channels managed by pools: {}. Total channels in use: {}", totalInPools, totalInUse);
        return totalInPools + totalInUse;
    }
}

