/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch;

import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.DataStructureValueReader;
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparisonValueReader;
import com.redis.spring.batch.reader.KeyDumpValueReader;
import com.redis.spring.batch.reader.LiveReaderBuilder;
import com.redis.spring.batch.reader.ReaderOptions;
import com.redis.spring.batch.reader.ScanReaderBuilder;
import com.redis.spring.batch.reader.StreamReaderBuilder;
import io.lettuce.core.Consumer;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;

public class RedisItemReader<K, T>
extends AbstractItemStreamItemReader<T> {
    private final Log log = LogFactory.getLog(((Object)((Object)this)).getClass());
    protected final ItemReader<K> keyReader;
    private final ItemProcessor<List<? extends K>, List<T>> valueReader;
    protected final BlockingQueue<T> queue;
    private final Enqueuer enqueuer = new Enqueuer();
    private final JobRunner jobRunner;
    protected final ReaderOptions options;
    private JobExecution jobExecution;
    private String name;
    private final AtomicInteger runningThreads = new AtomicInteger();

    public RedisItemReader(ItemReader<K> keyReader, ItemProcessor<List<? extends K>, List<T>> valueReader, JobRunner jobRunner, ReaderOptions options) {
        this.setName(ClassUtils.getShortName(((Object)((Object)this)).getClass()));
        this.keyReader = keyReader;
        this.valueReader = valueReader;
        this.queue = new LinkedBlockingQueue<T>(options.getQueueOptions().getCapacity());
        this.jobRunner = jobRunner;
        this.options = options;
    }

    public ItemReader<K> getKeyReader() {
        return this.keyReader;
    }

    public void setName(String name) {
        this.name = name;
        super.setName(name);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        AtomicInteger atomicInteger = this.runningThreads;
        synchronized (atomicInteger) {
            if (this.jobExecution == null) {
                this.doOpen();
            }
            this.runningThreads.incrementAndGet();
            super.open(executionContext);
        }
    }

    protected void doOpen() {
        SimpleStepBuilder<K, K> step = this.createStep();
        FaultTolerantStepBuilder stepBuilder = step.faultTolerant();
        this.options.getSkip().forEach(arg_0 -> ((FaultTolerantStepBuilder)stepBuilder).skip(arg_0));
        this.options.getNoSkip().forEach(arg_0 -> ((FaultTolerantStepBuilder)stepBuilder).noSkip(arg_0));
        stepBuilder.skipLimit(this.options.getSkipLimit());
        this.options.getSkipPolicy().ifPresent(arg_0 -> ((FaultTolerantStepBuilder)stepBuilder).skipPolicy(arg_0));
        if (this.options.getThreads() > 1) {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setMaxPoolSize(this.options.getThreads());
            taskExecutor.setCorePoolSize(this.options.getThreads());
            taskExecutor.afterPropertiesSet();
            stepBuilder.taskExecutor((TaskExecutor)taskExecutor).throttleLimit(this.options.getThreads());
        }
        SimpleJobBuilder simpleJobBuilder = this.jobRunner.job(this.name).start((Step)stepBuilder.build());
        Job job = simpleJobBuilder.build();
        try {
            this.jobExecution = this.jobRunner.runAsync(job);
        }
        catch (JobExecutionException e) {
            throw new ItemStreamException(String.format("Could not run job %s", this.name), (Throwable)e);
        }
    }

    private T poll() throws InterruptedException {
        return this.queue.poll(this.options.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    protected SimpleStepBuilder<K, K> createStep() {
        if (this.keyReader instanceof ItemStreamSupport) {
            ((ItemStreamSupport)this.keyReader).setName(this.name + "-reader");
        }
        this.enqueuer.setName(this.name + "-writer");
        return this.jobRunner.step(this.name).chunk(this.options.getChunkSize()).reader(this.keyReader).writer((ItemWriter)this.enqueuer);
    }

    public T read() throws Exception {
        T item;
        while ((item = this.poll()) == null && this.jobExecution.isRunning() && !this.jobExecution.getStatus().isUnsuccessful()) {
        }
        return item;
    }

    public List<T> read(int maxElements) {
        ArrayList items = new ArrayList(maxElements);
        this.queue.drainTo(items, maxElements);
        return items;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        super.close();
        if (this.runningThreads.decrementAndGet() > 0) {
            return;
        }
        AtomicInteger atomicInteger = this.runningThreads;
        synchronized (atomicInteger) {
            this.jobRunner.awaitTermination(this.jobExecution);
            this.enqueuer.close();
            this.jobExecution = null;
        }
    }

    public boolean isOpen() {
        return this.jobExecution != null;
    }

    public static ScanReaderBuilder<String, String, KeyComparison<String>> comparator(JobRunner jobRunner, GenericObjectPool<StatefulConnection<String, String>> left, GenericObjectPool<StatefulConnection<String, String>> right, Duration ttlTolerance) {
        return new ScanReaderBuilder<String, String, KeyComparison<String>>(left, jobRunner, new KeyComparisonValueReader(left, right, ttlTolerance));
    }

    public static <K, V> ScanReaderBuilder<K, V, DataStructure<K>> dataStructure(GenericObjectPool<StatefulConnection<K, V>> connectionPool, JobRunner jobRunner) {
        return new ScanReaderBuilder(connectionPool, jobRunner, new DataStructureValueReader<K, V>(connectionPool));
    }

    public static <K, V> ScanReaderBuilder<K, V, KeyDump<K>> keyDump(GenericObjectPool<StatefulConnection<K, V>> connectionPool, JobRunner jobRunner) {
        return new ScanReaderBuilder(connectionPool, jobRunner, new KeyDumpValueReader<K, V>(connectionPool));
    }

    public static <K, V> StreamReaderBuilder<K, V> stream(GenericObjectPool<StatefulConnection<K, V>> connectionPool, K name, Consumer<K> consumer) {
        return new StreamReaderBuilder<K, V>(connectionPool, name, consumer);
    }

    public static LiveReaderBuilder<String, String, DataStructure<String>> liveDataStructure(GenericObjectPool<StatefulConnection<String, String>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> pubSubConnection, String ... keyPatterns) {
        return RedisItemReader.liveDataStructure(pool, jobRunner, pubSubConnection, 0, keyPatterns);
    }

    public static LiveReaderBuilder<String, String, DataStructure<String>> liveDataStructure(GenericObjectPool<StatefulConnection<String, String>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> pubSubConnection, int database, String ... keyPatterns) {
        return RedisItemReader.liveDataStructure(pool, jobRunner, pubSubConnection, LiveReaderBuilder.pubSubPatterns(database, keyPatterns), LiveReaderBuilder.STRING_KEY_EXTRACTOR);
    }

    public static <K, V> LiveReaderBuilder<K, V, DataStructure<K>> liveDataStructure(GenericObjectPool<StatefulConnection<K, V>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> pubSubConnection, K[] pubSubPatterns, Converter<K, K> eventKeyExtractor) {
        return new LiveReaderBuilder(jobRunner, new DataStructureValueReader<K, V>(pool), pubSubConnection, pubSubPatterns, eventKeyExtractor);
    }

    public static <K, V> LiveReaderBuilder<K, V, DataStructure<K>> liveDataStructure(GenericObjectPool<StatefulConnection<K, V>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> pubSubConnection, RedisCodec<K, V> codec, int database, String ... keyPatterns) {
        return new LiveReaderBuilder(jobRunner, new DataStructureValueReader<K, V>(pool), pubSubConnection, LiveReaderBuilder.pubSubPatterns(codec, database, keyPatterns), LiveReaderBuilder.keyExtractor(codec));
    }

    public static <K, V> LiveReaderBuilder<K, V, KeyDump<K>> liveKeyDump(GenericObjectPool<StatefulConnection<K, V>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> pubSubConnection, RedisCodec<K, V> codec, int database, String ... keyPatterns) {
        return new LiveReaderBuilder(jobRunner, new KeyDumpValueReader<K, V>(pool), pubSubConnection, LiveReaderBuilder.pubSubPatterns(codec, database, keyPatterns), LiveReaderBuilder.keyExtractor(codec));
    }

    public static LiveReaderBuilder<String, String, KeyDump<String>> liveKeyDump(GenericObjectPool<StatefulConnection<String, String>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> pubSubConnection, String ... keyPatterns) {
        return RedisItemReader.liveKeyDump(pool, jobRunner, pubSubConnection, 0, keyPatterns);
    }

    public static LiveReaderBuilder<String, String, KeyDump<String>> liveKeyDump(GenericObjectPool<StatefulConnection<String, String>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> pubSubConnection, int database, String ... keyPatterns) {
        return RedisItemReader.liveKeyDump(pool, jobRunner, pubSubConnection, LiveReaderBuilder.pubSubPatterns(database, keyPatterns), LiveReaderBuilder.STRING_KEY_EXTRACTOR);
    }

    public static <K, V> LiveReaderBuilder<K, V, KeyDump<K>> liveKeyDump(GenericObjectPool<StatefulConnection<K, V>> pool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> pubSubConnection, K[] pubSubPatterns, Converter<K, K> eventKeyExtractor) {
        return new LiveReaderBuilder(jobRunner, new KeyDumpValueReader<K, V>(pool), pubSubConnection, pubSubPatterns, eventKeyExtractor);
    }

    private class Enqueuer
    extends AbstractItemStreamItemWriter<K> {
        private Enqueuer() {
        }

        public void open(ExecutionContext executionContext) {
            Utils.createGaugeCollectionSize("reader.queue.size", RedisItemReader.this.queue, new Tag[0]);
            super.open(executionContext);
            if (RedisItemReader.this.valueReader instanceof ItemStream) {
                ((ItemStream)RedisItemReader.this.valueReader).open(executionContext);
            }
        }

        public void update(ExecutionContext executionContext) {
            super.update(executionContext);
            if (RedisItemReader.this.valueReader instanceof ItemStream) {
                ((ItemStream)RedisItemReader.this.valueReader).update(executionContext);
            }
        }

        public void close() {
            if (!RedisItemReader.this.queue.isEmpty()) {
                RedisItemReader.this.log.warn((Object)"Closing with items still in queue");
            }
            if (RedisItemReader.this.valueReader instanceof ItemStream) {
                ((ItemStream)RedisItemReader.this.valueReader).close();
            }
            super.close();
        }

        public void write(List<? extends K> items) throws Exception {
            List values = (List)RedisItemReader.this.valueReader.process(items);
            for (Object value : values) {
                RedisItemReader.this.queue.put(value);
            }
        }
    }
}

