/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.test;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.OperationValueReader;
import com.redis.spring.batch.common.Range;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.gen.StreamOptions;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.StreamItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.test.SimpleTestInfo;
import com.redis.spring.batch.util.CodecUtils;
import com.redis.spring.batch.util.ConnectionUtils;
import com.redis.spring.batch.writer.OperationItemWriter;
import com.redis.spring.batch.writer.StructItemWriter;
import com.redis.spring.batch.writer.WriteOperation;
import com.redis.testcontainers.RedisServer;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.Assert;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractTestBase {
    public static final DataType[] REDIS_GENERATOR_TYPES = new DataType[]{DataType.HASH, DataType.LIST, DataType.SET, DataType.STREAM, DataType.STRING, DataType.ZSET};
    public static final DataType[] REDIS_MODULES_GENERATOR_TYPES = new DataType[]{DataType.HASH, DataType.LIST, DataType.SET, DataType.STREAM, DataType.STRING, DataType.ZSET, DataType.JSON};
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected static final int DEFAULT_CHUNK_SIZE = 50;
    private static final Duration DEFAULT_AWAIT_TIMEOUT = Duration.ofMillis(10000L);
    protected static final Duration DEFAULT_RUNNING_DELAY = Duration.ofMillis(100L);
    protected static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofMillis(3000L);
    private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(1L);
    private static final Duration DEFAULT_POLL_DELAY = Duration.ZERO;
    protected static final int DEFAULT_GENERATOR_COUNT = 100;
    private Duration pollDelay = DEFAULT_POLL_DELAY;
    private Duration pollInterval = DEFAULT_POLL_INTERVAL;
    private Duration awaitTimeout = DEFAULT_AWAIT_TIMEOUT;
    @Value(value="${running-timeout:PT5S}")
    private Duration runningTimeout;
    @Value(value="${termination-timeout:PT5S}")
    private Duration terminationTimeout;
    protected JobRepository jobRepository;
    protected PlatformTransactionManager transactionManager;
    protected AbstractRedisClient client;
    protected GenericObjectPool<StatefulConnection<String, String>> pool;
    protected StatefulRedisModulesConnection<String, String> connection;
    protected RedisModulesCommands<String, String> commands;
    private SimpleJobLauncher jobLauncher;
    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;

    public static TestInfo testInfo(TestInfo info, String ... suffixes) {
        return new SimpleTestInfo(info, suffixes);
    }

    public static <T> List<T> readAll(ItemReader<T> reader) throws Exception {
        Object element;
        ArrayList<Object> list = new ArrayList<Object>();
        while ((element = reader.read()) != null) {
            list.add(element);
        }
        return list;
    }

    protected abstract RedisServer getRedisServer();

    @BeforeAll
    void setup() throws Exception {
        this.getRedisServer().start();
        this.client = this.client(this.getRedisServer());
        this.pool = ConnectionPoolSupport.createGenericObjectPool((Supplier)ConnectionUtils.supplier((AbstractRedisClient)this.client), (GenericObjectPoolConfig)new GenericObjectPoolConfig());
        this.connection = RedisModulesUtils.connection((AbstractRedisClient)this.client);
        this.commands = this.connection.sync();
        MapJobRepositoryFactoryBean bean = new MapJobRepositoryFactoryBean();
        bean.afterPropertiesSet();
        this.jobRepository = bean.getObject();
        this.transactionManager = bean.getTransactionManager();
        this.jobLauncher = new SimpleJobLauncher();
        this.jobLauncher.setJobRepository(this.jobRepository);
        this.jobLauncher.setTaskExecutor((TaskExecutor)new SyncTaskExecutor());
        this.jobLauncher.afterPropertiesSet();
        this.jobBuilderFactory = new JobBuilderFactory(this.jobRepository);
        this.stepBuilderFactory = new StepBuilderFactory(this.jobRepository, bean.getTransactionManager());
    }

    @AfterAll
    void teardown() {
        this.connection.close();
        this.pool.close();
        this.client.shutdown();
        this.client.getResources().shutdown();
        this.getRedisServer().close();
    }

    @BeforeEach
    void flushAll() {
        this.commands.flushall();
    }

    protected GeneratorItemReader generator(int count) {
        return this.generator(count, this.generatorDataTypes());
    }

    protected abstract DataType[] generatorDataTypes();

    protected GeneratorItemReader generator(DataType ... types) {
        return this.generator(100, types);
    }

    protected GeneratorItemReader generator(int count, DataType ... types) {
        GeneratorItemReader gen = new GeneratorItemReader();
        gen.setMaxItemCount(count);
        gen.setTypes(types);
        return gen;
    }

    protected <I, O> SimpleStepBuilder<I, O> step(TestInfo info, ItemReader<I> reader, ItemWriter<O> writer) {
        return this.step(info, reader, null, writer);
    }

    protected <I, O> SimpleStepBuilder<I, O> step(TestInfo info, ItemReader<I> reader, ItemProcessor<I, O> processor, ItemWriter<O> writer) {
        return this.step(info, 50, reader, processor, writer);
    }

    protected <I, O> SimpleStepBuilder<I, O> step(TestInfo info, int chunkSize, ItemReader<I> reader, ItemProcessor<I, O> processor, ItemWriter<O> writer) {
        String name = AbstractTestBase.name(info);
        if (reader instanceof ItemStreamSupport) {
            ((ItemStreamSupport)reader).setName(name + "-reader");
        }
        SimpleStepBuilder step = this.stepBuilderFactory.get(name).chunk(chunkSize);
        step.reader(reader);
        step.processor(processor);
        step.writer(writer);
        return step;
    }

    public static String name(TestInfo info) {
        return info.getDisplayName().replace("(TestInfo)", "");
    }

    protected AbstractRedisClient client(RedisServer server) {
        if (server.isCluster()) {
            return RedisModulesClusterClient.create((String)server.getRedisURI());
        }
        return RedisModulesClient.create((String)server.getRedisURI());
    }

    public void awaitRunning(JobExecution jobExecution) {
        this.awaitUntil(() -> ((JobExecution)jobExecution).isRunning());
    }

    public void awaitTermination(JobExecution jobExecution) {
        this.awaitUntilFalse(() -> ((JobExecution)jobExecution).isRunning());
    }

    protected void awaitUntilFalse(Callable<Boolean> evaluator) {
        this.awaitUntil(() -> (Boolean)evaluator.call() == false);
    }

    protected void awaitUntil(Callable<Boolean> evaluator) {
        Awaitility.await().pollDelay(this.pollDelay).pollInterval(this.pollInterval).timeout(this.awaitTimeout).until(evaluator);
    }

    protected JobBuilder job(TestInfo info) {
        return this.jobBuilderFactory.get(AbstractTestBase.name(info));
    }

    protected GeneratorItemReader generator() {
        return this.generator(100);
    }

    protected void generate(TestInfo info) throws JobExecutionException {
        this.generate(info, this.generator(100));
    }

    protected void generate(TestInfo info, GeneratorItemReader reader) throws JobExecutionException {
        this.generate(info, this.client, reader);
    }

    protected void generate(TestInfo info, AbstractRedisClient client, GeneratorItemReader reader) throws JobExecutionException {
        TestInfo finalTestInfo = AbstractTestBase.testInfo(info, "generate", String.valueOf(client.hashCode()));
        StructItemWriter writer = RedisItemWriter.struct((AbstractRedisClient)client, (RedisCodec)CodecUtils.STRING_CODEC);
        this.run(finalTestInfo, (ItemReader)reader, (ItemWriter)writer);
        this.awaitUntilFalse(() -> ((GeneratorItemReader)reader).isOpen());
        this.awaitUntilFalse(() -> ((StructItemWriter)writer).isOpen());
    }

    protected void configureReader(TestInfo info, RedisItemReader<?, ?, ?> reader) {
        reader.setName(AbstractTestBase.name(info));
        reader.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
        reader.setJobRepository(this.jobRepository);
        reader.setTransactionManager(this.transactionManager);
    }

    protected void flushAll(AbstractRedisClient client) {
        try (StatefulRedisModulesConnection connection = RedisModulesUtils.connection((AbstractRedisClient)client);){
            this.commands.flushall();
            this.awaitUntil(() -> this.commands.dbsize() == 0L);
        }
    }

    protected <I, O> JobExecution run(TestInfo info, ItemReader<I> reader, ItemWriter<O> writer) throws JobExecutionException {
        return this.run(info, reader, null, writer);
    }

    protected <I, O> JobExecution run(TestInfo info, ItemReader<I> reader, ItemProcessor<I, O> processor, ItemWriter<O> writer) throws JobExecutionException {
        return this.run(info, this.step(info, reader, processor, writer));
    }

    protected <I, O> JobExecution run(TestInfo info, SimpleStepBuilder<I, O> step) throws JobExecutionException {
        SimpleJobBuilder job = this.job(info).start((Step)this.faultTolerant(step).build());
        return this.run(job.build());
    }

    protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
        return step.faultTolerant().retryPolicy((RetryPolicy)new MaxAttemptsRetryPolicy()).retry(RedisCommandTimeoutException.class);
    }

    protected JobExecution run(Job job) throws JobExecutionException {
        return this.jobLauncher.run(job, new JobParameters());
    }

    protected void enableKeyspaceNotifications(AbstractRedisClient client) {
        this.commands.configSet("notify-keyspace-events", "AK");
    }

    protected <I, O> FlushingStepBuilder<I, O> flushingStep(TestInfo info, PollableItemReader<I> reader, ItemWriter<O> writer) {
        return new FlushingStepBuilder(this.step(info, (ItemReader<I>)reader, writer)).idleTimeout(DEFAULT_IDLE_TIMEOUT);
    }

    protected void generateStreams(TestInfo info, int messageCount) throws JobExecutionException {
        GeneratorItemReader gen = this.generator(3, DataType.STREAM);
        StreamOptions streamOptions = new StreamOptions();
        streamOptions.setMessageCount(Range.of((int)messageCount));
        gen.setStreamOptions(streamOptions);
        this.generate(AbstractTestBase.testInfo(info, "streams"), gen);
    }

    protected StreamItemReader<String, String> streamReader(String stream, Consumer<String> consumer) {
        return new StreamItemReader(this.client, (RedisCodec)CodecUtils.STRING_CODEC, (Object)stream, consumer);
    }

    protected void assertMessageBody(List<? extends StreamMessage<String, String>> items) {
        for (StreamMessage<String, String> streamMessage : items) {
            Assertions.assertTrue((boolean)streamMessage.getBody().containsKey("field1"));
            Assertions.assertTrue((boolean)streamMessage.getBody().containsKey("field2"));
        }
    }

    protected void assertStreamEquals(String expectedId, Map<String, String> expectedBody, String expectedStream, StreamMessage<String, String> message) {
        Assertions.assertEquals((Object)expectedId, (Object)message.getId());
        Assertions.assertEquals(expectedBody, (Object)message.getBody());
        Assertions.assertEquals((Object)expectedStream, (Object)message.getStream());
    }

    protected Map<String, String> map(String ... args) {
        Assert.notNull((Object)args, (String)"Args cannot be null");
        Assert.isTrue((args.length % 2 == 0 ? 1 : 0) != 0, (String)"Args length is not a multiple of 2");
        LinkedHashMap<String, String> body = new LinkedHashMap<String, String>();
        for (int index = 0; index < args.length / 2; ++index) {
            body.put(args[index * 2], args[index * 2 + 1]);
        }
        return body;
    }

    protected byte[] toByteArray(String key) {
        return (byte[])CodecUtils.toByteArrayKeyFunction((RedisCodec)CodecUtils.STRING_CODEC).apply(key);
    }

    protected String toString(byte[] key) {
        return (String)CodecUtils.toStringKeyFunction((RedisCodec)ByteArrayCodec.INSTANCE).apply(key);
    }

    protected void open(ItemStream itemStream) {
        itemStream.open(new ExecutionContext());
    }

    protected OperationValueReader<byte[], byte[], byte[], KeyValue<byte[]>> dumpOperationExecutor() {
        DumpItemReader reader = RedisItemReader.dump((AbstractRedisClient)this.client);
        OperationValueReader executor = reader.operationValueReader();
        executor.open(new ExecutionContext());
        return executor;
    }

    protected OperationValueReader<String, String, String, KeyValue<String>> structOperationExecutor() {
        return this.structOperationExecutor((RedisCodec)CodecUtils.STRING_CODEC);
    }

    protected <K, V> OperationValueReader<K, V, K, KeyValue<K>> structOperationExecutor(RedisCodec<K, V> codec) {
        StructItemReader reader = RedisItemReader.struct((AbstractRedisClient)this.client, codec);
        OperationValueReader executor = reader.operationValueReader();
        executor.open(new ExecutionContext());
        return executor;
    }

    protected <T> OperationItemWriter<String, String, T> writer(WriteOperation<String, String, T> operation) {
        return RedisItemWriter.operation((AbstractRedisClient)this.client, (RedisCodec)CodecUtils.STRING_CODEC, operation);
    }
}

