/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.test;

import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.test.BatchTests;
import com.redis.spring.batch.util.CodecUtils;
import com.redis.spring.batch.writer.DumpItemWriter;
import com.redis.spring.batch.writer.StructItemWriter;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.item.ExecutionContext;

/**
 * Tests for readers running in {@code LIVE} mode and for live replication
 * between the source client and the target client.
 *
 * <p>Every test first enables keyspace notifications on the source client.
 * Readers opened by a test are closed in a {@code finally} block, and helper
 * executors are shut down, so a failing assertion does not leave open
 * readers or stray threads behind for the next test.
 */
abstract class LiveTests
extends BatchTests {
    LiveTests() {
    }

    /**
     * Issues three {@code ZADD} calls against the same key and expects the
     * notification reader's queue to hold exactly one entry, which must be
     * that key.
     */
    @Test
    void readKeyspaceNotificationsDedupe() throws Exception {
        enableKeyspaceNotifications(client);
        KeyspaceNotificationItemReader<String> reader = new KeyspaceNotificationItemReader<>(client, CodecUtils.STRING_CODEC);
        reader.open(new ExecutionContext());
        try {
            String key = "key1";
            commands.zadd(key, 1.0, "member1");
            commands.zadd(key, 2.0, "member2");
            commands.zadd(key, 3.0, "member3");
            awaitUntil(() -> reader.getQueue().size() == 1);
            Assertions.assertEquals(key, reader.read());
        } finally {
            reader.close();
        }
    }

    /**
     * Opens a live struct reader whose key type is set to {@link DataType#HASH},
     * generates 100 items, and checks that every item read back is a hash.
     */
    @Test
    void readLiveType(TestInfo info) throws Exception {
        enableKeyspaceNotifications(client);
        StructItemReader<String, String> reader = RedisItemReader.struct(client);
        configureReader(info, reader);
        reader.setMode(RedisItemReader.ReaderMode.LIVE);
        reader.setKeyType(DataType.HASH);
        // Open before generating so the writes are observed; the reader is opened exactly once.
        reader.open(new ExecutionContext());
        try {
            GeneratorItemReader gen = generator(100);
            generate(info, gen);
            List<KeyValue<String>> keyValues = readAll(reader);
            Assertions.assertTrue(keyValues.stream().allMatch(v -> v.getType() == DataType.HASH));
        } finally {
            reader.close();
        }
    }

    /**
     * Opens a live struct reader using the byte-array codec, generates 123
     * hash/string items, and checks that the number of distinct keys read
     * equals the number generated.
     */
    @Test
    void readStructLive(TestInfo info) throws Exception {
        enableKeyspaceNotifications(client);
        StructItemReader<byte[], byte[]> reader = RedisItemReader.struct(client, ByteArrayCodec.INSTANCE);
        configureReader(info, reader);
        reader.setMode(RedisItemReader.ReaderMode.LIVE);
        reader.setNotificationQueueCapacity(10000);
        reader.open(new ExecutionContext());
        try {
            int count = 123;
            GeneratorItemReader gen = generator(count, DataType.HASH, DataType.STRING);
            generate(info, gen);
            List<KeyValue<byte[]>> list = readAll(reader);
            // Keys are byte arrays; convert to strings so the set compares them by content.
            Function<byte[], String> toString = CodecUtils.toStringKeyFunction(ByteArrayCodec.INSTANCE);
            Set<String> keys = list.stream().map(KeyValue::getKey).map(toString).collect(Collectors.toSet());
            Assertions.assertEquals(count, keys.size());
        } finally {
            reader.close();
        }
    }

    /**
     * Runs {@code replicateLive} with dump readers and writers and asserts
     * that its result is empty.
     */
    @Test
    void replicateDumpLive(TestInfo info) throws Exception {
        enableKeyspaceNotifications(client);
        DumpItemReader reader = RedisItemReader.dump(client);
        DumpItemWriter writer = RedisItemWriter.dump(targetClient);
        DumpItemReader liveReader = RedisItemReader.dump(client);
        DumpItemWriter liveWriter = RedisItemWriter.dump(targetClient);
        assertEmpty(replicateLive(info, reader, writer, liveReader, liveWriter));
    }

    /**
     * Runs {@code replicateLive} with struct readers and writers and asserts
     * that its result is empty.
     */
    @Test
    void replicateStructLive(TestInfo info) throws Exception {
        enableKeyspaceNotifications(client);
        StructItemReader<String, String> reader = RedisItemReader.struct(client);
        StructItemWriter<String, String> writer = RedisItemWriter.struct(targetClient);
        StructItemReader<String, String> liveReader = RedisItemReader.struct(client);
        StructItemWriter<String, String> liveWriter = RedisItemWriter.struct(targetClient);
        assertEmpty(replicateLive(info, reader, writer, liveReader, liveWriter));
    }

    /**
     * Runs a flushing step with a live-only dump reader while a background
     * task generates 100 items of mixed types once the reader is open, then
     * asserts that the comparison result is empty.
     */
    @Test
    void replicateDumpLiveOnly(TestInfo info) throws Exception {
        enableKeyspaceNotifications(client);
        DumpItemReader reader = RedisItemReader.dump(client);
        configureReader(info, reader);
        reader.setMode(RedisItemReader.ReaderMode.LIVE);
        reader.setNotificationQueueCapacity(100000);
        DumpItemWriter writer = RedisItemWriter.dump(targetClient);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> generation = executor.submit(() -> {
                // Only start writing once the reader is open, otherwise the writes go unobserved.
                awaitUntil(() -> reader.isOpen());
                GeneratorItemReader gen = generator(100, DataType.HASH, DataType.LIST, DataType.SET, DataType.STRING, DataType.ZSET);
                try {
                    generate(info, gen);
                }
                catch (JobExecutionException e) {
                    throw new RuntimeException("Could not run data gen", e);
                }
            });
            run(info, flushingStep(info, reader, writer));
            awaitUntilFalse(() -> reader.isOpen());
            awaitUntilFalse(() -> writer.isOpen());
            // Surface any failure of the background generation instead of losing it on the worker thread.
            generation.get();
            assertEmpty(compare(info));
        } finally {
            // Release the worker thread whether the test passed or failed.
            executor.shutdownNow();
        }
    }

    /**
     * Seeds a five-member set, runs a flushing step with a live-only struct
     * reader while a background task removes one member once reader and
     * writer are open, then asserts that source and target sets match.
     */
    @Test
    void replicateSetLiveOnly(TestInfo info) throws Exception {
        enableKeyspaceNotifications(client);
        String key = "myset";
        commands.sadd(key, "1", "2", "3", "4", "5");
        StructItemReader<String, String> reader = RedisItemReader.struct(client);
        configureReader(info, reader);
        reader.setMode(RedisItemReader.ReaderMode.LIVE);
        reader.setNotificationQueueCapacity(100);
        StructItemWriter<String, String> writer = RedisItemWriter.struct(targetClient);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> removal = executor.submit(() -> {
                awaitUntil(() -> reader.isOpen());
                awaitUntil(() -> writer.isOpen());
                commands.srem(key, "5");
            });
            run(info, flushingStep(info, reader, writer));
            awaitUntilFalse(() -> reader.isOpen());
            awaitUntilFalse(() -> writer.isOpen());
            // Surface any failure of the background removal instead of losing it on the worker thread.
            removal.get();
            Assertions.assertEquals(commands.smembers(key), targetCommands.smembers(key));
        } finally {
            // Release the worker thread whether the test passed or failed.
            executor.shutdownNow();
        }
    }
}

