/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.test;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyComparison;
import com.redis.spring.batch.common.KeyComparisonItemReader;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.OperationValueReader;
import com.redis.spring.batch.common.ToGeoValueFunction;
import com.redis.spring.batch.common.ToScoredValueFunction;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.gen.MapOptions;
import com.redis.spring.batch.reader.KeyTypeItemReader;
import com.redis.spring.batch.reader.KeyValueItemReader;
import com.redis.spring.batch.reader.ScanSizeEstimator;
import com.redis.spring.batch.reader.StreamItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.test.AbstractTargetTestBase;
import com.redis.spring.batch.test.BatchTestApplication;
import com.redis.spring.batch.test.Geo;
import com.redis.spring.batch.test.SynchronizedListItemWriter;
import com.redis.spring.batch.test.ZValue;
import com.redis.spring.batch.writer.OperationItemWriter;
import com.redis.spring.batch.writer.StructItemWriter;
import com.redis.spring.batch.writer.operation.Del;
import com.redis.spring.batch.writer.operation.Expire;
import com.redis.spring.batch.writer.operation.ExpireAt;
import com.redis.spring.batch.writer.operation.Geoadd;
import com.redis.spring.batch.writer.operation.Hset;
import com.redis.spring.batch.writer.operation.Lpush;
import com.redis.spring.batch.writer.operation.LpushAll;
import com.redis.spring.batch.writer.operation.Rpush;
import com.redis.spring.batch.writer.operation.Sadd;
import com.redis.spring.batch.writer.operation.Xadd;
import com.redis.spring.batch.writer.operation.Zadd;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.GeoArgs;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RestoreArgs;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisKeyCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.models.stream.PendingMessages;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes={BatchTestApplication.class})
@RunWith(value=SpringRunner.class)
abstract class BatchTests
extends AbstractTargetTestBase {
    /** Package-private no-argument constructor; adds no initialization of its own. */
    BatchTests() {
        super();
    }

    /** Supplies {@code REDIS_GENERATOR_TYPES} as the generator data types for this suite. */
    @Override
    protected DataType[] generatorDataTypes() {
        DataType[] supportedTypes = REDIS_GENERATOR_TYPES;
        return supportedTypes;
    }

    /**
     * Checks that a set whose members were added in a different order on the source and on the
     * target is still reported with status {@code OK} by the key comparison reader.
     */
    @Test
    void compareSet(TestInfo info) throws Exception {
        String[] sourceMembers = new String[]{"value1", "value2"};
        String[] targetMembers = new String[]{"value2", "value1"};
        commands.sadd((Object)"set:1", (Object[])sourceMembers);
        targetCommands.sadd((Object)"set:1", (Object[])targetMembers);
        KeyComparisonItemReader comparator = comparisonReader(info);
        comparator.setName(name(info));
        comparator.open(new ExecutionContext());
        List results = readAll(comparator);
        comparator.close();
        // Only the first comparison is inspected.
        KeyComparison first = (KeyComparison)results.get(0);
        Assertions.assertEquals(KeyComparison.Status.OK, first.getStatus());
    }

    /**
     * Writes 100 string keys to the source and the first 90 of them to the target, compares the
     * two with type readers, and expects the 10 absent keys to be reported as {@code MISSING}.
     */
    @Test
    void compareQuick(TestInfo info) throws Exception {
        int sourceCount = 100;
        int targetCount = 90;
        for (int i = 1; i <= sourceCount; i++) {
            commands.set((Object)("key:" + i), (Object)("value:" + i));
        }
        for (int i = 1; i <= targetCount; i++) {
            targetCommands.set((Object)("key:" + i), (Object)("value:" + i));
        }
        KeyTypeItemReader sourceReader = RedisItemReader.type((AbstractRedisClient)client);
        KeyTypeItemReader targetReader = RedisItemReader.type((AbstractRedisClient)targetClient);
        KeyComparisonItemReader comparator = new KeyComparisonItemReader((KeyValueItemReader)sourceReader, (KeyValueItemReader)targetReader);
        comparator.setName(name(info));
        comparator.open(new ExecutionContext());
        List results = readAll(comparator);
        comparator.close();
        // Count the comparisons flagged as MISSING.
        int missingCount = 0;
        for (Object result : results) {
            if (((KeyComparison)result).getStatus() == KeyComparison.Status.MISSING) {
                missingCount++;
            }
        }
        Assertions.assertEquals(sourceCount - targetCount, missingCount);
    }

    /**
     * Generates 10 stream keys, replicates them with the struct reader/writer pair, and expects
     * the subsequent comparison to report no differences.
     */
    @Test
    void compareStreams(TestInfo info) throws Exception {
        GeneratorItemReader streamGenerator = generator(10);
        streamGenerator.setTypes(new DataType[]{DataType.STREAM});
        generate(info, streamGenerator);
        RedisItemReader sourceReader = (RedisItemReader)RedisItemReader.struct((AbstractRedisClient)client);
        RedisItemWriter targetWriter = (RedisItemWriter)RedisItemWriter.struct((AbstractRedisClient)targetClient);
        replicate(info, sourceReader, targetWriter);
        assertEmpty(compare(info));
    }

    /**
     * Exercises every comparison status at once: replicates 120 generated keys with the dump
     * reader/writer, then mutates the target (deletions, TTL changes, overwrites with a string)
     * while tracking which keys were touched, and finally checks that the number of comparisons
     * reported per status matches that bookkeeping.
     */
    @Test
    void compareStatus(TestInfo info) throws Exception {
        GeneratorItemReader gen = this.generator(120);
        this.generate(info, gen);
        this.replicate(info, (RedisItemReader)RedisItemReader.dump((AbstractRedisClient)this.client), (RedisItemWriter)RedisItemWriter.dump((AbstractRedisClient)this.targetClient));
        // Delete 13 random target keys; the sum of DEL replies is the expected MISSING count.
        long deleted = 0L;
        for (int index = 0; index < 13; ++index) {
            deleted += this.targetCommands.del((Object[])new String[]{(String)this.targetCommands.randomkey()}).longValue();
        }
        // Bump the TTL of up to 23 random target keys; only keys whose EXPIRE succeeded are recorded.
        HashSet<String> ttlChanges = new HashSet<String>();
        for (int index = 0; index < 23; ++index) {
            long ttl;
            String key = (String)this.targetCommands.randomkey();
            if (key == null || !this.targetCommands.expire((Object)key, ttl = this.targetCommands.ttl((Object)key) + 12345L).booleanValue()) continue;
            ttlChanges.add(key);
        }
        HashSet<String> typeChanges = new HashSet<String>();
        HashSet<String> valueChanges = new HashSet<String>();
        // Overwrite 17 random target keys with the string "blah" (the same key may be picked twice).
        for (int index = 0; index < 17; ++index) {
            String key;
            // Spin until RANDOMKEY returns a key.
            while ((key = (String)this.targetCommands.randomkey()) == null) {
            }
            DataType type = DataType.of((String)this.targetCommands.type((Object)key));
            if (type == DataType.STRING) {
                // Already a string: a value change, unless it was earlier recorded as a type change.
                if (!typeChanges.contains(key)) {
                    valueChanges.add(key);
                }
                ttlChanges.remove(key);
            } else {
                // Not a string before the overwrite: recorded as a type change only.
                typeChanges.add(key);
                valueChanges.remove(key);
                ttlChanges.remove(key);
            }
            // NOTE(review): overwritten keys are dropped from ttlChanges above, presumably because
            // they are expected to be reported as TYPE/VALUE rather than TTL - confirm.
            this.targetCommands.set((Object)key, (Object)"blah");
        }
        KeyComparisonItemReader comparator = this.comparisonReader(info);
        comparator.setName(BatchTests.name(info));
        comparator.open(new ExecutionContext());
        List comparisons = BatchTests.readAll(comparator);
        comparator.close();
        long sourceCount = this.commands.dbsize();
        // One comparison per source key, and the target is short by exactly the deleted keys.
        Assertions.assertEquals((long)sourceCount, (long)comparisons.size());
        Assertions.assertEquals((long)sourceCount, (long)(this.targetCommands.dbsize() + deleted));
        // Per-status counts must match the bookkeeping sets built above.
        List actualTypeChanges = comparisons.stream().filter(c -> c.getStatus() == KeyComparison.Status.TYPE).collect(Collectors.toList());
        Assertions.assertEquals((int)typeChanges.size(), (int)actualTypeChanges.size());
        Assertions.assertEquals((long)valueChanges.size(), (long)comparisons.stream().filter(c -> c.getStatus() == KeyComparison.Status.VALUE).count());
        Assertions.assertEquals((long)ttlChanges.size(), (long)comparisons.stream().filter(c -> c.getStatus() == KeyComparison.Status.TTL).count());
        Assertions.assertEquals((long)deleted, (long)comparisons.stream().filter(c -> c.getStatus() == KeyComparison.Status.MISSING).count());
    }

    /**
     * Generates 10000 hash and string keys and checks the scan size estimator twice: once over
     * all {@code gen:*} keys (expecting the database size) and once restricted to hashes
     * (expecting half of it), each within a tolerance of one tenth of the database size.
     */
    @Test
    void estimateScanSize(TestInfo info) throws Exception {
        generate(info, generator(10000, DataType.HASH, DataType.STRING));
        long total = commands.dbsize();
        float tolerance = (float)(total / 10L);
        ScanSizeEstimator estimator = new ScanSizeEstimator(client);
        estimator.setScanMatch("gen:*");
        estimator.setSamples(1000);
        float allKeysEstimate = (float)estimator.getAsLong();
        Assertions.assertEquals((float)total, allKeysEstimate, tolerance);
        // Narrow the estimate to hash keys only.
        estimator.setScanType(DataType.HASH.getString());
        float hashEstimate = (float)estimator.getAsLong();
        Assertions.assertEquals((float)(total / 2L), hashEstimate, tolerance);
    }

    /**
     * Generates data, reads everything back through a struct reader, and expects as many items
     * as the source database holds keys.
     */
    @Test
    void readStruct(TestInfo info) throws Exception {
        generate(info);
        StructItemReader structReader = RedisItemReader.struct((AbstractRedisClient)client);
        configureReader(info, (RedisItemReader<?, ?, ?>)structReader);
        structReader.open(new ExecutionContext());
        List items = readAll(structReader);
        structReader.close();
        int itemCount = items.size();
        Assertions.assertEquals((Long)commands.dbsize(), (long)itemCount);
    }

    /**
     * Runs a struct reader in a step backed by a 4-thread task executor and expects the number
     * of distinct keys collected by the writer to equal the source database size.
     */
    @Test
    void readStructMultiThreaded(TestInfo info) throws Exception {
        generate(info);
        int threads = 4;
        StructItemReader structReader = RedisItemReader.struct((AbstractRedisClient)client);
        configureReader(info, (RedisItemReader<?, ?, ?>)structReader);
        SynchronizedListItemWriter collector = new SynchronizedListItemWriter();
        SimpleStepBuilder step = step(info, structReader, collector);
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setMaxPoolSize(threads);
        pool.setCorePoolSize(threads);
        pool.setQueueCapacity(threads);
        pool.afterPropertiesSet();
        step.taskExecutor((TaskExecutor)pool);
        step.throttleLimit(threads);
        run(info, step);
        // Wait for both ends of the step to report closed before inspecting the results.
        awaitUntilFalse(() -> ((StructItemReader)structReader).isOpen());
        awaitUntilFalse(collector::isOpen);
        Long expectedKeys = (Long)commands.dbsize();
        Set distinctKeys = (Set)collector.getItems().stream().map(item -> ((KeyValue)item).getKey()).collect(Collectors.toSet());
        Assertions.assertEquals(expectedKeys, (long)distinctKeys.size());
    }

    /**
     * With the {@code AUTO} ack policy, adds three messages, expects the reader to return them
     * in order, and expects no pending messages for the consumer group after the reader closes.
     */
    @Test
    void readStreamAutoAck() throws InterruptedException {
        String stream = "stream1";
        String group = "batchtests-readStreamAutoAck";
        Consumer consumer = Consumer.from((Object)group, (Object)"consumer1");
        StreamItemReader<String, String> reader = streamReader(stream, (Consumer<String>)consumer);
        reader.setAckPolicy(StreamItemReader.StreamAckPolicy.AUTO);
        open((ItemStream)reader);
        Map<String, String> body = map("field1", "value1", "field2", "value2");
        String[] expectedIds = new String[3];
        for (int i = 0; i < expectedIds.length; i++) {
            expectedIds[i] = commands.xadd((Object)stream, body);
        }
        List received = new ArrayList();
        awaitUntil(() -> received.addAll(reader.readMessages()));
        Assertions.assertEquals(3, received.size());
        for (int i = 0; i < expectedIds.length; i++) {
            assertStreamEquals(expectedIds[i], body, stream, (StreamMessage<String, String>)((StreamMessage)received.get(i)));
        }
        reader.close();
        Assertions.assertEquals(0L, commands.xpending((Object)stream, (Object)group).getCount(), "pending messages");
    }

    /**
     * With the {@code MANUAL} ack policy, reads three messages, expects all three to be pending,
     * acknowledges the first two explicitly, and expects exactly one to remain pending.
     */
    @Test
    void readStreamManualAck() throws Exception {
        String stream = "stream1";
        String group = "batchtests-readStreamManualAck";
        Consumer consumer = Consumer.from((Object)group, (Object)"consumer1");
        StreamItemReader<String, String> reader = streamReader(stream, (Consumer<String>)consumer);
        reader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        open((ItemStream)reader);
        Map<String, String> body = map("field1", "value1", "field2", "value2");
        String[] expectedIds = new String[3];
        for (int i = 0; i < expectedIds.length; i++) {
            expectedIds[i] = commands.xadd((Object)stream, body);
        }
        List received = new ArrayList();
        awaitUntil(() -> received.addAll(reader.readMessages()));
        Assertions.assertEquals(3, received.size());
        for (int i = 0; i < expectedIds.length; i++) {
            assertStreamEquals(expectedIds[i], body, stream, (StreamMessage<String, String>)((StreamMessage)received.get(i)));
        }
        PendingMessages pendingBeforeAck = commands.xpending((Object)stream, (Object)group);
        Assertions.assertEquals(3L, pendingBeforeAck.getCount(), "pending messages before commit");
        // Acknowledge the first two messages by hand; the third stays pending.
        String[] ackedIds = new String[]{((StreamMessage)received.get(0)).getId(), ((StreamMessage)received.get(1)).getId()};
        commands.xack((Object)stream, (Object)group, ackedIds);
        PendingMessages pendingAfterAck = commands.xpending((Object)stream, (Object)group);
        Assertions.assertEquals(1L, pendingAfterAck.getCount(), "pending messages after commit");
        reader.close();
    }

    /**
     * Reads three messages under the {@code MANUAL} ack policy without acknowledging them, adds
     * three more, closes the reader, and expects a second reader for the same consumer to return
     * six messages in total.
     *
     * <p>The second reader is closed in a {@code finally} block so it is released even when an
     * await or the final assertion fails.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    void readStreamManualAckRecover() throws InterruptedException {
        String stream = "stream1";
        Consumer consumer = Consumer.from((Object)"batchtests-readStreamManualAckRecover", (Object)"consumer1");
        StreamItemReader<String, String> reader = this.streamReader(stream, (Consumer<String>)consumer);
        reader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        this.open((ItemStream)reader);
        String field1 = "field1";
        String value1 = "value1";
        String field2 = "field2";
        String value2 = "value2";
        Map<String, String> body = this.map(field1, value1, field2, value2);
        this.commands.xadd((Object)stream, body);
        this.commands.xadd((Object)stream, body);
        this.commands.xadd((Object)stream, body);
        ArrayList messages = new ArrayList();
        this.awaitUntil(() -> messages.addAll(reader.readMessages()));
        Assertions.assertEquals((int)3, (int)messages.size());
        ArrayList recoveredMessages = new ArrayList();
        // Three more messages arrive before the first reader is closed.
        this.commands.xadd((Object)stream, body);
        this.commands.xadd((Object)stream, body);
        this.commands.xadd((Object)stream, body);
        reader.close();
        StreamItemReader<String, String> reader2 = this.streamReader(stream, (Consumer<String>)consumer);
        reader2.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        this.open((ItemStream)reader2);
        try {
            // First wait until something is read, then keep reading until nothing new is returned.
            this.awaitUntil(() -> recoveredMessages.addAll(reader2.readMessages()));
            this.awaitUntil(() -> !recoveredMessages.addAll(reader2.readMessages()));
            Assertions.assertEquals((int)6, (int)recoveredMessages.size());
        } finally {
            // The original never closed this reader, unlike the sibling stream tests.
            reader2.close();
        }
    }

    /**
     * Reads three messages under the {@code MANUAL} ack policy, acknowledges the first two, adds
     * three more, and expects a second reader whose offset is the second message's id to return
     * the third message followed by the three new ones.
     */
    @Test
    void readStreamManualAckRecoverUncommitted() throws InterruptedException {
        String stream = "stream1";
        String group = "batchtests-readStreamManualAckRecoverUncommitted";
        Consumer consumer = Consumer.from((Object)group, (Object)"consumer1");
        StreamItemReader<String, String> firstReader = streamReader(stream, (Consumer<String>)consumer);
        firstReader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        open((ItemStream)firstReader);
        Map<String, String> body = map("field1", "value1", "field2", "value2");
        commands.xadd((Object)stream, body);
        commands.xadd((Object)stream, body);
        String id3 = commands.xadd((Object)stream, body);
        List received = new ArrayList();
        awaitUntil(() -> received.addAll(firstReader.readMessages()));
        Assertions.assertEquals(3, received.size());
        String[] ackedIds = new String[]{((StreamMessage)received.get(0)).getId(), ((StreamMessage)received.get(1)).getId()};
        commands.xack((Object)stream, (Object)group, ackedIds);
        List recovered = new ArrayList();
        String id4 = commands.xadd((Object)stream, body);
        String id5 = commands.xadd((Object)stream, body);
        String id6 = commands.xadd((Object)stream, body);
        firstReader.close();
        StreamItemReader<String, String> secondReader = streamReader(stream, (Consumer<String>)consumer);
        secondReader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        secondReader.setOffset(((StreamMessage)received.get(1)).getId());
        open((ItemStream)secondReader);
        // First wait until something is read, then keep reading until nothing new is returned.
        awaitUntil(() -> recovered.addAll(secondReader.readMessages()));
        awaitUntil(() -> !recovered.addAll(secondReader.readMessages()));
        List<String> recoveredIds = new ArrayList<String>();
        for (Object message : recovered) {
            recoveredIds.add(((StreamMessage)message).getId());
        }
        Assertions.assertEquals(Arrays.asList(id3, id4, id5, id6), recoveredIds, "recoveredIds");
        secondReader.close();
    }

    /**
     * Reads three messages under the {@code MANUAL} ack policy, adds three more, and expects a
     * second reader whose offset is the third message's id to return only the three new ones.
     */
    @Test
    void readStreamManualAckRecoverFromOffset() throws Exception {
        String stream = "stream1";
        String group = "batchtests-readStreamManualAckRecoverFromOffset";
        Consumer consumer = Consumer.from((Object)group, (Object)"consumer1");
        StreamItemReader<String, String> firstReader = streamReader(stream, (Consumer<String>)consumer);
        firstReader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        open((ItemStream)firstReader);
        Map<String, String> body = map("field1", "value1", "field2", "value2");
        commands.xadd((Object)stream, body);
        commands.xadd((Object)stream, body);
        String id3 = commands.xadd((Object)stream, body);
        List received = new ArrayList();
        awaitUntil(() -> received.addAll(firstReader.readMessages()));
        Assertions.assertEquals(3, received.size());
        List recovered = new ArrayList();
        String id4 = commands.xadd((Object)stream, body);
        String id5 = commands.xadd((Object)stream, body);
        String id6 = commands.xadd((Object)stream, body);
        firstReader.close();
        StreamItemReader<String, String> secondReader = streamReader(stream, (Consumer<String>)consumer);
        secondReader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        secondReader.setOffset(id3);
        open((ItemStream)secondReader);
        // First wait until something is read, then keep reading until nothing new is returned.
        awaitUntil(() -> recovered.addAll(secondReader.readMessages()));
        awaitUntil(() -> !recovered.addAll(secondReader.readMessages()));
        List<String> recoveredIds = new ArrayList<String>();
        for (Object message : recovered) {
            recoveredIds.add(((StreamMessage)message).getId());
        }
        Assertions.assertEquals(Arrays.asList(id4, id5, id6), recoveredIds, "recoveredIds");
        secondReader.close();
    }

    /**
     * Reads three messages with a {@code MANUAL}-ack reader, adds three more, then switches to
     * an {@code AUTO}-ack reader for the same consumer and expects it to return only the three
     * new messages and to leave nothing pending for the consumer group.
     */
    @Test
    void readStreamRecoverManualAckToAutoAck() throws InterruptedException {
        String stream = "stream1";
        String group = "readStreamRecoverManualAckToAutoAck";
        Consumer consumer = Consumer.from((Object)group, (Object)"consumer1");
        StreamItemReader<String, String> manualReader = streamReader(stream, (Consumer<String>)consumer);
        manualReader.setAckPolicy(StreamItemReader.StreamAckPolicy.MANUAL);
        open((ItemStream)manualReader);
        Map<String, String> body = map("field1", "value1", "field2", "value2");
        commands.xadd((Object)stream, body);
        commands.xadd((Object)stream, body);
        commands.xadd((Object)stream, body);
        List received = new ArrayList();
        awaitUntil(() -> received.addAll(manualReader.readMessages()));
        Assertions.assertEquals(3, received.size());
        List recovered = new ArrayList();
        String id4 = commands.xadd((Object)stream, body);
        String id5 = commands.xadd((Object)stream, body);
        String id6 = commands.xadd((Object)stream, body);
        manualReader.close();
        StreamItemReader<String, String> autoReader = streamReader(stream, (Consumer<String>)consumer);
        autoReader.setAckPolicy(StreamItemReader.StreamAckPolicy.AUTO);
        open((ItemStream)autoReader);
        // First wait until something is read, then keep reading until nothing new is returned.
        awaitUntil(() -> recovered.addAll(autoReader.readMessages()));
        awaitUntil(() -> !recovered.addAll(autoReader.readMessages()));
        List<String> recoveredIds = new ArrayList<String>();
        for (Object message : recovered) {
            recoveredIds.add(((StreamMessage)message).getId());
        }
        Assertions.assertEquals(Arrays.asList(id4, id5, id6), recoveredIds, "recoveredIds");
        PendingMessages pending = commands.xpending((Object)stream, (Object)group);
        Assertions.assertEquals(0L, pending.getCount(), "pending message count");
        autoReader.close();
    }

    /**
     * Generates streams, then for every stream key reads all messages, checks their count
     * against {@code XLEN} and their bodies, and waits until acknowledging a fresh read
     * returns zero before closing the reader.
     */
    @Test
    void readStreamAck(TestInfo info) throws Exception {
        generateStreams(info, 57);
        ScanArgs streamsOnly = (ScanArgs)KeyScanArgs.Builder.type((String)DataType.STREAM.getString());
        List keys = ScanIterator.scan((RedisKeyCommands)commands, streamsOnly).stream().collect(Collectors.toList());
        Consumer consumer = Consumer.from((Object)"batchtests-readmessages", (Object)"consumer1");
        for (Object element : keys) {
            String key = (String)element;
            long expectedCount = commands.xlen((Object)key);
            StreamItemReader<String, String> reader = streamReader(key, (Consumer<String>)consumer);
            reader.setName(name(info) + "-reader");
            reader.open(new ExecutionContext());
            List messages = readAll(reader);
            Assertions.assertEquals(expectedCount, messages.size());
            assertMessageBody(messages);
            awaitUntil(() -> reader.ack((Iterable)reader.readMessages()) == 0L);
            reader.close();
        }
    }

    /**
     * Generates streams, then for every stream key reads all messages with a stream reader and
     * checks their count against {@code XLEN} and their bodies.
     */
    @Test
    void readStream(TestInfo info) throws Exception {
        generateStreams(info, 277);
        ScanArgs streamsOnly = (ScanArgs)KeyScanArgs.Builder.type((String)DataType.STREAM.getString());
        List keys = ScanIterator.scan((RedisKeyCommands)commands, streamsOnly).stream().collect(Collectors.toList());
        Consumer consumer = Consumer.from((Object)"batchtests-readstreamjob", (Object)"consumer1");
        for (Object element : keys) {
            String key = (String)element;
            long expectedCount = commands.xlen((Object)key);
            StreamItemReader<String, String> reader = streamReader(key, (Consumer<String>)consumer);
            reader.setName(name(info) + "-reader");
            reader.open(new ExecutionContext());
            List messages = readAll(reader);
            reader.close();
            Assertions.assertEquals(expectedCount, messages.size());
            assertMessageBody(messages);
        }
    }

    /**
     * Stores a two-field hash with an absolute expiry and expects the struct value reader to
     * return the same key, expiry, {@code HASH} type and field map.
     */
    @Test
    void readStructHash() throws Exception {
        String key = "myhash";
        Map<String, String> fields = new HashMap<String, String>();
        fields.put("field1", "value1");
        fields.put("field2", "value2");
        commands.hset((Object)key, fields);
        long expiresAt = System.currentTimeMillis() + 123456L;
        commands.pexpireat((Object)key, expiresAt);
        OperationValueReader<String, String, String, KeyValue<String>> executor = structOperationExecutor();
        KeyValue struct = (KeyValue)executor.process(Arrays.asList(key)).get(0);
        Assertions.assertEquals(key, struct.getKey());
        Assertions.assertEquals(expiresAt, struct.getTtl());
        Assertions.assertEquals(DataType.HASH, struct.getType());
        Assertions.assertEquals(fields, struct.getValue());
        executor.close();
    }

    /**
     * Stores a two-member sorted set and expects the struct value reader to return the same key,
     * the {@code ZSET} type and the same scored values as a set.
     */
    @Test
    void readStructZset() throws Exception {
        String key = "myzset";
        ScoredValue first = ScoredValue.just(123.456, (Object)"value1");
        ScoredValue second = ScoredValue.just(654.321, (Object)"value2");
        ScoredValue[] members = new ScoredValue[]{first, second};
        commands.zadd((Object)key, members);
        OperationValueReader<String, String, String, KeyValue<String>> executor = structOperationExecutor();
        KeyValue struct = (KeyValue)executor.process(Arrays.asList(key)).get(0);
        Assertions.assertEquals(key, struct.getKey());
        Assertions.assertEquals(DataType.ZSET, struct.getType());
        Set<ScoredValue> expected = new HashSet<ScoredValue>(Arrays.asList(members));
        Assertions.assertEquals(expected, struct.getValue());
        executor.close();
    }

    /**
     * Pushes two values onto a list and expects the struct value reader to return the same key,
     * the {@code LIST} type and the values in push order.
     */
    @Test
    void readStructList() throws Exception {
        String key = "mylist";
        List<String> expected = Arrays.asList("value1", "value2");
        String[] pushed = expected.toArray(new String[0]);
        commands.rpush((Object)key, (Object[])pushed);
        OperationValueReader<String, String, String, KeyValue<String>> executor = structOperationExecutor();
        KeyValue struct = (KeyValue)executor.process(Arrays.asList(key)).get(0);
        Assertions.assertEquals(key, struct.getKey());
        Assertions.assertEquals(DataType.LIST, struct.getType());
        Assertions.assertEquals(expected, struct.getValue());
        executor.close();
    }

    /**
     * Adds the same two-field body to a stream twice and expects the struct value reader to
     * return the key, the {@code STREAM} type and two messages that carry that body and a
     * non-null id.
     */
    @Test
    void readStructStream() throws Exception {
        String key = "mystream";
        Map<String, String> body = new HashMap<String, String>();
        body.put("field1", "value1");
        body.put("field2", "value2");
        commands.xadd((Object)key, body);
        commands.xadd((Object)key, body);
        OperationValueReader<String, String, String, KeyValue<String>> executor = structOperationExecutor();
        KeyValue struct = (KeyValue)executor.process(Arrays.asList(key)).get(0);
        Assertions.assertEquals(key, struct.getKey());
        Assertions.assertEquals(DataType.STREAM, struct.getType());
        List messages = (List)struct.getValue();
        Assertions.assertEquals(2, messages.size());
        for (Object element : messages) {
            StreamMessage message = (StreamMessage)element;
            Assertions.assertEquals(body, message.getBody());
            Assertions.assertNotNull(message.getId());
        }
        executor.close();
    }

    /**
     * Dumps a two-message stream that has an absolute expiry, checks the dumped key and that
     * the reported expiry is within 3 ms of the one set, then deletes the key, restores it from
     * the dump and expects its type to be a stream again.
     */
    @Test
    void readDumpStream() throws Exception {
        String key = "mystream";
        Map<String, String> body = new HashMap<String, String>();
        body.put("field1", "value1");
        body.put("field2", "value2");
        commands.xadd((Object)key, body);
        commands.xadd((Object)key, body);
        long expiresAt = System.currentTimeMillis() + 123456L;
        commands.pexpireat((Object)key, expiresAt);
        OperationValueReader<byte[], byte[], byte[], KeyValue<byte[]>> executor = dumpOperationExecutor();
        byte[][] requestedKeys = new byte[][]{toByteArray(key)};
        KeyValue dump = (KeyValue)executor.process(Arrays.asList(requestedKeys)).get(0);
        Assertions.assertArrayEquals(toByteArray(key), (byte[])dump.getKey());
        long expiryDrift = Math.abs(expiresAt - dump.getTtl());
        Assertions.assertTrue(expiryDrift <= 3L);
        // Round-trip: drop the key and rebuild it from the dumped payload.
        commands.del((Object[])new String[]{key});
        commands.restore((Object)key, (byte[])dump.getValue(), RestoreArgs.Builder.ttl(expiresAt).absttl());
        Assertions.assertEquals(DataType.STREAM.getString(), commands.type((Object)key));
        executor.close();
    }

    /**
     * Same scenario as a string-keyed stream read, but through a byte-array codec: each returned
     * message body is converted back to strings and compared with the body that was added.
     */
    @Test
    void readStructStreamByteArray() throws Exception {
        String key = "mystream";
        Map<String, String> body = new HashMap<String, String>();
        body.put("field1", "value1");
        body.put("field2", "value2");
        commands.xadd((Object)key, body);
        commands.xadd((Object)key, body);
        OperationValueReader executor = structOperationExecutor(ByteArrayCodec.INSTANCE);
        byte[][] requestedKeys = new byte[][]{toByteArray(key)};
        KeyValue struct = (KeyValue)executor.process(Arrays.asList(requestedKeys)).get(0);
        Assertions.assertArrayEquals(toByteArray(key), (byte[])struct.getKey());
        Assertions.assertEquals(DataType.STREAM, struct.getType());
        List messages = (List)struct.getValue();
        Assertions.assertEquals(2, messages.size());
        for (Object element : messages) {
            Map rawBody = ((StreamMessage)element).getBody();
            Assertions.assertEquals(2, rawBody.size());
            // Convert the byte[] fields and values back to strings for comparison.
            Map decoded = new HashMap();
            for (Object item : rawBody.entrySet()) {
                Map.Entry entry = (Map.Entry)item;
                decoded.put(toString((byte[])entry.getKey()), toString((byte[])entry.getValue()));
            }
            Assertions.assertEquals(body, decoded);
        }
        executor.close();
    }

    /**
     * Creates two HyperLogLog keys with {@code PFADD} and expects the struct value reader to
     * report the first one with type {@code STRING} and the value that {@code GET} returns.
     */
    @Test
    void readStructHLL() throws Exception {
        String firstKey = "hll:1";
        commands.pfadd((Object)firstKey, (Object[])new String[]{"member:1", "member:2"});
        String secondKey = "hll:2";
        commands.pfadd((Object)secondKey, (Object[])new String[]{"member:1", "member:2", "member:3"});
        OperationValueReader<String, String, String, KeyValue<String>> executor = structOperationExecutor();
        // Only the first key is read back and verified.
        KeyValue struct = (KeyValue)executor.process(Arrays.asList(firstKey)).get(0);
        Assertions.assertEquals(firstKey, struct.getKey());
        Assertions.assertEquals(DataType.STRING, struct.getType());
        Object storedValue = commands.get((Object)firstKey);
        Assertions.assertEquals(storedValue, struct.getValue());
        executor.close();
    }

    /**
     * Generates 100 keys, replicates them with the dump reader/writer pair, and expects the
     * replication to report no differences.
     */
    @Test
    void replicateDump(TestInfo info) throws Exception {
        generate(info, generator(100));
        RedisItemReader sourceReader = (RedisItemReader)RedisItemReader.dump((AbstractRedisClient)client);
        RedisItemWriter targetWriter = (RedisItemWriter)RedisItemWriter.dump((AbstractRedisClient)targetClient);
        List<KeyComparison> diffs = replicate(info, sourceReader, targetWriter);
        Assertions.assertTrue(diffs.isEmpty());
    }

    /**
     * Generates 100 keys, replicates them with the struct reader/writer pair, and expects the
     * replication to report no differences.
     */
    @Test
    void replicateStruct(TestInfo info) throws Exception {
        generate(info, generator(100));
        RedisItemReader sourceReader = (RedisItemReader)RedisItemReader.struct((AbstractRedisClient)client);
        RedisItemWriter targetWriter = (RedisItemWriter)RedisItemWriter.struct((AbstractRedisClient)targetClient);
        List<KeyComparison> diffs = replicate(info, sourceReader, targetWriter);
        Assertions.assertTrue(diffs.isEmpty());
    }

    /**
     * Runs a 1000-item generator, replicates with struct reader and writer built
     * on {@link ByteArrayCodec} and expects the resulting comparison list to be
     * empty.
     */
    @Test
    void replicateStructByteArray(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(1000);
        generate(info, generator);
        List<KeyComparison> differences = replicate(info,
                (RedisItemReader)RedisItemReader.struct(client, ByteArrayCodec.INSTANCE),
                (RedisItemWriter)RedisItemWriter.struct(targetClient, ByteArrayCodec.INSTANCE));
        Assertions.assertTrue(differences.isEmpty());
    }

    /**
     * Writes 100 keys named {@code binary:<index>}, each holding 1000 random
     * bytes, through a byte-array connection, then replicates them with byte-array
     * struct reader and writer and expects no differences.
     *
     * @throws Exception if writing the keys or running the replication fails
     */
    @Test
    void replicateStructBinaryStrings(TestInfo info) throws Exception {
        try (StatefulRedisModulesConnection connection = RedisModulesUtils.connection((AbstractRedisClient)client, (RedisCodec)ByteArrayCodec.INSTANCE)) {
            // Auto-flush is switched off so the SET commands are queued and sent
            // together by flushCommands() below.
            connection.setAutoFlushCommands(false);
            RedisAsyncCommands async = connection.async();
            List<RedisFuture> futures = new ArrayList<RedisFuture>();
            Random random = new Random();
            for (int index = 0; index < 100; ++index) {
                String key = "binary:" + index;
                byte[] value = new byte[1000];
                random.nextBytes(value);
                // Explicit charset: the key bytes must not depend on the platform default.
                futures.add(async.set(key.getBytes(java.nio.charset.StandardCharsets.UTF_8), value));
            }
            connection.flushCommands();
            boolean completed = LettuceFutures.awaitAll(connection.getTimeout(), futures.toArray(new RedisFuture[0]));
            connection.setAutoFlushCommands(true);
            // awaitAll reports a timeout through its return value; do not replicate
            // a partially written data set.
            Assertions.assertTrue(completed, "Timed out waiting for the binary SET commands to complete");
        }
        assertEmpty(replicate(info,
                (RedisItemReader)RedisItemReader.struct(client, ByteArrayCodec.INSTANCE),
                (RedisItemWriter)RedisItemWriter.struct(targetClient, ByteArrayCodec.INSTANCE)));
    }

    /**
     * Configures the reader, runs a job from {@code reader} to {@code writer},
     * waits until both report that they are no longer open and returns the
     * outcome of {@code compare(info)}.
     *
     * @param info   test metadata forwarded to the helper methods
     * @param reader reader used as the job source
     * @param writer writer used as the job sink
     * @return the list returned by {@code compare(info)}
     * @throws Exception if any of the helper calls fails
     */
    protected <K, V> List<KeyComparison> replicate(TestInfo info, RedisItemReader<K, V, KeyValue<K>> reader, RedisItemWriter<K, V, KeyValue<K>> writer) throws Exception {
        configureReader(info, reader);
        run(info, reader, writer);
        // Only compare once both ends report closed.
        awaitUntilFalse(reader::isOpen);
        awaitUntilFalse(writer::isOpen);
        return compare(info);
    }

    /**
     * Runs a 1000-item generator whose hash, set, stream, time-series and sorted
     * set options are all given a {@code Range.of(0)} count, then replicates with
     * the struct reader/writer pair and expects no differences.
     */
    @Test
    void replicateStructEmptyCollections(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(1000);
        com.redis.spring.batch.common.Range zero = com.redis.spring.batch.common.Range.of(0);
        generator.getHashOptions().setFieldCount(zero);
        generator.getSetOptions().setMemberCount(zero);
        generator.getStreamOptions().setMessageCount(zero);
        generator.getTimeSeriesOptions().setSampleCount(zero);
        generator.getZsetOptions().setMemberCount(zero);
        generate(info, generator);
        RedisItemReader structReader = RedisItemReader.struct(client);
        RedisItemWriter structWriter = RedisItemWriter.struct(targetClient);
        assertEmpty(replicate(info, structReader, structWriter));
    }

    /**
     * Replicates two keys created with {@code PFADD} using byte-array struct
     * reader and writer, then checks that no differences are reported and that
     * {@code PFCOUNT} of {@code hll:1} is the same through {@code commands} and
     * {@code targetCommands}.
     */
    @Test
    void replicateHLL(TestInfo info) throws Exception {
        String firstKey = "hll:1";
        commands.pfadd(firstKey, "member:1", "member:2");
        String secondKey = "hll:2";
        commands.pfadd(secondKey, "member:1", "member:2", "member:3");
        RedisItemReader structReader = RedisItemReader.struct(client, ByteArrayCodec.INSTANCE);
        RedisItemWriter structWriter = RedisItemWriter.struct(targetClient, ByteArrayCodec.INSTANCE);
        List<KeyComparison> differences = replicate(info, structReader, structWriter);
        Assertions.assertTrue(differences.isEmpty());
        Long sourceCount = commands.pfcount(firstKey);
        Long targetCount = targetCommands.pfcount(firstKey);
        Assertions.assertEquals(sourceCount, targetCount);
    }

    /**
     * Writes two {@link Geo} items to the {@code geoset} key with a
     * {@link Geoadd} operation and checks that a 100-mile radius query around
     * (-118.0, 34.0) returns only "Venice Breakwater".
     */
    @Test
    void writeGeo(TestInfo info) throws Exception {
        Geo venice = new Geo("Venice Breakwater", -118.476056, 33.985728);
        Geo longBeach = new Geo("Long Beach National", -73.667022, 40.582739);
        ListItemReader reader = new ListItemReader(Arrays.asList(venice, longBeach));
        Geoadd geoadd = new Geoadd();
        geoadd.setKey("geoset");
        geoadd.setValueFunction((Function)new ToGeoValueFunction(Geo::getMember, Geo::getLongitude, Geo::getLatitude));
        OperationItemWriter geoWriter = writer(geoadd);
        run(info, reader, geoWriter);
        awaitUntilFalse(geoWriter::isOpen);
        Set matches = commands.georadius("geoset", -118.0, 34.0, 100.0, GeoArgs.Unit.mi);
        Assertions.assertEquals(1, matches.size());
        Assertions.assertTrue(matches.contains("Venice Breakwater"));
    }

    /**
     * Pre-populates {@code hash:0} .. {@code hash:99} with a single field, then
     * runs an {@link Hset} writer over entries whose map is {@code null} for
     * indexes below 50 and {@code {field2=value2}} otherwise. Expects 100
     * {@code hash:*} keys afterwards and two fields in {@code hash:50}.
     */
    @Test
    void writeHashDel(TestInfo info) throws Exception {
        List<Map.Entry<String, Map<String, String>>> entries = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            String id = String.valueOf(i);
            Map<String, String> existing = new HashMap<>();
            existing.put("field1", "value1");
            commands.hset("hash:" + id, existing);
            // The first half of the entries carries no map at all.
            Map<String, String> update = null;
            if (i >= 50) {
                update = new HashMap<>();
                update.put("field2", "value2");
            }
            entries.add(new AbstractMap.SimpleEntry<>(id, update));
        }
        ListItemReader reader = new ListItemReader(entries);
        Hset hset = new Hset();
        hset.setKeyFunction(e -> "hash:" + (String)e.getKey());
        hset.setMapFunction(Map.Entry::getValue);
        OperationItemWriter hashWriter = writer(hset);
        run(info, reader, hashWriter);
        awaitUntilFalse(hashWriter::isOpen);
        Assertions.assertEquals(100, commands.keys("hash:*").size());
        Assertions.assertEquals(2, commands.hgetall("hash:50").size());
    }

    /**
     * Writes 100 {@link ZValue} items (member = index, score = index % 10) to the
     * {@code zadd} key with a {@link Zadd} operation. Expects a single key in the
     * database, a cardinality of 100 and 60 members with a score in [0, 5].
     */
    @Test
    void writeZset(TestInfo info) throws Exception {
        String zsetKey = "zadd";
        List<ZValue> members = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            members.add(new ZValue(String.valueOf(i), i % 10));
        }
        ListItemReader reader = new ListItemReader(members);
        Zadd zadd = new Zadd();
        zadd.setKey(zsetKey);
        zadd.setValueFunction((Function)new ToScoredValueFunction(ZValue::getMember, ZValue::getScore));
        OperationItemWriter zsetWriter = writer(zadd);
        run(info, reader, zsetWriter);
        awaitUntilFalse(zsetWriter::isOpen);
        Assertions.assertEquals(1L, commands.dbsize());
        Assertions.assertEquals((long)members.size(), commands.zcard(zsetKey));
        // Scores 0..5 cover six of the ten possible values, i.e. 60 of 100 members.
        int inRange = commands.zrangebyscore(zsetKey, Range.from(Range.Boundary.including(0), Range.Boundary.including(5))).size();
        Assertions.assertEquals(60, inRange);
    }

    /**
     * Writes the strings "0" .. "99" to the {@code sadd} key with a {@link Sadd}
     * operation and expects a single key in the database whose cardinality is 100.
     */
    @Test
    void writeSet(TestInfo info) throws Exception {
        String setKey = "sadd";
        List<String> members = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            members.add(String.valueOf(i));
        }
        ListItemReader reader = new ListItemReader(members);
        Sadd sadd = new Sadd();
        sadd.setKey(setKey);
        sadd.setValueFunction(Function.identity());
        OperationItemWriter setWriter = writer(sadd);
        run(info, reader, setWriter);
        awaitUntilFalse(setWriter::isOpen);
        Assertions.assertEquals(1L, commands.dbsize());
        Assertions.assertEquals((long)members.size(), commands.scard(setKey));
    }

    /**
     * Generates 100 hashes with a field count of 5 through {@code client} and 100
     * hashes with a field count of 10 through {@code targetClient}, replicates
     * with the struct reader/writer pair and expects no differences, including
     * identical {@code HGETALL} results for {@code gen:1}.
     */
    @Test
    void writeStructOverwrite(TestInfo info) throws Exception {
        GeneratorItemReader sourceGenerator = generator(100, DataType.HASH);
        sourceGenerator.setHashOptions(hashOptions(com.redis.spring.batch.common.Range.of(5)));
        generate(info, client, sourceGenerator);
        GeneratorItemReader targetGenerator = generator(100, DataType.HASH);
        targetGenerator.setHashOptions(hashOptions(com.redis.spring.batch.common.Range.of(10)));
        generate(info, targetClient, targetGenerator);
        RedisItemReader structReader = RedisItemReader.struct(client);
        RedisItemWriter structWriter = RedisItemWriter.struct(targetClient);
        assertEmpty(replicate(info, structReader, structWriter));
        Assertions.assertEquals(commands.hgetall("gen:1"), targetCommands.hgetall("gen:1"));
    }

    /**
     * Asserts that the given list has no elements. On failure the message lists
     * the unexpected elements so the cause is visible in the test report.
     *
     * @param list the list expected to be empty; must not be {@code null}
     */
    protected void assertEmpty(List<?> list) {
        // The message is supplied lazily so the list is only rendered on failure.
        Assertions.assertTrue(list.isEmpty(), () -> "Expected an empty list but found " + list.size() + " element(s): " + list);
    }

    /**
     * Builds a {@link MapOptions} instance whose field count is set to the given
     * range.
     *
     * @param fieldCount range passed to {@code setFieldCount}
     * @return the newly created options
     */
    private MapOptions hashOptions(com.redis.spring.batch.common.Range fieldCount) {
        MapOptions mapOptions = new MapOptions();
        mapOptions.setFieldCount(fieldCount);
        return mapOptions;
    }

    /**
     * Generates 100 hashes with a field count of 5 through {@code client} and 100
     * hashes with a field count of 10 through {@code targetClient}, replicates
     * with a struct writer that has merge enabled and expects {@code gen:1} read
     * through {@code targetCommands} to hold 10 fields.
     */
    @Test
    void writeStructMerge(TestInfo info) throws Exception {
        GeneratorItemReader sourceGenerator = generator(100, DataType.HASH);
        sourceGenerator.setHashOptions(hashOptions(com.redis.spring.batch.common.Range.of(5)));
        generate(info, client, sourceGenerator);
        GeneratorItemReader targetGenerator = generator(100, DataType.HASH);
        targetGenerator.setHashOptions(hashOptions(com.redis.spring.batch.common.Range.of(10)));
        generate(info, targetClient, targetGenerator);
        StructItemReader structReader = RedisItemReader.struct(client);
        StructItemWriter mergingWriter = RedisItemWriter.struct(targetClient);
        mergingWriter.setMerge(true);
        // The comparison result is deliberately ignored; only the field count matters here.
        replicate(info, (RedisItemReader)structReader, (RedisItemWriter)mergingWriter);
        Map merged = targetCommands.hgetall("gen:1");
        Assertions.assertEquals(10, merged.size());
    }

    /**
     * Runs an {@link Hset} writer configured with {@code waitReplicas = 1} and a
     * 300 ms wait timeout over 100 maps and expects the job's first failure
     * exception to carry the nested message
     * {@code "Insufficient replication level (0/1)"}.
     *
     * @throws Exception if running the job fails unexpectedly
     */
    @Test
    void writeWait(TestInfo info) throws Exception {
        List<Map<String, String>> maps = new ArrayList<>();
        for (int index = 0; index < 100; ++index) {
            Map<String, String> body = new HashMap<>();
            body.put("id", String.valueOf(index));
            body.put("field1", "value1");
            body.put("field2", "value2");
            maps.add(body);
        }
        ListItemReader reader = new ListItemReader(maps);
        Hset hset = new Hset();
        hset.setKeyFunction(m -> "hash:" + (String)m.remove("id"));
        hset.setMapFunction(Function.identity());
        OperationItemWriter hashWriter = writer(hset);
        hashWriter.setWaitReplicas(1);
        hashWriter.setWaitTimeout(Duration.ofMillis(300L));
        JobExecution execution = run(info, reader, hashWriter);
        awaitUntilFalse(hashWriter::isOpen);
        List<Throwable> exceptions = execution.getAllFailureExceptions();
        // Fail with a readable message instead of an IndexOutOfBoundsException
        // when the job unexpectedly recorded no failure.
        Assertions.assertFalse(exceptions.isEmpty(), "Expected at least one failure exception from the job");
        Assertions.assertEquals("Insufficient replication level (0/1)", exceptions.get(0).getCause().getCause().getMessage());
    }

    /**
     * Writes 100 maps with an {@link Hset} operation whose key function removes
     * the {@code id} entry from each map and prefixes it with {@code hash:}.
     * Expects 100 {@code hash:*} keys and each stored hash to equal its source
     * map.
     */
    @Test
    void writeHash(TestInfo info) throws Exception {
        List<Map<String, String>> maps = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Map<String, String> fields = new HashMap<>();
            fields.put("id", String.valueOf(i));
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            maps.add(fields);
        }
        ListItemReader reader = new ListItemReader(maps);
        Hset hset = new Hset();
        // The key function mutates the map: "id" is removed when the key is built.
        hset.setKeyFunction(m -> "hash:" + (String)m.remove("id"));
        hset.setMapFunction(Function.identity());
        OperationItemWriter hashWriter = writer(hset);
        run(info, reader, hashWriter);
        awaitUntilFalse(hashWriter::isOpen);
        Assertions.assertEquals(maps.size(), commands.keys("hash:*").size());
        for (int i = 0; i < maps.size(); i++) {
            Map stored = commands.hgetall("hash:" + i);
            Assertions.assertEquals(maps.get(i), (Object)stored);
        }
    }

    /**
     * Calls {@code generate(info)}, then feeds a 100-item generator into a
     * {@link Del} writer keyed by {@code KeyValue::getKey} and expects no
     * {@code gen*} keys to be left.
     */
    @Test
    void writeDel(TestInfo info) throws Exception {
        generate(info);
        GeneratorItemReader generator = generator(100);
        Del del = new Del();
        del.setKeyFunction(KeyValue::getKey);
        OperationItemWriter delWriter = writer(del);
        run(info, generator, delWriter);
        awaitUntilFalse(delWriter::isOpen);
        int remaining = commands.keys("gen*").size();
        Assertions.assertEquals(0, remaining);
    }

    /**
     * Feeds a {@link DataType#STRING} generator into an {@link Lpush} writer and
     * expects 100 keys in the database, every one of them of the list type.
     */
    @Test
    void writeLpush(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(DataType.STRING);
        Lpush lpush = new Lpush();
        lpush.setKeyFunction(KeyValue::getKey);
        lpush.setValueFunction(v -> (String)v.getValue());
        OperationItemWriter listWriter = writer(lpush);
        run(info, generator, listWriter);
        awaitUntilFalse(listWriter::isOpen);
        Assertions.assertEquals(100L, commands.dbsize());
        for (String writtenKey : commands.keys("*")) {
            String actualType = commands.type(writtenKey);
            Assertions.assertEquals(DataType.LIST.getString(), actualType);
        }
    }

    /**
     * Feeds a {@link DataType#STRING} generator into an {@link Rpush} writer and
     * expects 100 keys in the database, every one of them of the list type.
     */
    @Test
    void writeRpush(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(DataType.STRING);
        Rpush rpush = new Rpush();
        rpush.setKeyFunction(KeyValue::getKey);
        rpush.setValueFunction(v -> (String)v.getValue());
        OperationItemWriter listWriter = writer(rpush);
        run(info, generator, listWriter);
        awaitUntilFalse(listWriter::isOpen);
        Assertions.assertEquals(100L, commands.dbsize());
        for (String writtenKey : commands.keys("*")) {
            String actualType = commands.type(writtenKey);
            Assertions.assertEquals(DataType.LIST.getString(), actualType);
        }
    }

    /**
     * Feeds a {@link DataType#LIST} generator into an {@link LpushAll} writer
     * that pushes each item's value collection, and expects 100 keys in the
     * database, every one of them of the list type.
     */
    @Test
    void writeLpushAll(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(DataType.LIST);
        LpushAll lpushAll = new LpushAll();
        lpushAll.setKeyFunction(KeyValue::getKey);
        lpushAll.setValuesFunction(v -> (Collection)v.getValue());
        OperationItemWriter listWriter = writer(lpushAll);
        run(info, generator, listWriter);
        awaitUntilFalse(listWriter::isOpen);
        Assertions.assertEquals(100L, commands.dbsize());
        for (String writtenKey : commands.keys("*")) {
            String actualType = commands.type(writtenKey);
            Assertions.assertEquals(DataType.LIST.getString(), actualType);
        }
    }

    /**
     * Feeds a {@link DataType#STRING} generator into an {@link Expire} writer
     * with a TTL of one millisecond, waits until {@code KEYS *} is empty and
     * expects a database size of zero.
     */
    @Test
    void writeExpire(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(DataType.STRING);
        Expire expire = new Expire();
        expire.setKeyFunction(KeyValue::getKey);
        expire.setTtl(Duration.ofMillis(1L));
        OperationItemWriter expireWriter = writer(expire);
        run(info, generator, expireWriter);
        awaitUntilFalse(expireWriter::isOpen);
        awaitUntil(() -> commands.keys("*").isEmpty());
        Assertions.assertEquals(0L, commands.dbsize());
    }

    /**
     * Feeds a {@link DataType#STRING} generator into an {@link ExpireAt} writer
     * whose epoch function returns {@code System.currentTimeMillis()}, waits
     * until {@code KEYS *} is empty and expects a database size of zero.
     */
    @Test
    void writeExpireAt(TestInfo info) throws Exception {
        GeneratorItemReader generator = generator(DataType.STRING);
        ExpireAt expireAt = new ExpireAt();
        expireAt.setKeyFunction(KeyValue::getKey);
        expireAt.setEpochFunction(v -> System.currentTimeMillis());
        OperationItemWriter expireWriter = writer(expireAt);
        run(info, generator, expireWriter);
        awaitUntilFalse(expireWriter::isOpen);
        awaitUntil(() -> commands.keys("*").isEmpty());
        Assertions.assertEquals(0L, commands.dbsize());
    }

    /**
     * Writes 100 two-field message bodies to {@code stream:0} with an
     * {@link Xadd} operation, then expects the stream length to be 100 and the
     * body of each message returned by {@code XRANGE - +} to equal the source
     * body at the same position.
     */
    @Test
    void writeStream(TestInfo info) throws Exception {
        String streamKey = "stream:0";
        List<Map<String, String>> bodies = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Map<String, String> fields = new HashMap<>();
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            bodies.add(fields);
        }
        ListItemReader reader = new ListItemReader(bodies);
        Xadd xadd = new Xadd();
        xadd.setKey(streamKey);
        xadd.setBodyFunction(Function.identity());
        OperationItemWriter streamWriter = writer(xadd);
        run(info, reader, streamWriter);
        awaitUntilFalse(streamWriter::isOpen);
        Assertions.assertEquals((long)bodies.size(), commands.xlen(streamKey));
        List stored = commands.xrange(streamKey, Range.create("-", "+"));
        for (int i = 0; i < stored.size(); i++) {
            StreamMessage message = (StreamMessage)stored.get(i);
            Assertions.assertEquals(bodies.get(i), (Object)message.getBody());
        }
    }

    /**
     * Generates 1000 items through {@code client}, runs the same generator into a
     * struct writer on {@code client} and expects 1000 {@code gen:*} keys.
     *
     * @throws Exception if generating or running the job fails
     */
    @Test
    void writeStruct(TestInfo info) throws Exception {
        int count = 1000;
        GeneratorItemReader reader = generator(count);
        generate(info, client, reader);
        StructItemWriter structWriter = RedisItemWriter.struct(client);
        run(info, reader, structWriter);
        // Wait for the writer to close before asserting, as the other writer tests do.
        awaitUntilFalse(structWriter::isOpen);
        List keys = commands.keys("gen:*");
        Assertions.assertEquals(count, keys.size());
    }
}

