/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.test;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.step.FlushingFaultTolerantStepBuilder;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.test.BatchTestApplication;
import com.redis.spring.batch.test.ErrorItemReader;
import io.lettuce.core.RedisCommandTimeoutException;
import java.lang.invoke.CallSite;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilderHelper;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.support.ListItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Integration tests for the flushing step builders ({@link FlushingStepBuilder} and
 * {@link FlushingFaultTolerantStepBuilder}).
 *
 * <p>Each test assembles a single-step job from a reader and a {@link ListItemWriter},
 * launches it, and then asserts on the number of items captured by the writer.
 */
@SpringBootTest(classes = BatchTestApplication.class)
// NOTE(review): @RunWith is a JUnit 4 annotation while every test here uses JUnit Jupiter
// annotations, so it is likely redundant - confirm before removing.
@RunWith(SpringRunner.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class StepTests {
    @Autowired
    protected JobRepository jobRepository;
    @Autowired
    protected PlatformTransactionManager transactionManager;
    @Autowired
    protected JobBuilderFactory jobBuilderFactory;
    @Autowired
    protected StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    /** Launcher backed by a {@link SimpleAsyncTaskExecutor}; created once in {@link #initialize()}. */
    private SimpleJobLauncher asyncJobLauncher;

    /**
     * Builds the asynchronous job launcher used by {@link #flushingStep()}.
     *
     * <p>Runs once for the whole class (PER_CLASS lifecycle), after the autowired
     * {@link #jobRepository} is available.
     */
    @BeforeAll
    void initialize() {
        this.asyncJobLauncher = new SimpleJobLauncher();
        this.asyncJobLauncher.setJobRepository(this.jobRepository);
        this.asyncJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    }

    /**
     * Runs a flushing step, upgraded to fault-tolerant, over 100 generated STRING items read
     * through an {@link ErrorItemReader}, skipping {@link RedisCommandTimeoutException}.
     *
     * <p>Expects exactly half of the items to reach the writer.
     * NOTE(review): presumably ErrorItemReader fails on every other read - confirm against
     * that class.
     *
     * @throws Exception if the job cannot be launched
     */
    @Test
    void flushingFaultTolerantStep() throws Exception {
        int count = 100;
        GeneratorItemReader gen = new GeneratorItemReader();
        gen.setMaxItemCount(count);
        gen.setTypes(new DataType[]{DataType.STRING});
        ErrorItemReader reader = new ErrorItemReader(gen);
        // Only the number of written items is asserted, so the element type is left open.
        ListItemWriter<Object> writer = new ListItemWriter<>();
        String name = "readKeyValueFaultTolerance";
        FlushingStepBuilder step = new FlushingStepBuilder((StepBuilderHelper)this.stepBuilderFactory.get(name));
        step.chunk(1);
        step.reader(reader);
        step.writer(writer);
        step.idleTimeout(Duration.ofMillis(300L));
        FlushingFaultTolerantStepBuilder ftStep = step.faultTolerant();
        ftStep.skip(RedisCommandTimeoutException.class);
        ftStep.skipPolicy(new AlwaysSkipItemSkipPolicy());
        Job job = this.jobBuilderFactory.get(name).start((Step)ftStep.build()).build();
        this.jobLauncher.run(job, new JobParameters());
        // Compare as ints: a size check should not rely on exact float equality.
        int expectedWritten = count / 2;
        Assertions.assertEquals(expectedWritten, writer.getWrittenItems().size());
    }

    /**
     * Wraps a plain chunk-oriented step in a {@link FlushingFaultTolerantStepBuilder} and reads
     * the integers 0..99 through an {@link ErrorItemReader}, skipping
     * {@link RedisCommandTimeoutException}.
     *
     * <p>Expects the writer to have received exactly half as many items as were supplied.
     *
     * @throws Exception if the job cannot be launched
     */
    @Test
    void readerSkipPolicy() throws Exception {
        String name = "skip-policy";
        List<Integer> items = IntStream.range(0, 100).boxed().collect(Collectors.toList());
        ErrorItemReader reader = new ErrorItemReader(new ListItemReader<>(items));
        ListItemWriter<Integer> writer = new ListItemWriter<>();
        SimpleStepBuilder<Integer, Integer> step = this.stepBuilderFactory.get(name).chunk(1);
        step.reader(reader);
        step.writer(writer);
        FlushingFaultTolerantStepBuilder ftStep = new FlushingFaultTolerantStepBuilder(step);
        ftStep.idleTimeout(Duration.ofMillis(300L));
        ftStep.skip(RedisCommandTimeoutException.class);
        ftStep.skipPolicy(new AlwaysSkipItemSkipPolicy());
        Job job = this.jobBuilderFactory.get(name).start((Step)ftStep.build()).build();
        this.jobLauncher.run(job, new JobParameters());
        Assertions.assertEquals(items.size(), writer.getWrittenItems().size() * 2);
    }

    /**
     * Launches a flushing step asynchronously, feeds it 100 keys through a queue while it is
     * running, waits for the execution to stop, and expects every key to have been written.
     *
     * @throws Exception if the job cannot be launched
     */
    @Test
    void flushingStep() throws Exception {
        String name = "flushingStep";
        int count = 100;
        // Capacity equals the number of items offered below, so offer() cannot reject any of them.
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(count);
        QueueItemReader<String> reader = new QueueItemReader<>(queue, Duration.ofMillis(10L));
        ListItemWriter<String> writer = new ListItemWriter<>();
        SimpleStepBuilder<String, String> step = this.stepBuilderFactory.get(name).chunk(50);
        step.reader(reader);
        step.writer(writer);
        FlushingStepBuilder flushingStep = new FlushingStepBuilder(step);
        flushingStep.idleTimeout(Duration.ofMillis(500L));
        Job job = this.jobBuilderFactory.get(name).start((Step)flushingStep.build()).build();
        JobExecution execution = this.asyncJobLauncher.run(job, new JobParameters());
        // Items are produced only after the job has been launched on the async launcher.
        for (int index = 1; index <= count; ++index) {
            queue.offer("key" + index);
        }
        Awaitility.await().until(() -> !execution.isRunning());
        Assertions.assertEquals(count, writer.getWrittenItems().size());
    }

    /**
     * Item reader that takes its items from a {@link BlockingQueue}.
     *
     * @param <T> type of the queued items
     */
    private static class QueueItemReader<T>
    extends AbstractItemStreamItemReader<T>
    implements PollableItemReader<T> {
        private final BlockingQueue<T> queue;
        /** Wait applied by {@link #read()}, in milliseconds. */
        private final long timeout;

        /**
         * @param queue   source of items
         * @param timeout how long {@link #read()} waits for an item before giving up
         */
        public QueueItemReader(BlockingQueue<T> queue, Duration timeout) {
            this.queue = queue;
            this.timeout = timeout.toMillis();
        }

        /**
         * Polls the queue using the timeout given at construction.
         *
         * @return the next item, or {@code null} if none arrived within the timeout
         * @throws InterruptedException if interrupted while waiting
         */
        public T read() throws InterruptedException {
            return this.poll(this.timeout, TimeUnit.MILLISECONDS);
        }

        /**
         * Delegates to {@link BlockingQueue#poll(long, TimeUnit)}.
         *
         * @param timeout maximum time to wait
         * @param unit    unit of {@code timeout}
         * @return the next item, or {@code null} if none arrived within the timeout
         * @throws InterruptedException if interrupted while waiting
         */
        public T poll(long timeout, TimeUnit unit) throws InterruptedException {
            return this.queue.poll(timeout, unit);
        }
    }
}

