/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.KeyItemReader;
import com.redis.spring.batch.reader.KeyScanItemReader;
import com.redis.spring.batch.reader.KeyTypeItemReader;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.step.FlushingChunkProvider;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.util.Await;
import com.redis.spring.batch.util.CodecUtils;
import com.redis.spring.batch.writer.ProcessingItemWriter;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;

/**
 * Base class for item readers that expose Redis data through a bounded in-memory queue.
 *
 * <p>When opened, the reader builds and asynchronously launches a Spring Batch job made of a single
 * fault-tolerant step. That step reads keys (from a {@link KeyScanItemReader} in
 * {@link ReaderMode#SCAN} mode or a {@link KeyspaceNotificationItemReader} in {@link ReaderMode#LIVE}
 * mode), optionally passes them through the configured key processor, and hands them to a
 * {@link ProcessingItemWriter} created with {@link #valueReader()} and a queue of
 * {@link #getQueueCapacity()} elements. {@link #read()}, {@link #read(int)} and
 * {@link #poll(long, TimeUnit)} consume items from that queue.
 *
 * <p>{@link #open(ExecutionContext)}, {@link #close()}, {@link #read()}, {@link #read(int)} and
 * {@link #isOpen()} are {@code synchronized} on this instance; the setters are not.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 * @param <T> type of the items produced by this reader
 */
public abstract class RedisItemReader<K, V, T>
extends AbstractItemStreamItemReader<T>
implements PollableItemReader<T> {
    /** Name of the gauge tracking the size of the item queue. */
    public static final String QUEUE_METER = "redis.batch.reader.queue.size";
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL;
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10L);
    public static final KeyspaceNotificationItemReader.OrderingStrategy DEFAULT_ORDERING = KeyspaceNotificationItemReader.OrderingStrategy.PRIORITY;
    public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 10000;
    public static final ReaderMode DEFAULT_MODE = ReaderMode.SCAN;
    public static final int DEFAULT_SKIP_LIMIT = 0;
    public static final int DEFAULT_RETRY_LIMIT = 3;
    private static final Duration DEFAULT_OPEN_TIMEOUT = Duration.ofSeconds(3L);
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(3L);

    // Instance logger so that messages are attributed to the concrete subclass.
    private final Log log = LogFactory.getLog(this.getClass());
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;

    // Fault-tolerance settings applied to the step built in step().
    private ReaderMode mode = DEFAULT_MODE;
    private int skipLimit = DEFAULT_SKIP_LIMIT;
    private int retryLimit = DEFAULT_RETRY_LIMIT;
    // By default the non-retriable exceptions are the skippable ones, and the retriable
    // exceptions are the non-skippable ones.
    private List<Class<? extends Throwable>> skippableExceptions = RedisItemReader.defaultNonRetriableExceptions();
    private List<Class<? extends Throwable>> nonSkippableExceptions = RedisItemReader.defaultRetriableExceptions();
    private List<Class<? extends Throwable>> retriableExceptions = RedisItemReader.defaultRetriableExceptions();
    private List<Class<? extends Throwable>> nonRetriableExceptions = RedisItemReader.defaultNonRetriableExceptions();

    // Settings forwarded to the keyspace-notification (live) key reader and flushing step.
    private int database;
    private KeyspaceNotificationItemReader.OrderingStrategy orderingStrategy = DEFAULT_ORDERING;
    private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY;
    private Duration flushInterval = DEFAULT_FLUSH_INTERVAL;
    private Duration idleTimeout;

    // Settings forwarded to the scan key reader and the step.
    private long scanCount;
    private ItemProcessor<K, K> keyProcessor;
    private ReadFrom readFrom;
    private int threads = DEFAULT_THREADS;
    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    private String keyPattern;
    private DataType keyType;
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;
    private Duration openTimeout = DEFAULT_OPEN_TIMEOUT;
    private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;

    // Job infrastructure; repository and transaction manager may be injected, the rest is built on open.
    private JobRepository jobRepository;
    private PlatformTransactionManager transactionManager;
    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;
    private SimpleJobLauncher jobLauncher;

    // Runtime state; jobExecution != null is what isOpen() reports as "open".
    private String name;
    private JobExecution jobExecution;
    private KeyItemReader<K> keyReader;
    private ProcessingItemWriter<K, T> writer;

    /**
     * Creates a reader named after the short name of the concrete class.
     *
     * @param client Redis client handed to the key readers
     * @param codec  codec handed to the key readers
     */
    protected RedisItemReader(AbstractRedisClient client, RedisCodec<K, V> codec) {
        this.setName(ClassUtils.getShortName(this.getClass()));
        this.client = client;
        this.codec = codec;
    }

    /** @return the Redis client this reader was created with */
    public AbstractRedisClient getClient() {
        return this.client;
    }

    /** @return the codec this reader was created with */
    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    /** @return the current reader mode */
    public ReaderMode getMode() {
        return this.mode;
    }

    /** Adds an exception type registered as skippable on the step. */
    public void addSkippableException(Class<? extends Throwable> exception) {
        this.skippableExceptions.add(exception);
    }

    /** Adds an exception type registered as non-skippable on the step. */
    public void addNonSkippableException(Class<? extends Throwable> exception) {
        this.nonSkippableExceptions.add(exception);
    }

    /** Adds an exception type registered as retriable on the step. */
    public void addRetriableException(Class<? extends Throwable> exception) {
        this.retriableExceptions.add(exception);
    }

    /** Adds an exception type registered as non-retriable on the step. */
    public void addNonRetriableException(Class<? extends Throwable> exception) {
        this.nonRetriableExceptions.add(exception);
    }

    /** Sets the retry limit applied to the step. */
    public void setRetryLimit(int retryLimit) {
        this.retryLimit = retryLimit;
    }

    /** Sets the skip limit applied to the step. */
    public void setSkipLimit(int skipLimit) {
        this.skipLimit = skipLimit;
    }

    /** Sets the value passed as the limit of the scan key reader. */
    public void setScanCount(long count) {
        this.scanCount = count;
    }

    /** Sets how long {@link #close()} waits for the job to stop running before terminating its steps. */
    public void setCloseTimeout(Duration closeTimeout) {
        this.closeTimeout = closeTimeout;
    }

    /** Sets how long {@link #open(ExecutionContext)} waits for the job to be running or completed. */
    public void setOpenTimeout(Duration timeout) {
        this.openTimeout = timeout;
    }

    /** Sets the job repository; when left {@code null} an in-memory one is created on open. */
    public void setJobRepository(JobRepository repository) {
        this.jobRepository = repository;
    }

    /** Sets the transaction manager; when left {@code null} the in-memory repository's one is used. */
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    /** @return the processor applied to keys in the step, possibly {@code null} */
    public ItemProcessor<K, K> getKeyProcessor() {
        return this.keyProcessor;
    }

    /** Sets the processor applied to keys in the step. */
    public void setKeyProcessor(ItemProcessor<K, K> processor) {
        this.keyProcessor = processor;
    }

    /** Sets the number of threads; values above 1 make the step use a thread pool of that size. */
    public void setThreads(int threads) {
        this.threads = threads;
    }

    /** Sets the chunk size of the step. */
    public void setChunkSize(int size) {
        this.chunkSize = size;
    }

    /** Sets the capacity of the item queue. */
    public void setQueueCapacity(int capacity) {
        this.queueCapacity = capacity;
    }

    /** Sets the reader mode. */
    public void setMode(ReaderMode mode) {
        this.mode = mode;
    }

    /** Sets the {@link ReadFrom} setting passed to the scan key reader. */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /** Sets the key pattern passed to the key readers. */
    public void setKeyPattern(String globPattern) {
        this.keyPattern = globPattern;
    }

    /** Sets the key type passed to the key readers. */
    public void setKeyType(DataType type) {
        this.keyType = type;
    }

    /** Sets the timeout used for each queue poll in {@link #read()} and by the live key reader. */
    public void setPollTimeout(Duration timeout) {
        this.pollTimeout = timeout;
    }

    /** Sets the idle timeout passed to the flushing step in live mode. */
    public void setIdleTimeout(Duration timeout) {
        this.idleTimeout = timeout;
    }

    /** @return the live (not copied) list of retriable exception types */
    public List<Class<? extends Throwable>> getRetriableExceptions() {
        return this.retriableExceptions;
    }

    /** Replaces the list of retriable exception types. */
    public void setRetriableExceptions(List<Class<? extends Throwable>> retriableExceptions) {
        this.retriableExceptions = retriableExceptions;
    }

    /** @return the live (not copied) list of non-retriable exception types */
    public List<Class<? extends Throwable>> getNonRetriableExceptions() {
        return this.nonRetriableExceptions;
    }

    /** Replaces the list of non-retriable exception types. */
    public void setNonRetriableExceptions(List<Class<? extends Throwable>> nonRetriableExceptions) {
        this.nonRetriableExceptions = nonRetriableExceptions;
    }

    /** @return the retry limit applied to the step */
    public int getRetryLimit() {
        return this.retryLimit;
    }

    /** @return the skip limit applied to the step */
    public int getSkipLimit() {
        return this.skipLimit;
    }

    /** @return the database passed to the live key reader */
    public int getDatabase() {
        return this.database;
    }

    /** @return the ordering strategy passed to the live key reader */
    public KeyspaceNotificationItemReader.OrderingStrategy getOrderingStrategy() {
        return this.orderingStrategy;
    }

    /** @return the queue capacity passed to the live key reader */
    public int getNotificationQueueCapacity() {
        return this.notificationQueueCapacity;
    }

    /** @return the flush interval passed to the flushing step in live mode */
    public Duration getFlushInterval() {
        return this.flushInterval;
    }

    /** @return the idle timeout passed to the flushing step in live mode, possibly {@code null} */
    public Duration getIdleTimeout() {
        return this.idleTimeout;
    }

    /** @return the value passed as the limit of the scan key reader */
    public long getScanCount() {
        return this.scanCount;
    }

    /** @return the {@link ReadFrom} setting passed to the scan key reader, possibly {@code null} */
    public ReadFrom getReadFrom() {
        return this.readFrom;
    }

    /** @return the configured number of threads */
    public int getThreads() {
        return this.threads;
    }

    /** @return the chunk size of the step */
    public int getChunkSize() {
        return this.chunkSize;
    }

    /** @return the capacity of the item queue */
    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    /** @return the key pattern passed to the key readers, possibly {@code null} */
    public String getKeyPattern() {
        return this.keyPattern;
    }

    /** @return the key type passed to the key readers, possibly {@code null} */
    public DataType getKeyType() {
        return this.keyType;
    }

    /** @return the timeout used for each queue poll */
    public Duration getPollTimeout() {
        return this.pollTimeout;
    }

    /** @return how long {@link #open(ExecutionContext)} waits for the job to start */
    public Duration getOpenTimeout() {
        return this.openTimeout;
    }

    /** Sets the flush interval passed to the flushing step in live mode. */
    public void setFlushInterval(Duration interval) {
        this.flushInterval = interval;
    }

    /** Sets the queue capacity passed to the live key reader. */
    public void setNotificationQueueCapacity(int capacity) {
        this.notificationQueueCapacity = capacity;
    }

    /** Sets the database passed to the live key reader. */
    public void setDatabase(int database) {
        this.database = database;
    }

    /** Sets the ordering strategy passed to the live key reader. */
    public void setOrderingStrategy(KeyspaceNotificationItemReader.OrderingStrategy strategy) {
        this.orderingStrategy = strategy;
    }

    /**
     * Sets the reader name. The name is also used for the job, the step, and as the prefix of
     * the key reader and writer names.
     */
    @Override
    public void setName(String name) {
        super.setName(name);
        this.name = name;
    }

    /** @return the key reader created by the last {@link #open(ExecutionContext)}, or {@code null} */
    public ItemReader<K> getKeyReader() {
        return this.keyReader;
    }

    /**
     * Opens the reader by building and launching the background job. Does nothing beyond the
     * superclass call when the reader is already open.
     *
     * @throws ItemStreamException if the job cannot be launched, does not start within the open
     *                             timeout, or fails
     */
    @Override
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.isOpen()) {
            return;
        }
        this.log.debug(String.format("Opening %s", this.name));
        this.initializeJobInfrastructure();
        Job job = this.jobBuilderFactory.get(this.name).start(this.step().build()).build();
        this.jobExecution = this.execute(job);
        this.log.debug(String.format("Opened %s", this.name));
    }

    /**
     * Launches the job and waits, up to the open timeout, until it is either running (with key
     * reader and writer open) or finished.
     *
     * @return the job execution
     * @throws ItemStreamException on launch failure, interruption, timeout, or failed exit status
     */
    private JobExecution execute(Job job) {
        JobExecution execution = this.launch(job);
        boolean started;
        try {
            started = new Await().await(() -> this.isRunning(execution) || this.isCompleted(execution), this.openTimeout);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ItemStreamException("Interrupted while waiting for job to start", e);
        }
        if (!started) {
            throw new ItemStreamException("Timeout waiting for job to run");
        }
        if (execution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
            List<Throwable> failures = execution.getAllFailureExceptions();
            if (failures.isEmpty()) {
                throw new ItemStreamException("Could not run job");
            }
            // Surface the first failure as the cause.
            throw new ItemStreamException("Could not run job", failures.get(0));
        }
        return execution;
    }

    /** Runs the job through the launcher, wrapping launch errors in {@link ItemStreamException}. */
    private JobExecution launch(Job job) {
        try {
            return this.jobLauncher.run(job, new JobParameters());
        }
        catch (JobExecutionException e) {
            throw new ItemStreamException("Job execution failed", e);
        }
    }

    /** @return {@code true} if the execution status is unsuccessful or {@code COMPLETED} */
    private boolean isCompleted(JobExecution execution) {
        BatchStatus status = execution.getStatus();
        return status.isUnsuccessful() || status == BatchStatus.COMPLETED;
    }

    /** @return {@code true} if the execution is running and both key reader and writer are open */
    private boolean isRunning(JobExecution execution) {
        return execution.isRunning() && this.keyReader.isOpen() && this.writer.isOpen();
    }

    /**
     * Closes the reader. If the job execution is still running, waits up to the close timeout for
     * it to stop being "running" (job running with key reader and writer open); if it does not,
     * every step execution is flagged terminate-only. Does nothing beyond the superclass call when
     * the reader is not open.
     *
     * @throws ItemStreamException if interrupted while waiting
     */
    @Override
    public synchronized void close() {
        super.close();
        if (!this.isOpen()) {
            return;
        }
        this.log.debug(String.format("Closing %s", this.name));
        if (this.jobExecution.isRunning()) {
            boolean stopped;
            try {
                stopped = new Await().await(() -> !this.isRunning(this.jobExecution), this.closeTimeout);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ItemStreamException("Interrupted while waiting for job to stop", e);
            }
            if (!stopped) {
                this.log.debug("Terminating step executions");
                this.jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
            }
        }
        this.jobExecution = null;
        this.log.debug(String.format("Closed %s", this.name));
    }

    /**
     * Returns the next item, polling the queue repeatedly with the poll timeout for as long as the
     * writer is open.
     *
     * <p>This method holds this instance's monitor while waiting, so other {@code synchronized}
     * methods (for example {@link #close()}) block until it returns.
     *
     * @return the next item, or {@code null} once the writer is closed and the queue is empty
     * @throws InterruptedException if interrupted while polling
     */
    @Override
    public synchronized T read() throws InterruptedException {
        long timeoutMillis = this.pollTimeout.toMillis();
        T item;
        boolean writerWasOpen;
        do {
            // Sample the writer state BEFORE polling: if the writer was still open, an item may
            // have been enqueued (and the writer closed) right after an empty poll, so poll once
            // more instead of reporting end-of-stream and stranding that item in the queue.
            writerWasOpen = this.writer.isOpen();
            item = this.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } while (item == null && writerWasOpen);
        return item;
    }

    /**
     * Removes up to {@code count} currently available items from the queue without blocking.
     *
     * @param count maximum number of items to return
     * @return the drained items, possibly empty
     */
    public synchronized List<T> read(int count) {
        List<T> items = new ArrayList<>(count);
        // Bound the drain by count; an unbounded drainTo would return the whole queue.
        this.writer.getQueue().drainTo(items, count);
        return items;
    }

    /**
     * Polls the item queue directly.
     *
     * @return the head of the queue, or {@code null} if the timeout elapses first
     */
    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.writer.getQueue().poll(timeout, unit);
    }

    /**
     * Creates the job/step builder factories and an asynchronous job launcher, falling back to an
     * in-memory job repository and its transaction manager for whichever of the two was not set.
     *
     * @throws ItemStreamException if the in-memory job repository cannot be created
     */
    private void initializeJobInfrastructure() {
        if (this.jobRepository == null || this.transactionManager == null) {
            MapJobRepositoryFactoryBean bean = new MapJobRepositoryFactoryBean();
            if (this.jobRepository == null) {
                try {
                    this.jobRepository = bean.getObject();
                }
                catch (Exception e) {
                    // Keep the cause so the underlying failure is diagnosable.
                    throw new ItemStreamException("Could not initialize job repository", e);
                }
                if (this.jobRepository == null) {
                    throw new ItemStreamException("Job repository is null");
                }
            }
            if (this.transactionManager == null) {
                this.transactionManager = bean.getTransactionManager();
            }
        }
        this.jobBuilderFactory = new JobBuilderFactory(this.jobRepository);
        this.stepBuilderFactory = new StepBuilderFactory(this.jobRepository, this.transactionManager);
        this.jobLauncher = new SimpleJobLauncher();
        this.jobLauncher.setJobRepository(this.jobRepository);
        // Asynchronous executor: run() returns while the job executes in the background.
        this.jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    }

    /**
     * Builds the fault-tolerant step wiring key reader, key processor and writer, and applying
     * threading, skip and retry settings. Also (re)assigns the key reader and writer fields.
     */
    private SimpleStepBuilder<K, K> step() {
        FaultTolerantStepBuilder<K, K> step = this.simpleStep().faultTolerant();
        this.keyReader = this.keyReader();
        step.reader(this.keyReader);
        step.processor(this.keyProcessor);
        this.writer = this.writer();
        step.writer(this.writer);
        if (this.threads > 1) {
            // NOTE(review): this executor is not shut down anywhere in this class; confirm
            // whether its lifecycle is managed elsewhere.
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setMaxPoolSize(this.threads);
            taskExecutor.setCorePoolSize(this.threads);
            taskExecutor.setQueueCapacity(this.threads);
            taskExecutor.afterPropertiesSet();
            step.taskExecutor(taskExecutor);
            step.throttleLimit(this.threads);
        }
        step.skipLimit(this.skipLimit);
        step.retryLimit(this.retryLimit);
        this.skippableExceptions.forEach(step::skip);
        this.nonSkippableExceptions.forEach(step::noSkip);
        this.retriableExceptions.forEach(step::retry);
        this.nonRetriableExceptions.forEach(step::noRetry);
        return step;
    }

    /**
     * Creates the chunk-oriented step builder; in live mode it is wrapped in a flushing step
     * builder configured with the flush interval and idle timeout.
     */
    private SimpleStepBuilder<K, K> simpleStep() {
        SimpleStepBuilder<K, K> step = this.stepBuilderFactory.get(this.name).chunk(this.chunkSize);
        if (!this.isLive()) {
            return step;
        }
        FlushingStepBuilder<K, K> flushingStep = new FlushingStepBuilder<>(step);
        flushingStep.interval(this.flushInterval);
        flushingStep.idleTimeout(this.idleTimeout);
        return flushingStep;
    }

    /**
     * Creates the writer together with its bounded queue and registers a size gauge for that
     * queue under {@link #QUEUE_METER} in the global meter registry.
     */
    private ProcessingItemWriter<K, T> writer() {
        LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(this.queueCapacity);
        Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), queue);
        ProcessingItemWriter<K, T> processingWriter = new ProcessingItemWriter<K, T>(this.valueReader(), queue);
        processingWriter.setName(this.name("writer"));
        return processingWriter;
    }

    /**
     * Supplies the processor given to the {@link ProcessingItemWriter}; it maps a list of keys to
     * a list of items.
     */
    protected abstract ItemProcessor<List<K>, List<T>> valueReader();

    /** Picks the key reader matching the current mode. */
    private KeyItemReader<K> keyReader() {
        return this.isLive() ? this.keyspaceNotificationReader() : this.scanKeyReader();
    }

    /** Creates the keyspace-notification key reader used in live mode. */
    private KeyspaceNotificationItemReader<K> keyspaceNotificationReader() {
        KeyspaceNotificationItemReader<K> reader = new KeyspaceNotificationItemReader<K>(this.client, this.codec);
        reader.setName(this.name("keyspaceNotificationReader"));
        reader.setDatabase(this.database);
        reader.setKeyPattern(this.keyPattern);
        reader.setKeyType(this.keyType);
        reader.setOrderingStrategy(this.orderingStrategy);
        reader.setQueueCapacity(this.notificationQueueCapacity);
        reader.setPollTimeout(this.pollTimeout);
        return reader;
    }

    /** Joins this reader's name and the given suffixes with {@code "-"}. */
    private String name(String ... suffixes) {
        List<String> elements = new ArrayList<>(suffixes.length + 1);
        elements.add(this.name);
        elements.addAll(Arrays.asList(suffixes));
        return String.join("-", elements);
    }

    /**
     * Creates a new scan-based key reader configured from this reader's settings.
     *
     * @return a new, unopened key reader
     */
    public KeyScanItemReader<K> scanKeyReader() {
        KeyScanItemReader<K> reader = new KeyScanItemReader<K>(this.client, this.codec);
        reader.setName(this.name("keyScanReader"));
        reader.setReadFrom(this.readFrom);
        reader.setLimit(this.scanCount);
        reader.setMatch(this.keyPattern);
        reader.setType(this.keyType == null ? null : this.keyType.getString());
        return reader;
    }

    /** @return {@code true} if the mode is {@link ReaderMode#LIVE} */
    public boolean isLive() {
        return this.mode == ReaderMode.LIVE;
    }

    /** @return {@code true} if a job execution is currently held, i.e. between open and close */
    public synchronized boolean isOpen() {
        return this.jobExecution != null;
    }

    /** Creates a {@link DumpItemReader} for the given client. */
    public static DumpItemReader dump(AbstractRedisClient client) {
        return new DumpItemReader(client);
    }

    /** Creates a {@link StructItemReader} using {@link CodecUtils#STRING_CODEC}. */
    public static StructItemReader<String, String> struct(AbstractRedisClient client) {
        return RedisItemReader.struct(client, CodecUtils.STRING_CODEC);
    }

    /** Creates a {@link StructItemReader} using the given codec. */
    public static <K, V> StructItemReader<K, V> struct(AbstractRedisClient client, RedisCodec<K, V> codec) {
        return new StructItemReader<K, V>(client, codec);
    }

    /** @return a new mutable list containing {@link RedisCommandTimeoutException} */
    public static List<Class<? extends Throwable>> defaultRetriableExceptions() {
        return RedisItemReader.modifiableList(RedisCommandTimeoutException.class);
    }

    /** @return a new mutable list containing {@link RedisCommandExecutionException} */
    public static List<Class<? extends Throwable>> defaultNonRetriableExceptions() {
        return RedisItemReader.modifiableList(RedisCommandExecutionException.class);
    }

    /** Copies the given elements into a new, growable list. */
    @SafeVarargs
    private static <T> List<T> modifiableList(T ... elements) {
        return new ArrayList<T>(Arrays.asList(elements));
    }

    /** Creates a {@link KeyTypeItemReader} using {@link CodecUtils#STRING_CODEC}. */
    public static KeyTypeItemReader<String, String> type(AbstractRedisClient client) {
        return RedisItemReader.type(client, CodecUtils.STRING_CODEC);
    }

    /** Creates a {@link KeyTypeItemReader} using the given codec. */
    public static <K, V> KeyTypeItemReader<K, V> type(AbstractRedisClient client, RedisCodec<K, V> codec) {
        return new KeyTypeItemReader<K, V>(client, codec);
    }

    /** How keys are sourced: a keyspace scan or live keyspace notifications. */
    public static enum ReaderMode {
        SCAN,
        LIVE;

    }
}

