/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.OrPredicate;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.KeyspaceNotificationListener;
import com.redis.spring.batch.reader.KeyspaceNotificationPublisher;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.QueueOptions;
import io.micrometer.core.instrument.Tag;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Item reader that buffers keys extracted from keyspace notifications in a bounded
 * queue and hands them out through {@link #read()} / {@link #poll(long, TimeUnit)}.
 *
 * <p>The reader registers itself as a listener on the supplied publisher when it is
 * opened and unsubscribes from the configured patterns when it is closed. Incoming
 * notifications are converted to keys with the key extractor, dropped when the
 * filter matches, and otherwise placed at the tail of the queue (any equal key
 * already queued is removed first so a key is queued at most once).
 *
 * <p>{@link #open(ExecutionContext)} and {@link #close()} are synchronized on this
 * instance; {@link #notification(Object)}, {@link #poll(long, TimeUnit)} and
 * {@link #isOpen()} read the {@code volatile} state without locking, so they may be
 * called from other threads while the reader is being opened or closed.
 *
 * @param <K> key type
 */
public class KeyspaceNotificationItemReader<K>
extends ItemStreamSupport
implements PollableItemReader<K>,
KeyspaceNotificationListener<K> {
    /** Name passed to the gauge that tracks the notification queue size. */
    public static final String QUEUE_SIZE_GAUGE_NAME = "reader.notification.queue.size";
    private final Log log = LogFactory.getLog(this.getClass());
    private final KeyspaceNotificationPublisher<K> publisher;
    private final Converter<K, K> keyExtractor;
    protected final K[] patterns;
    private final QueueOptions queueOptions;
    /** Keys for which this predicate returns {@code true} are dropped; null keys are always dropped. */
    private Predicate<K> filter = Objects::isNull;
    // Written under the instance lock in open()/close() but read lock-free from
    // isOpen(), poll() and notification(), hence volatile.
    private volatile boolean open;
    private volatile BlockingQueue<K> queue;

    /**
     * Creates a reader that is not yet open.
     *
     * @param publisher    source of keyspace notifications this reader listens to
     * @param keyExtractor converts a raw notification into the key to enqueue
     * @param patterns     patterns passed to the publisher on subscribe/unsubscribe; must not be null
     * @param queueOptions supplies the queue capacity and the poll timeout used by {@link #read()}
     */
    public KeyspaceNotificationItemReader(KeyspaceNotificationPublisher<K> publisher, Converter<K, K> keyExtractor, K[] patterns, QueueOptions queueOptions) {
        Assert.notNull(patterns, "Patterns must not be null");
        this.setName(ClassUtils.getShortName(this.getClass()));
        this.publisher = publisher;
        this.keyExtractor = keyExtractor;
        this.patterns = patterns;
        this.queueOptions = queueOptions;
    }

    /**
     * Adds an exclusion predicate. The new predicate is OR-combined with the
     * current one, so previously configured exclusions (including the built-in
     * null check) stay in effect.
     *
     * @param filter predicate returning {@code true} for keys that must be dropped
     */
    public void setFilter(Predicate<K> filter) {
        this.filter = OrPredicate.of(this.filter, filter);
    }

    /**
     * Handles one notification: extracts the key, applies the filter and enqueues
     * the key. Null notifications, filtered keys and notifications received while
     * the reader has no queue (before {@code open} or after {@code close}) are
     * ignored. A warning is logged when the queue is full and the key is dropped.
     *
     * @param notification raw notification, may be null
     */
    @Override
    public void notification(K notification) {
        if (notification == null) {
            return;
        }
        K key = this.keyExtractor.convert(notification);
        if (this.filter.test(key)) {
            return;
        }
        // Read the field once: close() may null it concurrently, and the listener
        // is never deregistered, so notifications can still arrive after close().
        BlockingQueue<K> target = this.queue;
        if (target == null) {
            return;
        }
        // Drop any queued duplicate so the key appears once, at the tail.
        target.removeIf(e -> e.equals(key));
        if (!target.offer(key)) {
            this.log.warn("Could not add key because queue is full");
        }
    }

    /**
     * Polls the queue using the timeout configured in the queue options.
     *
     * @return the next key, or {@code null} if none became available within the timeout
     * @throws Exception if the wait is interrupted
     */
    public K read() throws Exception {
        return this.poll(this.queueOptions.getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the queue, registers the queue-size gauge, and registers this reader
     * with the publisher before subscribing to the patterns. Does nothing when a
     * queue already exists.
     *
     * @param executionContext not used by this implementation
     */
    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.queue != null) {
            return;
        }
        BlockingQueue<K> created = new LinkedBlockingQueue<K>(this.queueOptions.getCapacity());
        // Publish the queue before subscribing so notification() has somewhere to
        // put keys as soon as the subscription is active.
        this.queue = created;
        Utils.createGaugeCollectionSize(QUEUE_SIZE_GAUGE_NAME, created, new Tag[0]);
        this.publisher.addListener(this);
        this.publisher.subscribe(this.patterns);
        this.open = true;
    }

    /**
     * Retrieves and removes the head of the queue, waiting up to the given timeout.
     *
     * @param timeout how long to wait before giving up
     * @param unit    unit of {@code timeout}
     * @return the next key, or {@code null} if the timeout elapsed or the reader has no queue
     *         (not opened yet, or already closed)
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public K poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Single read of the volatile field guards against a concurrent close().
        BlockingQueue<K> source = this.queue;
        if (source == null) {
            return null;
        }
        return source.poll(timeout, unit);
    }

    /**
     * Unsubscribes from the patterns and discards the queue. Logs a warning when
     * undelivered keys remain. Does nothing when the reader has no queue.
     */
    public synchronized void close() throws ItemStreamException {
        BlockingQueue<K> current = this.queue;
        if (current == null) {
            return;
        }
        if (!current.isEmpty()) {
            this.log.warn("Closing with items still in queue");
        }
        this.publisher.unsubscribe(this.patterns);
        this.queue = null;
        this.open = false;
    }

    /**
     * @return {@code true} once {@code open} has completed and until {@code close} completes
     */
    public boolean isOpen() {
        return this.open;
    }
}

