/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.test;

import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyComparison;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.OperationValueReader;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.StreamItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.test.ModulesTests;
import com.redis.spring.batch.test.RedisContainerFactory;
import com.redis.spring.batch.test.SimpleTestInfo;
import com.redis.spring.batch.writer.DumpItemWriter;
import com.redis.spring.batch.writer.OperationItemWriter;
import com.redis.spring.batch.writer.StructItemWriter;
import com.redis.spring.batch.writer.operation.Xadd;
import com.redis.testcontainers.RedisServer;
import com.redis.testcontainers.RedisStackContainer;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.Range;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.sync.RedisKeyCommands;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.support.ListItemWriter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.unit.DataSize;

class StackToStackTests
extends ModulesTests {
    private static final RedisStackContainer SOURCE = RedisContainerFactory.stack();
    private static final RedisStackContainer TARGET = RedisContainerFactory.stack();

    StackToStackTests() {
    }

    @Override
    protected RedisServer getRedisServer() {
        return SOURCE;
    }

    @Override
    protected RedisServer getTargetRedisServer() {
        return TARGET;
    }

    @Test
    void readStructMemoryUsage(TestInfo info) throws Exception {
        this.generate(info);
        long memLimit = 200L;
        StructItemReader reader = RedisItemReader.struct((AbstractRedisClient)this.client);
        this.configureReader(info, (RedisItemReader<?, ?, ?>)reader);
        reader.setMemoryUsageLimit(DataSize.ofBytes((long)memLimit));
        reader.open(new ExecutionContext());
        List<KeyValue> keyValues = StackToStackTests.readAll(reader);
        reader.close();
        Assertions.assertFalse((boolean)keyValues.isEmpty());
        for (KeyValue keyValue : keyValues) {
            Assertions.assertTrue((keyValue.getMemoryUsage() > 0L ? 1 : 0) != 0);
            if (keyValue.getMemoryUsage() <= memLimit) continue;
            Assertions.assertNull((Object)keyValue.getValue());
        }
    }

    @Test
    void readStructMemoryUsageTTL(TestInfo info) throws Exception {
        String key = "myhash";
        HashMap<String, String> hash = new HashMap<String, String>();
        hash.put("field1", "value1");
        hash.put("field2", "value2");
        this.commands.hset((Object)key, hash);
        long ttl = System.currentTimeMillis() + 123456L;
        this.commands.pexpireat((Object)key, ttl);
        StructItemReader reader = RedisItemReader.struct((AbstractRedisClient)this.client);
        this.configureReader(info, (RedisItemReader<?, ?, ?>)reader);
        reader.setMemoryUsageLimit(DataSize.ofBytes((long)-1L));
        OperationValueReader executor = reader.operationValueReader();
        executor.open(new ExecutionContext());
        KeyValue ds = (KeyValue)executor.process(Arrays.asList(key)).get(0);
        Assertions.assertEquals((Object)key, (Object)ds.getKey());
        Assertions.assertEquals((long)ttl, (long)ds.getTtl());
        Assertions.assertEquals((Object)DataType.HASH, (Object)ds.getType());
        Assertions.assertTrue((ds.getMemoryUsage() > 0L ? 1 : 0) != 0);
        executor.close();
    }

    @Test
    void readStructMemLimit(TestInfo info) throws Exception {
        DataSize limit = DataSize.ofBytes((long)500L);
        String key1 = "key:1";
        this.commands.set((Object)key1, (Object)"bar");
        String key2 = "key:2";
        this.commands.set((Object)key2, (Object)GeneratorItemReader.string((long)Math.toIntExact(limit.toBytes() * 2L)));
        StructItemReader reader = RedisItemReader.struct((AbstractRedisClient)this.client);
        this.configureReader(info, (RedisItemReader<?, ?, ?>)reader);
        reader.setName(StackToStackTests.name(info) + "-reader");
        reader.setMemoryUsageLimit(limit);
        reader.open(new ExecutionContext());
        List keyValues = StackToStackTests.readAll(reader);
        reader.close();
        Map<String, KeyValue> map = keyValues.stream().collect(Collectors.toMap(s -> (String)s.getKey(), s -> s));
        Assertions.assertNull((Object)map.get(key2).getValue());
    }

    @Test
    void replicateStructMemLimit(TestInfo info) throws Exception {
        this.generate(info);
        StructItemReader reader = RedisItemReader.struct((AbstractRedisClient)this.client);
        reader.setMemoryUsageLimit(DataSize.ofMegabytes((long)100L));
        StructItemWriter writer = RedisItemWriter.struct((AbstractRedisClient)this.targetClient);
        List<KeyComparison> diffs = this.replicate(info, reader, writer);
        this.assertEmpty(diffs);
    }

    @Test
    void replicateDumpMemLimitHigh(TestInfo info) throws Exception {
        this.generate(info);
        DumpItemReader reader = RedisItemReader.dump((AbstractRedisClient)this.client);
        reader.setMemoryUsageLimit(DataSize.ofMegabytes((long)100L));
        DumpItemWriter writer = RedisItemWriter.dump((AbstractRedisClient)this.targetClient);
        this.assertEmpty(this.replicate(info, reader, writer));
    }

    @Test
    void replicateDumpMemLimitLow(TestInfo info) throws Exception {
        this.generate(info);
        Assertions.assertTrue((this.commands.dbsize() > 10L ? 1 : 0) != 0);
        long memLimit = 1500L;
        DumpItemReader reader = RedisItemReader.dump((AbstractRedisClient)this.client);
        this.configureReader(info, (RedisItemReader<?, ?, ?>)reader);
        reader.setMemoryUsageLimit(DataSize.ofBytes((long)memLimit));
        DumpItemWriter writer = RedisItemWriter.dump((AbstractRedisClient)this.targetClient);
        this.run(info, reader, writer);
        this.awaitUntilFalse(() -> ((DumpItemReader)reader).isOpen());
        this.awaitUntilFalse(() -> ((DumpItemWriter)writer).isOpen());
        StructItemReader fullReader = RedisItemReader.struct((AbstractRedisClient)this.client);
        this.configureReader(info, (RedisItemReader<?, ?, ?>)reader);
        fullReader.setName(StackToStackTests.name(info) + "-fullReader");
        fullReader.setJobRepository(this.jobRepository);
        fullReader.setTransactionManager(this.transactionManager);
        fullReader.setMemoryUsageLimit(DataSize.ofBytes((long)-1L));
        fullReader.open(new ExecutionContext());
        List items = StackToStackTests.readAll(fullReader);
        fullReader.close();
        Predicate<KeyValue> isMemKey = v -> v.getMemoryUsage() > memLimit;
        List bigkeys = items.stream().filter(isMemKey).collect(Collectors.toList());
        Assertions.assertEquals((Long)this.commands.dbsize(), (long)((long)bigkeys.size() + this.targetCommands.dbsize()));
    }

    @Test
    void writeStructMultiExec(TestInfo info) throws Exception {
        int count = 10;
        GeneratorItemReader reader = this.generator(count);
        StructItemWriter writer = RedisItemWriter.struct((AbstractRedisClient)this.client);
        writer.setMultiExec(true);
        SimpleStepBuilder step = this.step(info, 1, reader, null, writer);
        this.run(info, step);
        Assertions.assertEquals((long)count, (Long)this.commands.dbsize());
    }

    @Test
    void writeStreamMultiExec(TestInfo testInfo) throws Exception {
        String stream = "stream:1";
        ArrayList messages = new ArrayList();
        for (int index = 0; index < 100; ++index) {
            HashMap<String, String> body = new HashMap<String, String>();
            body.put("field1", "value1");
            body.put("field2", "value2");
            messages.add(body);
        }
        ListItemReader reader = new ListItemReader(messages);
        Xadd xadd = new Xadd();
        xadd.setKey((Object)stream);
        xadd.setBodyFunction(Function.identity());
        OperationItemWriter writer = this.writer(xadd);
        writer.setMultiExec(true);
        this.run(testInfo, reader, writer);
        Assertions.assertEquals((long)messages.size(), (Long)this.commands.xlen((Object)stream));
        List xrange = this.commands.xrange((Object)stream, Range.create((Object)"-", (Object)"+"));
        for (int index = 0; index < xrange.size(); ++index) {
            StreamMessage message = (StreamMessage)xrange.get(index);
            Assertions.assertEquals(messages.get(index), (Object)message.getBody());
        }
    }

    @Test
    void readMultipleStreams(TestInfo testInfo) throws Exception {
        String consumerGroup = "consumerGroup";
        this.generateStreams(StackToStackTests.testInfo(testInfo, "streams"), 277);
        KeyScanArgs args = KeyScanArgs.Builder.type((String)DataType.STREAM.getString());
        List keys = ScanIterator.scan((RedisKeyCommands)this.commands, (ScanArgs)args).stream().collect(Collectors.toList());
        for (String key : keys) {
            long count = this.commands.xlen((Object)key);
            StreamItemReader<String, String> reader1 = this.streamReader(key, (Consumer<String>)Consumer.from((Object)consumerGroup, (Object)"consumer1"));
            reader1.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
            StreamItemReader<String, String> reader2 = this.streamReader(key, (Consumer<String>)Consumer.from((Object)consumerGroup, (Object)"consumer2"));
            reader2.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
            ListItemWriter writer1 = new ListItemWriter();
            SimpleTestInfo testInfo1 = new SimpleTestInfo(testInfo, key, "1");
            TaskletStep step1 = this.faultTolerant(this.flushingStep(testInfo1, reader1, writer1)).build();
            SimpleTestInfo testInfo2 = new SimpleTestInfo(testInfo, key, "2");
            ListItemWriter writer2 = new ListItemWriter();
            TaskletStep step2 = this.faultTolerant(this.flushingStep(testInfo2, reader2, writer2)).build();
            SimpleFlow flow1 = (SimpleFlow)StackToStackTests.flow("flow1").start((Step)step1).build();
            SimpleFlow flow2 = (SimpleFlow)StackToStackTests.flow("flow2").start((Step)step2).build();
            SimpleFlow flow = (SimpleFlow)StackToStackTests.flow("replicate").split((TaskExecutor)new SimpleAsyncTaskExecutor()).add(new Flow[]{flow1, flow2}).build();
            this.run(this.job(testInfo1).start((Flow)flow).build().build());
            this.awaitUntilFalse(() -> reader1.isOpen());
            this.awaitUntilFalse(() -> reader2.isOpen());
            Assertions.assertEquals((long)count, (long)(writer1.getWrittenItems().size() + writer2.getWrittenItems().size()));
            this.assertMessageBody(writer1.getWrittenItems());
            this.assertMessageBody(writer2.getWrittenItems());
            Assertions.assertEquals((long)count, (long)this.commands.xpending((Object)key, (Object)consumerGroup).getCount());
            reader1 = this.streamReader(key, (Consumer<String>)Consumer.from((Object)consumerGroup, (Object)"consumer1"));
            reader1.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
            reader1.open(new ExecutionContext());
            reader1.ack((Iterable)writer1.getWrittenItems());
            reader1.close();
            reader2 = this.streamReader(key, (Consumer<String>)Consumer.from((Object)consumerGroup, (Object)"consumer2"));
            reader2.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
            reader2.open(new ExecutionContext());
            reader2.ack((Iterable)writer2.getWrittenItems());
            reader2.close();
            Assertions.assertEquals((long)0L, (long)this.commands.xpending((Object)key, (Object)consumerGroup).getCount());
        }
    }

    private static FlowBuilder<SimpleFlow> flow(String name) {
        return new FlowBuilder(name);
    }
}

