/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.SetBlockingQueue;
import com.redis.spring.batch.reader.AbstractKeyspaceNotificationPublisher;
import com.redis.spring.batch.reader.KeyItemReader;
import com.redis.spring.batch.reader.KeyspaceNotification;
import com.redis.spring.batch.reader.KeyspaceNotificationComparator;
import com.redis.spring.batch.reader.KeyspaceNotificationPublisher;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.RedisClusterKeyspaceNotificationPublisher;
import com.redis.spring.batch.reader.RedisKeyspaceNotificationPublisher;
import com.redis.spring.batch.util.CodecUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

public class KeyspaceNotificationItemReader<K>
extends AbstractItemStreamItemReader<K>
implements KeyItemReader<K>,
PollableItemReader<K> {
    public static final String MATCH_ALL = "*";
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100L);
    public static final OrderingStrategy DEFAULT_ORDERING = OrderingStrategy.PRIORITY;
    public static final String QUEUE_METER = "redis.batch.notification.queue.size";
    public static final String QUEUE_MISS_COUNTER = "redis.batch.notification.queue.misses";
    private static final KeyspaceNotificationComparator NOTIFICATION_COMPARATOR = new KeyspaceNotificationComparator();
    private final Log log = LogFactory.getLog(this.getClass());
    private final AbstractRedisClient client;
    private final Function<String, K> stringKeyEncoder;
    private int database;
    private String keyPattern;
    private OrderingStrategy orderingStrategy = DEFAULT_ORDERING;
    private DataType keyType;
    private int queueCapacity = 10000;
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;
    private BlockingQueue<KeyspaceNotification> queue;
    private Counter queueMissCounter;
    private KeyspaceNotificationPublisher notificationPublisher;
    private String name;

    public KeyspaceNotificationItemReader(AbstractRedisClient client, RedisCodec<K, ?> codec) {
        this.setName(ClassUtils.getShortName(this.getClass()));
        this.client = client;
        this.stringKeyEncoder = CodecUtils.stringKeyFunction(codec);
    }

    public void setName(String name) {
        super.setName(name);
        this.name = name;
    }

    public BlockingQueue<KeyspaceNotification> getQueue() {
        return this.queue;
    }

    public void setDatabase(int database) {
        this.database = database;
    }

    public void setKeyPattern(String keyPattern) {
        this.keyPattern = keyPattern;
    }

    public void setPollTimeout(Duration pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public void setKeyType(DataType keyType) {
        this.keyType = keyType;
    }

    public void setOrderingStrategy(OrderingStrategy orderingStrategy) {
        this.orderingStrategy = orderingStrategy;
    }

    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.notificationPublisher == null) {
            this.log.debug((Object)String.format("Opening %s", this.name));
            this.queue = new SetBlockingQueue<KeyspaceNotification>(this.notificationQueue(), this.queueCapacity);
            Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), this.queue);
            this.queueMissCounter = Metrics.globalRegistry.counter(QUEUE_MISS_COUNTER, new String[0]);
            this.notificationPublisher = this.publisher();
            this.log.debug((Object)String.format("Opened %s", this.name));
        }
    }

    private KeyspaceNotificationPublisher publisher() {
        String pattern = String.format(PUBSUB_PATTERN_FORMAT, this.database, this.keyPattern());
        AbstractKeyspaceNotificationPublisher publisher = this.publisher(pattern);
        publisher.addConsumer(this::acceptKeyspaceNotification);
        publisher.open();
        return publisher;
    }

    private void acceptKeyspaceNotification(KeyspaceNotification notification) {
        boolean added;
        if (!(this.keyType != null && notification.getEvent().getType() != this.keyType || this.queue.remainingCapacity() <= 0 || (added = this.queue.offer(notification)))) {
            this.queueMissCounter.increment();
        }
    }

    private AbstractKeyspaceNotificationPublisher publisher(String pattern) {
        if (this.client instanceof RedisClusterClient) {
            return new RedisClusterKeyspaceNotificationPublisher((RedisClusterClient)this.client, pattern);
        }
        return new RedisKeyspaceNotificationPublisher((RedisClient)this.client, pattern);
    }

    private String keyPattern() {
        if (this.keyPattern == null) {
            return MATCH_ALL;
        }
        return this.keyPattern;
    }

    private BlockingQueue<KeyspaceNotification> notificationQueue() {
        if (this.orderingStrategy == OrderingStrategy.PRIORITY) {
            return new PriorityBlockingQueue<KeyspaceNotification>(this.queueCapacity, NOTIFICATION_COMPARATOR);
        }
        return new LinkedBlockingQueue<KeyspaceNotification>(this.queueCapacity);
    }

    @Override
    public boolean isOpen() {
        return this.notificationPublisher != null;
    }

    public synchronized void close() {
        if (this.notificationPublisher != null) {
            this.log.debug((Object)String.format("Closing %s", this.name));
            this.notificationPublisher.close();
            this.notificationPublisher = null;
            this.log.debug((Object)String.format("Closed %s", this.name));
        }
        super.close();
    }

    public K read() throws InterruptedException {
        return this.poll(this.pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public K poll(long timeout, TimeUnit unit) throws InterruptedException {
        KeyspaceNotification notification = this.queue.poll(timeout, unit);
        if (notification == null) {
            return null;
        }
        return this.stringKeyEncoder.apply(notification.getKey());
    }

    public static enum OrderingStrategy {
        FIFO,
        PRIORITY;

    }
}

