/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.reader;

import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.LiveReaderOptions;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.step.FlushingSimpleStepBuilder;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemProcessor;

public class LiveRedisItemReader<K, T extends KeyValue<K>>
extends RedisItemReader<K, T>
implements PollableItemReader<T> {
    public LiveRedisItemReader(KeyspaceNotificationItemReader<K> keyReader, ItemProcessor<List<? extends K>, List<T>> valueReader, JobRunner jobRunner, LiveReaderOptions options) {
        super(keyReader, valueReader, jobRunner, options);
    }

    @Override
    public KeyspaceNotificationItemReader<K> getKeyReader() {
        return (KeyspaceNotificationItemReader)super.getKeyReader();
    }

    @Override
    protected void doOpen() {
        super.doOpen();
        Awaitility.await().timeout(JobRunner.DEFAULT_RUNNING_TIMEOUT).until(((KeyspaceNotificationItemReader)this.keyReader)::isOpen);
    }

    @Override
    protected SimpleStepBuilder<K, K> createStep() {
        SimpleStepBuilder step = super.createStep();
        return new FlushingSimpleStepBuilder(step).options(((LiveReaderOptions)this.options).getFlushingOptions());
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return (T)((KeyValue)this.queue.poll(timeout, unit));
    }
}

