/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.pubsub;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.CountDownLatchPubSub;
import org.redisson.pubsub.LockPubSub;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSubscribeService {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    // Used in this class for slot calculation, node-entry lookup, timer scheduling and shutdown checks.
    private final ConnectionManager connectionManager;
    // Supplies the timeout (ms) and the subscriptions-per-connection limit read by this class.
    private final MasterSlaveServersConfig config;
    // Fixed pool of 50 single-permit semaphores (filled in the constructor);
    // getSemaphore() maps a channel name onto one of them by hash code.
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    // Single-permit semaphore acquired around reads/updates of the free-connection queues.
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    // Channel name -> connection entry currently registered for that channel.
    private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<ChannelName, PubSubConnectionEntry>();
    // Node entry -> queue of connection entries registered through addFreeConnectionEntry().
    private final ConcurrentMap<MasterSlaveEntry, Queue<PubSubConnectionEntry>> freePubSubMap = new ConcurrentHashMap<MasterSlaveEntry, Queue<PubSubConnectionEntry>>();
    // Shared default returned by getConnectionsQueue() when a node has no queue; nothing in this class adds to it.
    private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<PubSubConnectionEntry>();
    // Helper objects constructed with a reference to this service and exposed through getters.
    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
    private final LockPubSub lockPubSub = new LockPubSub(this);

    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        this.connectionManager = connectionManager;
        this.config = config;
        for (int i = 0; i < this.locks.length; ++i) {
            this.locks[i] = new AsyncSemaphore(1);
        }
    }

    /**
     * @return the {@link LockPubSub} instance created together with this service
     */
    public LockPubSub getLockPubSub() {
        return lockPubSub;
    }

    /**
     * @return the {@link CountDownLatchPubSub} instance created together with this service
     */
    public CountDownLatchPubSub getCountDownLatchPubSub() {
        return countDownLatchPubSub;
    }

    /**
     * @return the {@link SemaphorePubSub} instance created together with this service
     */
    public SemaphorePubSub getSemaphorePubSub() {
        return semaphorePubSub;
    }

    /**
     * Looks up the connection entry currently registered for a channel.
     *
     * @param channelName channel (or pattern) name
     * @return the registered entry, or {@code null} when the channel has no mapping
     */
    public PubSubConnectionEntry getPubSubEntry(ChannelName channelName) {
        return name2PubSubConnection.get(channelName);
    }

    /**
     * Subscribes the listeners to a channel pattern, taking the channel lock internally.
     *
     * @param channelName pattern name
     * @param codec codec passed on to the subscription
     * @param listeners listeners to attach
     * @return future completed with the connection entry serving the pattern
     */
    public RFuture<PubSubConnectionEntry> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?> ... listeners) {
        final PubSubType subscriptionKind = PubSubType.PSUBSCRIBE;
        return subscribe(subscriptionKind, codec, channelName, listeners);
    }

    /**
     * Subscribes the listeners to a channel pattern using a caller-supplied semaphore.
     * The semaphore is not acquired here; it is passed to the internal subscribe routine,
     * which releases it on its completion paths.
     *
     * @param channelName pattern name
     * @param codec codec passed on to the subscription
     * @param semaphore semaphore handed to the internal subscribe routine
     * @param listeners listeners to attach
     * @return future completed with the connection entry serving the pattern
     */
    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        RedissonPromise<PubSubConnectionEntry> result = new RedissonPromise<>();
        ChannelName name = new ChannelName(channelName);
        subscribe(codec, name, result, PubSubType.PSUBSCRIBE, semaphore, listeners);
        return result;
    }

    /**
     * Subscribes the listeners to a channel, taking the channel lock internally.
     *
     * @param codec codec passed on to the subscription
     * @param channelName channel name
     * @param listeners listeners to attach
     * @return future completed with the connection entry serving the channel
     */
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        final PubSubType subscriptionKind = PubSubType.SUBSCRIBE;
        return subscribe(subscriptionKind, codec, channelName, listeners);
    }

    /**
     * Acquires the channel's semaphore and then runs the internal subscribe routine.
     * If the returned promise is already done by the time the semaphore is granted,
     * the semaphore is released immediately and nothing else happens.
     *
     * @param type SUBSCRIBE or PSUBSCRIBE
     * @param codec codec passed on to the subscription
     * @param channelName channel or pattern name
     * @param listeners listeners to attach
     * @return promise completed by the internal subscribe routine
     */
    private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        RedissonPromise<PubSubConnectionEntry> result = new RedissonPromise<>();
        AsyncSemaphore channelLock = getSemaphore(channelName);
        channelLock.acquire(() -> {
            if (!result.isDone()) {
                subscribe(codec, channelName, result, type, channelLock, listeners);
            } else {
                channelLock.release();
            }
        });
        return result;
    }

    /**
     * Subscribes the listeners to a channel using a caller-supplied semaphore.
     * The semaphore is not acquired here; it is passed to the internal subscribe routine,
     * which releases it on its completion paths.
     *
     * @param codec codec passed on to the subscription
     * @param channelName channel name
     * @param semaphore semaphore handed to the internal subscribe routine
     * @param listeners listeners to attach
     * @return future completed with the connection entry serving the channel
     */
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        RedissonPromise<PubSubConnectionEntry> result = new RedissonPromise<>();
        ChannelName name = new ChannelName(channelName);
        subscribe(codec, name, result, PubSubType.SUBSCRIBE, semaphore, listeners);
        return result;
    }

    /**
     * Picks the semaphore for a channel from the fixed lock pool.
     * The index is the absolute value of the channel's hash code modulo the pool size,
     * so equal channel names always map to the same semaphore.
     *
     * @param channelName channel or pattern name
     * @return the semaphore assigned to that name
     */
    public AsyncSemaphore getSemaphore(ChannelName channelName) {
        int remainder = channelName.hashCode() % locks.length;
        int index = Math.abs(remainder);
        return locks[index];
    }

    /**
     * Core subscribe routine. Attaches the listeners to the connection entry serving
     * {@code channelName}, reusing an existing mapping, a pooled free entry, or a new
     * connection (via {@code connect}).
     *
     * <p>The caller passes in {@code lock}; this method and the helpers it calls release it
     * on their completion paths. {@code freePubSubLock} is acquired here and released before
     * listeners are attached, or handed over to {@code connect}, which releases it.
     *
     * @param codec codec used for the SUBSCRIBE/PSUBSCRIBE command
     * @param channelName channel or pattern name
     * @param promise promise to complete with the serving connection entry
     * @param type SUBSCRIBE or PSUBSCRIBE
     * @param lock channel semaphore passed in by the caller
     * @param listeners listeners to attach
     */
    private void subscribe(final Codec codec, final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?> ... listeners) {
        // Fast path: the channel already has a connection entry, so only attach listeners.
        PubSubConnectionEntry connEntry = (PubSubConnectionEntry)this.name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            this.addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
        this.freePubSubLock.acquire(new Runnable(){

            @Override
            public void run() {
                // The promise was completed while waiting for the semaphore: give both locks back.
                if (promise.isDone()) {
                    lock.release();
                    PublishSubscribeService.this.freePubSubLock.release();
                    return;
                }
                Queue freePubSubConnections = PublishSubscribeService.this.getConnectionsQueue(channelName);
                PubSubConnectionEntry freeEntry = (PubSubConnectionEntry)freePubSubConnections.peek();
                if (freeEntry == null) {
                    // No pooled entry: connect() opens a connection and releases freePubSubLock itself.
                    PublishSubscribeService.this.connect(codec, channelName, promise, type, lock, listeners);
                    return;
                }
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    // NOTE(review): thrown while lock and freePubSubLock are still held and the
                    // promise is not completed; nothing visible here releases them. Confirm intent.
                    throw new IllegalStateException();
                }
                PubSubConnectionEntry oldEntry = PublishSubscribeService.this.name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    // Another entry was registered for this channel first: undo the acquire and reuse that entry.
                    freeEntry.release();
                    PublishSubscribeService.this.freePubSubLock.release();
                    PublishSubscribeService.this.addListeners(channelName, promise, type, lock, oldEntry, listeners);
                    return;
                }
                if (remainFreeAmount == 0) {
                    // tryAcquire() reported zero remaining, so drop the queue head
                    // (presumably the entry just peeked) from the free pool.
                    freePubSubConnections.poll();
                }
                PublishSubscribeService.this.freePubSubLock.release();
                // Listeners are attached before the subscribe command is sent.
                final RFuture subscribeFuture = PublishSubscribeService.this.addListeners(channelName, promise, type, lock, freeEntry, listeners);
                ChannelFuture future = PubSubType.PSUBSCRIBE == type ? freeEntry.psubscribe(codec, channelName) : freeEntry.subscribe(codec, channelName);
                future.addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            // The command write failed: cancel the subscribe future unless the promise is already done.
                            if (!promise.isDone()) {
                                subscribeFuture.cancel(false);
                            }
                            return;
                        }
                        // The write succeeded: schedule a cancel of the subscribe future after the configured
                        // timeout (presumably a no-op if the subscription was confirmed meanwhile - verify).
                        PublishSubscribeService.this.connectionManager.newTimeout(new TimerTask(){

                            @Override
                            public void run(Timeout timeout) throws Exception {
                                subscribeFuture.cancel(false);
                            }
                        }, PublishSubscribeService.this.config.getTimeout(), TimeUnit.MILLISECONDS);
                    }
                });
            }
        });
    }

    /**
     * Returns the queue of free connection entries for the node that owns the channel's slot.
     *
     * <p>The shared empty queue is returned when the node has no queue registered or when
     * no node entry is known for the slot. The latter case must be checked explicitly:
     * {@code freePubSubMap} is a {@code ConcurrentHashMap}, which rejects {@code null} keys
     * with a {@code NullPointerException}. Callers invoke this method while holding
     * semaphores, so such an exception would leave them unreleased.
     *
     * @param channelName channel or pattern name used to compute the slot
     * @return the node's free-entry queue, or the shared empty queue; never {@code null}
     */
    private Queue<PubSubConnectionEntry> getConnectionsQueue(ChannelName channelName) {
        int slot = this.connectionManager.calcSlot(channelName.getName());
        MasterSlaveEntry entry = this.connectionManager.getEntry(slot);
        if (entry == null) {
            // Unknown node for this slot: behave as "no free connections".
            return this.emptyQueue;
        }
        return this.freePubSubMap.getOrDefault(entry, this.emptyQueue);
    }

    /**
     * Attaches the listeners to the connection entry and, once the entry's subscribe future
     * for the channel completes, tries to complete {@code promise} with the entry.
     *
     * <p>If the promise could be completed, the lock is released. If it was already completed
     * elsewhere, the listeners are detached again; the channel is then unsubscribed when no
     * listeners remain (which releases the lock), otherwise the lock is released directly.
     *
     * @param channelName channel or pattern name
     * @param promise promise to complete with {@code connEntry}
     * @param type SUBSCRIBE or PSUBSCRIBE
     * @param lock channel semaphore to release when done
     * @param connEntry connection entry the listeners are attached to
     * @param listeners listeners to attach
     * @return the entry's subscribe success future for the channel
     */
    private RFuture<Void> addListeners(ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry, RedisPubSubListener<?> ... listeners) {
        for (int i = 0; i < listeners.length; i++) {
            connEntry.addListener(channelName, listeners[i]);
        }
        RFuture<Void> subscribed = connEntry.getSubscribeFuture(channelName, type).getSuccessFuture();
        subscribed.onComplete((ignoredResult, ignoredError) -> {
            if (promise.trySuccess(connEntry)) {
                lock.release();
                return;
            }
            // The promise was completed elsewhere: roll back the listener registration.
            for (RedisPubSubListener attached : listeners) {
                connEntry.removeListener(channelName, attached);
            }
            if (connEntry.hasListeners(channelName)) {
                lock.release();
            } else {
                this.unsubscribe(channelName, lock);
            }
        });
        return subscribed;
    }

    /**
     * Hands a pub/sub connection entry back to the node entry that owns the slot.
     * Logs an error and does nothing else when no node entry is known for the slot.
     *
     * @param slot slot used to look up the node entry
     * @param pubSubEntry connection entry to return
     */
    private void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
        MasterSlaveEntry owner = connectionManager.getEntry(slot);
        if (owner != null) {
            owner.returnPubSubConnection(pubSubEntry);
            return;
        }
        log.error("Node for slot: " + slot + " can't be found");
    }

    /**
     * Requests the next pub/sub connection from the node entry that owns the slot.
     *
     * @param slot slot used to look up the node entry
     * @return the node's connection future, or a future failed with
     *         {@link RedisNodeNotFoundException} when no node entry is known for the slot
     */
    private RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
        MasterSlaveEntry owner = connectionManager.getEntry(slot);
        if (owner != null) {
            return owner.nextPubSubConnection();
        }
        RedisNodeNotFoundException notFound = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
        return RedissonPromise.newFailedFuture(notFound);
    }

    /**
     * Obtains a new pub/sub connection for the channel's slot, registers it for the channel,
     * attaches the listeners and sends the SUBSCRIBE/PSUBSCRIBE command.
     *
     * <p>Called from the core subscribe routine while {@code freePubSubLock} is held; every
     * path of the connection callback below releases it.
     *
     * @param codec codec used for the SUBSCRIBE/PSUBSCRIBE command
     * @param channelName channel or pattern name
     * @param promise promise to complete with the serving connection entry
     * @param type SUBSCRIBE or PSUBSCRIBE
     * @param lock channel semaphore passed in by the caller
     * @param listeners listeners to attach
     */
    private void connect(Codec codec, ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?> ... listeners) {
        int slot = this.connectionManager.calcSlot(channelName.getName());
        RFuture<RedisPubSubConnection> connFuture = this.nextPubSubConnection(slot);
        // Propagate a failure of the caller's promise to the pending connection future.
        // NOTE(review): the raw cast assumes connFuture is always an RPromise - confirm,
        // otherwise this throws ClassCastException.
        promise.onComplete((res, e) -> {
            if (e != null) {
                ((RPromise)connFuture).tryFailure((Throwable)e);
            }
        });
        connFuture.onComplete((conn, ex) -> {
            if (ex != null) {
                // No connection could be obtained: release both locks and fail the promise.
                this.freePubSubLock.release();
                lock.release();
                promise.tryFailure((Throwable)ex);
                return;
            }
            PubSubConnectionEntry entry = new PubSubConnectionEntry((RedisPubSubConnection)conn, this.config.getSubscriptionsPerConnection());
            // NOTE(review): unlike the pooled-entry path in subscribe(), the result is not checked for -1 here.
            int remainFreeAmount = entry.tryAcquire();
            PubSubConnectionEntry oldEntry = this.name2PubSubConnection.putIfAbsent(channelName, entry);
            if (oldEntry != null) {
                // Another entry was registered for this channel first: return the new connection and reuse that entry.
                this.releaseSubscribeConnection(slot, entry);
                this.freePubSubLock.release();
                this.addListeners(channelName, promise, type, lock, oldEntry, listeners);
                return;
            }
            if (remainFreeAmount > 0) {
                // The entry still has capacity left, so add it to the free pool.
                this.addFreeConnectionEntry(channelName, entry);
            }
            this.freePubSubLock.release();
            // Listeners are attached before the subscribe command is sent.
            final RFuture<Void> subscribeFuture = this.addListeners(channelName, promise, type, lock, entry, listeners);
            ChannelFuture future = PubSubType.PSUBSCRIBE == type ? entry.psubscribe(codec, channelName) : entry.subscribe(codec, channelName);
            future.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        // The command write failed: cancel the subscribe future unless the promise is already done.
                        if (!promise.isDone()) {
                            subscribeFuture.cancel(false);
                        }
                        return;
                    }
                    // The write succeeded: schedule a cancel of the subscribe future after the configured
                    // timeout (presumably a no-op if the subscription was confirmed meanwhile - verify).
                    PublishSubscribeService.this.connectionManager.newTimeout(new TimerTask(){

                        @Override
                        public void run(Timeout timeout) throws Exception {
                            subscribeFuture.cancel(false);
                        }
                    }, PublishSubscribeService.this.config.getTimeout(), TimeUnit.MILLISECONDS);
                }
            });
        });
    }

    /**
     * Removes the channel's connection mapping and sends UNSUBSCRIBE on that connection.
     *
     * <p>When the UNSUBSCRIBE status for the channel arrives, the entry's subscription slot
     * is released (and the entry is added to the free pool when {@code release()} returns 1),
     * {@code lock} is released and the returned future succeeds. If the command was written
     * successfully but no status was observed within the configured timeout, a synthetic
     * UNSUBSCRIBE status message is passed to the connection.
     *
     * <p>When no mapping exists or the connection manager is shutting down, {@code lock} is
     * released right away and an already-succeeded future is returned.
     *
     * @param channelName channel name
     * @param lock channel semaphore, expected to be held by the caller
     * @return future completed once the UNSUBSCRIBE status has been processed
     */
    public RFuture<Void> unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
        final PubSubConnectionEntry removed = name2PubSubConnection.remove(channelName);
        if (removed == null || connectionManager.isShuttingDown()) {
            lock.release();
            return RedissonPromise.newSucceededFuture(null);
        }

        final RedissonPromise<Void> result = new RedissonPromise<>();
        final AtomicBoolean confirmed = new AtomicBoolean();
        BaseRedisPubSubListener statusListener = new BaseRedisPubSubListener() {

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (type != PubSubType.UNSUBSCRIBE || !channel.equals(channelName)) {
                    return false;
                }
                confirmed.set(true);
                if (removed.release() == 1) {
                    PublishSubscribeService.this.addFreeConnectionEntry(channelName, removed);
                }
                lock.release();
                result.trySuccess(null);
                return true;
            }
        };

        ChannelFuture writeFuture = removed.unsubscribe(channelName, statusListener);
        writeFuture.addListener((ChannelFutureListener) completed -> {
            if (!completed.isSuccess()) {
                return;
            }
            connectionManager.newTimeout(timeout -> {
                if (!confirmed.get()) {
                    removed.getConnection().onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channelName));
                }
            }, config.getTimeout(), TimeUnit.MILLISECONDS);
        });
        return result;
    }

    /**
     * Drops the free-connection queue registered for the given node entry, if any.
     *
     * @param entry node entry whose queue is removed
     */
    public void remove(MasterSlaveEntry entry) {
        freePubSubMap.remove(entry);
    }

    /**
     * Unsubscribes a channel or pattern, acquiring the channel semaphore internally, and
     * reports the codec the subscription was using.
     *
     * <p>Unlike the overload that takes a semaphore, this method does not call
     * {@code entry.release()}; it removes the entry from the node's free-connection queue.
     *
     * @param channelName channel or pattern name
     * @param topicType UNSUBSCRIBE or PUNSUBSCRIBE; also the status type waited for
     * @return future completed with the subscription's codec, or with {@code null} when the
     *         connection manager is shutting down or the channel has no mapping
     */
    public RFuture<Codec> unsubscribe(final ChannelName channelName, final PubSubType topicType) {
        if (this.connectionManager.isShuttingDown()) {
            return RedissonPromise.newSucceededFuture(null);
        }
        final RedissonPromise<Codec> result = new RedissonPromise<Codec>();
        final AsyncSemaphore lock = this.getSemaphore(channelName);
        lock.acquire(new Runnable(){

            @Override
            public void run() {
                final PubSubConnectionEntry entry = (PubSubConnectionEntry)PublishSubscribeService.this.name2PubSubConnection.remove(channelName);
                if (entry == null) {
                    // Nothing is registered for this channel: release the lock and finish with null.
                    lock.release();
                    result.trySuccess(null);
                    return;
                }
                PublishSubscribeService.this.freePubSubLock.acquire(new Runnable(){

                    @Override
                    public void run() {
                        // Take the entry out of the free pool while holding freePubSubLock.
                        Queue freePubSubConnections = PublishSubscribeService.this.getConnectionsQueue(channelName);
                        freePubSubConnections.remove(entry);
                        PublishSubscribeService.this.freePubSubLock.release();
                        // Capture the codec before the unsubscribe command is sent.
                        final Codec entryCodec = topicType == PubSubType.PUNSUBSCRIBE ? entry.getConnection().getPatternChannels().get(channelName) : entry.getConnection().getChannels().get(channelName);
                        // Set once the expected status has been seen; checked by the timeout task below.
                        final AtomicBoolean executed = new AtomicBoolean();
                        BaseRedisPubSubListener listener = new BaseRedisPubSubListener(){

                            @Override
                            public boolean onStatus(PubSubType type, CharSequence channel) {
                                if (type == topicType && channel.equals(channelName)) {
                                    executed.set(true);
                                    lock.release();
                                    result.trySuccess(entryCodec);
                                    return true;
                                }
                                return false;
                            }
                        };
                        ChannelFuture future = topicType == PubSubType.PUNSUBSCRIBE ? entry.punsubscribe(channelName, listener) : entry.unsubscribe(channelName, listener);
                        future.addListener(new ChannelFutureListener(){

                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (!future.isSuccess()) {
                                    // NOTE(review): on a failed write nothing here releases lock or
                                    // completes result - confirm this is handled elsewhere.
                                    return;
                                }
                                // If no status arrives within the configured timeout, pass a synthetic status
                                // message to the connection (presumably so the listener above fires - verify).
                                PublishSubscribeService.this.connectionManager.newTimeout(new TimerTask(){

                                    @Override
                                    public void run(Timeout timeout) throws Exception {
                                        if (executed.get()) {
                                            return;
                                        }
                                        entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName));
                                    }
                                }, PublishSubscribeService.this.config.getTimeout(), TimeUnit.MILLISECONDS);
                            }
                        });
                    }
                });
            }
        });
        return result;
    }

    /**
     * Removes the pattern's connection mapping and sends PUNSUBSCRIBE on that connection.
     *
     * <p>When the PUNSUBSCRIBE status for the pattern arrives, the entry's subscription slot
     * is released (and the entry is added to the free pool when {@code release()} returns 1)
     * and {@code lock} is released. When no mapping exists or the connection manager is
     * shutting down, {@code lock} is released right away.
     *
     * @param channelName pattern name
     * @param lock channel semaphore, expected to be held by the caller
     */
    public void punsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
        final PubSubConnectionEntry removed = name2PubSubConnection.remove(channelName);
        if (removed == null || connectionManager.isShuttingDown()) {
            lock.release();
            return;
        }

        BaseRedisPubSubListener statusListener = new BaseRedisPubSubListener() {

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (type != PubSubType.PUNSUBSCRIBE || !channel.equals(channelName)) {
                    return false;
                }
                if (removed.release() == 1) {
                    PublishSubscribeService.this.addFreeConnectionEntry(channelName, removed);
                }
                lock.release();
                return true;
            }
        };
        removed.punsubscribe(channelName, statusListener);
    }

    /**
     * Adds a connection entry to the free-connection queue of the node that owns the
     * channel's slot, creating the queue on first use.
     *
     * <p>If no node entry is known for the slot, an error is logged and the entry is not
     * pooled. Without this check the {@code null} key would make
     * {@code ConcurrentHashMap.computeIfAbsent} throw a {@code NullPointerException};
     * the status callbacks call this method before releasing their channel semaphore, so the
     * exception would leave that semaphore held.
     *
     * @param channelName channel or pattern name used to compute the slot
     * @param entry connection entry to add to the pool
     */
    private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
        int slot = this.connectionManager.calcSlot(channelName.getName());
        MasterSlaveEntry me = this.connectionManager.getEntry(slot);
        if (me == null) {
            log.error("Node for slot: " + slot + " can't be found");
            return;
        }
        Queue<PubSubConnectionEntry> freePubSubConnections = this.freePubSubMap.computeIfAbsent(me, e -> new ConcurrentLinkedQueue<>());
        freePubSubConnections.add(entry);
    }

    /**
     * Re-subscribes every registered channel and pattern whose name maps to the given slot.
     * For each match the current listeners are captured, the subscription is dropped through
     * the lock-acquiring {@code unsubscribe}, and a new subscribe/psubscribe with the same
     * codec and listeners is issued.
     *
     * @param slot slot whose channels should be re-attached
     */
    public void reattachPubSub(int slot) {
        name2PubSubConnection.entrySet().stream()
                .filter(mapping -> connectionManager.calcSlot(mapping.getKey().getName()) == slot)
                .forEach(mapping -> {
                    ChannelName name = mapping.getKey();
                    PubSubConnectionEntry connEntry = mapping.getValue();

                    Codec channelCodec = connEntry.getConnection().getChannels().get(name);
                    if (channelCodec != null) {
                        Queue<RedisPubSubListener<?>> attached = connEntry.getListeners(name);
                        unsubscribe(name, PubSubType.UNSUBSCRIBE);
                        subscribe(channelCodec, name, attached.toArray(new RedisPubSubListener[0]));
                    }

                    // Read only after the plain-channel branch has run, as in the original ordering.
                    Codec patternCodec = connEntry.getConnection().getPatternChannels().get(name);
                    if (patternCodec != null) {
                        Queue<RedisPubSubListener<?>> attached = connEntry.getListeners(name);
                        unsubscribe(name, PubSubType.PUNSUBSCRIBE);
                        psubscribe(name, patternCodec, attached.toArray(new RedisPubSubListener[0]));
                    }
                });
    }

    /**
     * Moves all subscriptions of the given connection elsewhere: the connection's entry is
     * removed from the free pools, then every channel and pattern it serves is unsubscribed
     * and re-subscribed with its current listeners.
     *
     * <p>A channel whose mapping has disappeared in the meantime (for example because it was
     * unsubscribed concurrently) is skipped instead of failing with a
     * {@code NullPointerException}; there is nothing left to re-attach for it.
     *
     * @param redisPubSubConnection connection whose subscriptions must be re-attached
     */
    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
        for (final Queue<PubSubConnectionEntry> queue : this.freePubSubMap.values()) {
            for (final PubSubConnectionEntry entry : queue) {
                if (!entry.getConnection().equals(redisPubSubConnection)) {
                    continue;
                }
                // Remove the first matching entry under freePubSubLock, then move on to the next queue.
                this.freePubSubLock.acquire(() -> {
                    queue.remove(entry);
                    this.freePubSubLock.release();
                });
                break;
            }
        }
        for (ChannelName channelName : redisPubSubConnection.getChannels().keySet()) {
            PubSubConnectionEntry pubSubEntry = this.getPubSubEntry(channelName);
            if (pubSubEntry == null) {
                continue;
            }
            Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
            this.reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE);
        }
        for (ChannelName channelName : redisPubSubConnection.getPatternChannels().keySet()) {
            PubSubConnectionEntry pubSubEntry = this.getPubSubEntry(channelName);
            if (pubSubEntry == null) {
                continue;
            }
            Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
            this.reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE);
        }
    }

    /**
     * Unsubscribes the channel/pattern and, if there are listeners and the unsubscribe
     * yielded a codec, subscribes them again with that codec.
     *
     * @param channelName channel or pattern name
     * @param listeners listeners to re-attach; when empty only the unsubscribe is performed
     * @param topicType UNSUBSCRIBE for channels, PUNSUBSCRIBE for patterns
     */
    private void reattachPubSubListeners(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, PubSubType topicType) {
        RFuture<Codec> codecFuture = unsubscribe(channelName, topicType);
        if (!listeners.isEmpty()) {
            codecFuture.onComplete((codec, error) -> {
                if (codec != null) {
                    if (topicType != PubSubType.PUNSUBSCRIBE) {
                        subscribe(channelName, listeners, codec);
                    } else {
                        psubscribe(channelName, listeners, codec);
                    }
                }
            });
        }
    }

    /**
     * Re-subscribes the listeners to a channel and retries on failure until it succeeds.
     *
     * <p>Retrying stops once the connection manager is shutting down; without that check a
     * failing subscribe would be re-issued indefinitely during shutdown.
     *
     * @param channelName channel name
     * @param listeners listeners to attach
     * @param subscribeCodec codec to subscribe with
     */
    private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        RFuture<PubSubConnectionEntry> subscribeFuture = this.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
        subscribeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (this.connectionManager.isShuttingDown()) {
                    // Nothing to re-attach to any more; give up instead of retrying forever.
                    return;
                }
                this.subscribe(channelName, listeners, subscribeCodec);
                return;
            }
            log.info("listeners of '{}' channel to '{}' have been resubscribed", (Object)channelName, (Object)res.getConnection().getRedisClient());
        });
    }

    /**
     * Re-subscribes the listeners to a channel pattern and retries on failure until it succeeds.
     *
     * <p>Retrying stops once the connection manager is shutting down; without that check a
     * failing subscribe would be re-issued indefinitely during shutdown.
     *
     * @param channelName pattern name
     * @param listeners listeners to attach
     * @param subscribeCodec codec to subscribe with
     */
    private void psubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        RFuture<PubSubConnectionEntry> subscribeFuture = this.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
        subscribeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (this.connectionManager.isShuttingDown()) {
                    // Nothing to re-attach to any more; give up instead of retrying forever.
                    return;
                }
                this.psubscribe(channelName, listeners, subscribeCodec);
                return;
            }
            log.info("listeners of '{}' channel-pattern to '{}' have been resubscribed", (Object)channelName, (Object)res.getConnection().getRedisClient());
        });
    }

    /**
     * @return a diagnostic string listing the channel-to-connection map and the free-connection pools
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PublishSubscribeService [name2PubSubConnection=");
        text.append(name2PubSubConnection);
        text.append(", freePubSubMap=");
        text.append(freePubSubMap);
        text.append("]");
        return text.toString();
    }
}

