/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.reader;

import com.redis.spring.batch.reader.KeyspaceNotificationListener;
import com.redis.spring.batch.reader.KeyspaceNotificationPublisher;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;

public class RedisClusterKeyspaceNotificationPublisher<K, V>
extends RedisClusterPubSubAdapter<K, V>
implements KeyspaceNotificationPublisher<K> {
    private final Log log = LogFactory.getLog(this.getClass());
    private final StatefulRedisClusterPubSubConnection<K, V> connection;
    private final List<KeyspaceNotificationListener<K>> listeners = new ArrayList<KeyspaceNotificationListener<K>>();

    public RedisClusterKeyspaceNotificationPublisher(StatefulRedisClusterPubSubConnection<K, V> connection) {
        Assert.notNull(connection, (String)"A pub/sub connection is required");
        this.connection = connection;
    }

    @Override
    public void addListener(KeyspaceNotificationListener<K> listener) {
        this.listeners.add(listener);
    }

    @Override
    public void subscribe(K ... patterns) {
        this.log.debug((Object)"Adding pub/sub listener");
        this.connection.addListener((RedisClusterPubSubListener)this);
        this.connection.setNodeMessagePropagation(true);
        this.log.debug((Object)"Subscribing to keyspace notifications");
        ((NodeSelectionPubSubCommands)this.connection.sync().upstream().commands()).psubscribe((Object[])patterns);
    }

    @Override
    public void unsubscribe(K ... patterns) {
        if (this.connection == null) {
            return;
        }
        this.log.debug((Object)"Unsubscribing from keyspace notifications");
        ((NodeSelectionPubSubCommands)this.connection.sync().upstream().commands()).punsubscribe((Object[])patterns);
        this.log.debug((Object)"Removing pub/sub listener");
        this.connection.removeListener((RedisClusterPubSubListener)this);
    }

    private void notification(K notification) {
        this.listeners.forEach(l -> l.notification(notification));
    }

    public void message(RedisClusterNode node, K channel, V message) {
        this.notification(channel);
    }

    public void message(RedisClusterNode node, K pattern, K channel, V message) {
        this.notification(channel);
    }
}

