/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.test;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyComparison;
import com.redis.spring.batch.common.KeyComparisonItemReader;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.Range;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.reader.KeyValueItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.test.AbstractTestBase;
import com.redis.spring.batch.test.SimpleTestInfo;
import com.redis.testcontainers.RedisServer;
import io.lettuce.core.AbstractRedisClient;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

public abstract class AbstractTargetTestBase
extends AbstractTestBase {
    protected AbstractRedisClient targetClient;
    protected StatefulRedisModulesConnection<String, String> targetConnection;
    protected RedisModulesCommands<String, String> targetCommands;

    protected abstract RedisServer getTargetRedisServer();

    @BeforeAll
    void targetSetup() throws Exception {
        this.getTargetRedisServer().start();
        this.targetClient = this.client(this.getTargetRedisServer());
        this.targetConnection = RedisModulesUtils.connection((AbstractRedisClient)this.targetClient);
        this.targetCommands = this.targetConnection.sync();
    }

    @AfterAll
    void targetTeardown() {
        this.targetConnection.close();
        this.targetClient.shutdown();
        this.targetClient.getResources().shutdown();
        this.getTargetRedisServer().close();
    }

    @BeforeEach
    void targetFlushAll() {
        this.targetCommands.flushall();
        this.awaitUntil(() -> this.targetCommands.dbsize().equals(0L));
    }

    protected List<KeyComparison> compare(TestInfo info) throws Exception {
        if (this.commands.dbsize().equals(0L)) {
            Assertions.fail((String)"Source database is empty");
        }
        KeyComparisonItemReader reader = this.comparisonReader(AbstractTargetTestBase.testInfo(info, "compare-reader"));
        reader.open(new ExecutionContext());
        List comparisons = AbstractTargetTestBase.readAll(reader);
        reader.close();
        Assertions.assertFalse((boolean)comparisons.isEmpty());
        return comparisons.stream().filter(c -> c.getStatus() != KeyComparison.Status.OK).collect(Collectors.toList());
    }

    protected void logDiffs(Collection<KeyComparison> diffs) {
        for (KeyComparison diff : diffs) {
            this.log.error("{}: {} {}", new Object[]{diff.getStatus(), diff.getSource().getKey(), diff.getSource().getType()});
        }
    }

    protected KeyComparisonItemReader comparisonReader(TestInfo info) throws Exception {
        StructItemReader sourceReader = RedisItemReader.struct((AbstractRedisClient)this.client);
        StructItemReader targetReader = RedisItemReader.struct((AbstractRedisClient)this.targetClient);
        KeyComparisonItemReader reader = new KeyComparisonItemReader((KeyValueItemReader)sourceReader, (KeyValueItemReader)targetReader);
        reader.setName(AbstractTargetTestBase.name(info));
        reader.setTtlTolerance(Duration.ofMillis(100L));
        return reader;
    }

    protected <K, V, T extends KeyValue<K>> List<KeyComparison> replicateLive(TestInfo info, RedisItemReader<K, V, T> reader, RedisItemWriter<K, V, T> writer, RedisItemReader<K, V, T> liveReader, RedisItemWriter<K, V, T> liveWriter) throws Exception {
        this.configureReader(new SimpleTestInfo(info, "reader"), reader);
        this.configureReader(new SimpleTestInfo(info, "liveReader"), liveReader);
        liveReader.setMode(RedisItemReader.ReaderMode.LIVE);
        GeneratorItemReader gen = this.generator(300);
        this.generate(new SimpleTestInfo(info, "generate"), gen);
        TaskletStep step = this.faultTolerant(this.step(new SimpleTestInfo(info, "step"), reader, writer)).build();
        SimpleFlow flow = (SimpleFlow)new FlowBuilder(AbstractTargetTestBase.name(new SimpleTestInfo(info, "snapshotFlow"))).start((Step)step).build();
        TaskletStep liveStep = this.faultTolerant(this.flushingStep(new SimpleTestInfo(info, "liveStep"), liveReader, liveWriter)).build();
        SimpleFlow liveFlow = (SimpleFlow)new FlowBuilder(AbstractTargetTestBase.name(new SimpleTestInfo(info, "liveFlow"))).start((Step)liveStep).build();
        Job job = this.job(info).start((Flow)new FlowBuilder(AbstractTargetTestBase.name(new SimpleTestInfo(info, "flow"))).split((TaskExecutor)new SimpleAsyncTaskExecutor()).add(new Flow[]{liveFlow, flow}).build()).build().build();
        Executors.newSingleThreadScheduledExecutor().execute(() -> {
            this.awaitUntil(() -> ((RedisItemReader)liveReader).isOpen());
            this.awaitUntil(() -> ((RedisItemWriter)liveWriter).isOpen());
            GeneratorItemReader liveGen = this.generator(700, DataType.HASH, DataType.LIST, DataType.SET, DataType.STRING, DataType.ZSET);
            liveGen.setExpiration(Range.of((int)100));
            liveGen.setKeyRange(Range.from((int)300));
            try {
                this.generate(AbstractTargetTestBase.testInfo(info, "generateLive"), liveGen);
            }
            catch (JobExecutionException e) {
                throw new RuntimeException("Could not execute data gen");
            }
        });
        this.run(job);
        this.awaitUntilFalse(() -> reader.isOpen());
        this.awaitUntilFalse(() -> writer.isOpen());
        this.awaitUntilFalse(() -> liveReader.isOpen());
        this.awaitUntilFalse(() -> liveWriter.isOpen());
        return this.compare(info);
    }
}

