/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.jes.test;

import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonArray;
import javax.json.JsonObject;
import net.pincette.function.SideEffect;
import net.pincette.jes.test.Report;
import net.pincette.jes.util.JsonSerializer;
import net.pincette.jes.util.Kafka;
import net.pincette.jes.util.Streams;
import net.pincette.json.JsonUtil;
import net.pincette.util.Builder;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import net.pincette.util.StreamUtil;
import net.pincette.util.Util;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class Test {
    private static final String APPLICATION_ID = "application.id";
    private static final Set<String> TECHNICAL = Collections.set("_corr", "_jwt", "_seq", "_timestamp");

    private Test() {
    }

    private static void addResult(List<JsonObject> results, List<Pair<String, JsonObject>> commands, JsonObject reply) {
        Optional.ofNullable(reply.getString("_corr", null)).flatMap(corr -> Test.getIndex(commands, corr)).ifPresent(index -> results.set((int)index, reply));
    }

    private static Map<String, Object> asMap(Properties properties) {
        return properties.entrySet().stream().collect(Collectors.toMap(e -> (String)e.getKey(), Map.Entry::getValue));
    }

    private static void adjustApplicationId(Properties kafkaConfig) {
        Optional.ofNullable(kafkaConfig.getProperty(APPLICATION_ID)).filter(id -> !id.endsWith("-test")).ifPresent(id -> kafkaConfig.setProperty(APPLICATION_ID, id + "-test"));
    }

    private static void collectReplies(StreamsBuilder builder, String type, String environment, Map<String, JsonObject> perCorr, Consumer<JsonObject> reply) {
        KStream<String, JsonObject> stream = builder.stream(type + "-reply-" + environment);
        stream.filter((k, v) -> Optional.ofNullable(v.getString("_corr", null)).filter(perCorr::containsKey).isPresent()).mapValues(v -> SideEffect.run(() -> reply.accept((JsonObject)v)).andThenGet(() -> v));
    }

    private static Stream<JsonObject> commandStream(List<Pair<String, JsonObject>> commands) {
        return commands.stream().map(pair -> (JsonObject)pair.second);
    }

    private static boolean compareResults(List<JsonObject> results, List<JsonObject> replies, List<Pair<String, JsonObject>> commands, Report report) {
        return StreamUtil.rangeExclusive(0, results.size()).map(i -> Pair.pair(i, JsonUtil.createDiff(Test.removeTechnical((JsonObject)replies.get((int)i)), Test.removeTechnical((JsonObject)results.get((int)i))).toJsonArray())).map(pair -> SideEffect.run(() -> report.report((String)((Pair)commands.get((int)((Integer)pair.first).intValue())).first, (JsonObject)((Pair)commands.get((int)((Integer)pair.first).intValue())).second, (JsonObject)replies.get((Integer)pair.first), (JsonArray)pair.second)).andThenGet(() -> (JsonArray)pair.second)).reduce(true, (result, diff) -> result != false && diff.isEmpty(), (r1, r2) -> r1);
    }

    private static boolean complete(List<JsonObject> results) {
        return results.stream().noneMatch(Objects::isNull);
    }

    private static JsonObject completeCommand(JsonObject json) {
        return Builder.create(() -> JsonUtil.createObjectBuilder(json)).update(b -> b.add("_corr", UUID.randomUUID().toString())).updateIf(b -> !json.containsKey("_jwt"), b -> b.add("_jwt", JsonUtil.createObjectBuilder().add("sub", "system"))).build().build();
    }

    private static Optional<Integer> getIndex(List<Pair<String, JsonObject>> commands, String corr) {
        return StreamUtil.zip(commands.stream(), StreamUtil.rangeExclusive(0, commands.size())).filter(pair -> ((JsonObject)((Pair)pair.first).second).getString("_corr").equals(corr)).findFirst().map(pair -> (Integer)pair.second);
    }

    private static List<Pair<String, JsonObject>> loadCommands(Path directory) {
        return Test.loadJson(directory.resolve("commands")).map(pair -> Pair.pair((String)pair.first, Test.completeCommand((JsonObject)pair.second))).collect(Collectors.toList());
    }

    private static Stream<Pair<String, JsonObject>> loadJson(Path directory) {
        return Util.tryToGetRethrow(() -> Files.list(directory)).orElseGet(Stream::empty).filter(path -> path.toFile().isFile()).filter(Files::isReadable).sorted(Comparator.comparing(Path::getFileName)).map(path -> Pair.pair(path.getFileName().toString(), Test.parse(path)));
    }

    private static List<JsonObject> loadReplies(Path directory) {
        return Test.loadJson(directory.resolve("replies")).map(pair -> (JsonObject)pair.second).collect(Collectors.toList());
    }

    private static JsonObject parse(Path path) {
        return Util.tryToGetRethrow(() -> JsonUtil.createReader(new FileInputStream(path.toFile())).readObject()).orElse(null);
    }

    private static Map<String, JsonObject> perCorrelationId(Stream<JsonObject> commands) {
        return commands.collect(Collectors.toMap(c -> c.getString("_corr"), c -> c));
    }

    public static boolean run(Path directory, Properties kafkaConfig, String environment, Report report) {
        return Test.run(directory, kafkaConfig, environment, report, null);
    }

    public static boolean run(Path directory, Properties kafkaConfig, String environment, Report report, UnaryOperator<StreamsBuilder> buildBefore) {
        List<Pair<String, JsonObject>> commands = Test.loadCommands(directory);
        List<JsonObject> replies = Test.loadReplies(directory);
        if (commands.size() != replies.size()) {
            Logger.getGlobal().log(Level.SEVERE, "There should be as many replies as there are commands.");
            return false;
        }
        Test.adjustApplicationId(kafkaConfig);
        StreamsBuilder builder = new StreamsBuilder();
        boolean[] error = new boolean[1];
        CountDownLatch latch = new CountDownLatch(1);
        Map<String, JsonObject> perCorr = Test.perCorrelationId(Test.commandStream(commands));
        ArrayList<JsonObject> results = new ArrayList<JsonObject>(Arrays.asList(new JsonObject[replies.size()]));
        KafkaStreams[] streams = new KafkaStreams[1];
        if (buildBefore != null) {
            buildBefore.apply(builder);
        }
        Test.types(Test.commandStream(commands)).forEach(type -> Test.collectReplies(builder, type, environment, perCorr, reply -> {
            Test.addResult(results, commands, reply);
            Optional.ofNullable(streams[0]).filter(s -> Test.complete(results)).ifPresent(KafkaStreams::close);
        }));
        streams[0] = new KafkaStreams(builder.build(), Streams.streamsConfig(kafkaConfig));
        streams[0].setStateListener((newState, oldState) -> Optional.of(newState).filter(state -> state == KafkaStreams.State.ERROR || state == KafkaStreams.State.PENDING_SHUTDOWN).ifPresent(state -> {
            if (state == KafkaStreams.State.ERROR) {
                error[0] = true;
            }
            latch.countDown();
        }));
        streams[0].start();
        Test.sendCommands(Test.commandStream(commands), kafkaConfig, environment);
        Util.tryToDoRethrow(latch::await);
        return !error[0] && Test.compareResults(results, replies, commands, report);
    }

    private static JsonObject removeTechnical(JsonObject json) {
        return JsonUtil.copy(json, JsonUtil.createObjectBuilder(), field -> !TECHNICAL.contains(field)).build();
    }

    private static CompletionStage<Boolean> sendCommands(Stream<JsonObject> commands, Properties kafkaConfig, String environment) {
        return Util.tryToGetWithRethrow(() -> Kafka.createReliableProducer(Test.asMap(kafkaConfig), new StringSerializer(), new JsonSerializer()), producer -> StreamUtil.composeAsyncStream(commands.map(command -> Kafka.send(producer, new ProducerRecord<String, JsonObject>(command.getString("_type") + "-command-" + environment, command.getString("_id"), (JsonObject)command)))).thenApply(results -> results.reduce((r1, r2) -> r1 != false && r2 != false).orElse(true))).orElseGet(() -> CompletableFuture.completedFuture(false));
    }

    private static Set<String> types(Stream<JsonObject> commands) {
        return commands.map(json -> Optional.ofNullable(json.getString("_type", null)).orElse(null)).filter(Objects::nonNull).collect(Collectors.toSet());
    }
}

