/*
 * Decompiled with CFR 0.152.
 */
package com.bakdata.kafka.util;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.jooq.lambda.Seq;

/**
 * Derives topic names from the nodes of a Kafka Streams {@link Topology}.
 *
 * <p>The nodes of all subtopologies are collected once at construction time. Topic names are then classified as
 * internal (generated {@code KSTREAM-}/{@code KTABLE-} names, or names matching a changelog/repartition name derived
 * from the state stores of the topology) or external (everything else).
 */
public class TopologyInformation {
    private static final String CHANGELOG_SUFFIX = "-changelog";
    private static final String REPARTITION_SUFFIX = "-repartition";
    private static final Collection<String> PSEUDO_TOPIC_SUFFIXES = Set.of("-pk", "-fk", "-vh");
    private final String streamsId;
    private final Collection<TopologyDescription.Node> nodes;

    /**
     * Creates the topic information for a topology.
     *
     * @param topology topology whose nodes are inspected
     * @param streamsId identifier used as prefix when building internal sink and changelog topic names
     */
    public TopologyInformation(Topology topology, String streamsId) {
        this.nodes = TopologyInformation.getNodes(topology);
        this.streamsId = streamsId;
    }

    /**
     * Flattens the nodes of all subtopologies of the described topology into a single list.
     */
    private static List<TopologyDescription.Node> getNodes(Topology topology) {
        return topology.describe().subtopologies().stream()
                .flatMap(subtopology -> subtopology.nodes().stream())
                .collect(Collectors.toList());
    }

    /**
     * Returns the topic names of all source nodes.
     */
    private static Stream<String> getAllSources(Collection<TopologyDescription.Node> nodes) {
        return nodes.stream()
                .filter(node -> node instanceof TopologyDescription.Source)
                .map(node -> (TopologyDescription.Source) node)
                .map(TopologyDescription.Source::topicSet)
                // NOTE(review): the topic set is presumably null for sources subscribed via a pattern; such
                // sources are skipped here instead of failing with a NullPointerException - confirm.
                .filter(topics -> topics != null)
                .flatMap(Collection::stream);
    }

    /**
     * Returns the topic names of all sink nodes that have a statically known topic.
     */
    private static Stream<String> getAllSinks(Collection<TopologyDescription.Node> nodes) {
        return nodes.stream()
                .filter(node -> node instanceof TopologyDescription.Sink)
                .map(node -> (TopologyDescription.Sink) node)
                .map(TopologyDescription.Sink::topic)
                // Sink#topic() is null when the topic is determined dynamically by a TopicNameExtractor;
                // there is no name to report for such sinks and a null would break the topic classification.
                .filter(topic -> topic != null);
    }

    /**
     * Returns the names of all state stores attached to processor nodes.
     */
    private static Stream<String> getAllStores(Collection<TopologyDescription.Node> nodes) {
        return nodes.stream()
                .filter(node -> node instanceof TopologyDescription.Processor)
                .map(node -> (TopologyDescription.Processor) node)
                .flatMap(processor -> processor.stores().stream());
    }

    /**
     * Returns the topic name extended by each of the pseudo topic suffixes if the name contains
     * {@code FK-JOIN-SUBSCRIPTION-REGISTRATION}, otherwise an empty stream.
     */
    private static Stream<String> createPseudoTopics(String topic) {
        if (!topic.contains("FK-JOIN-SUBSCRIPTION-REGISTRATION")) {
            return Stream.empty();
        }
        return PSEUDO_TOPIC_SUFFIXES.stream().map(suffix -> String.format("%s%s", topic, suffix));
    }

    /**
     * Returns the internal topic names of the topology: internal sinks (including pseudo topics), followed by
     * changelog topics, followed by repartition topics.
     *
     * @return list of internal topic names, in the order described above
     */
    public List<String> getInternalTopics() {
        Stream<String> sinksAndChangelogs = Stream.concat(this.getInternalSinks(), this.getChangelogTopics());
        return Stream.concat(sinksAndChangelogs, this.getRepartitionTopics()).collect(Collectors.toList());
    }

    /**
     * Returns the topics of all sink nodes that are not classified as internal.
     *
     * @return list of external sink topic names
     */
    public List<String> getExternalSinkTopics() {
        return this.externalSinkTopics().collect(Collectors.toList());
    }

    /**
     * Returns the external topics that are read by a source node but not written by any sink node.
     *
     * @return list of external source topic names
     */
    public List<String> getExternalSourceTopics() {
        // Collected once into a set so that the membership check per source topic is constant time.
        Set<String> sinks = this.externalSinkTopics().collect(Collectors.toSet());
        return TopologyInformation.getAllSources(this.nodes)
                .filter(this::isExternalTopic)
                .filter(topic -> !sinks.contains(topic))
                .collect(Collectors.toList());
    }

    /**
     * Returns the external topics that are both written by a sink node and read by a source node.
     *
     * @return list of intermediate topic names
     */
    public List<String> getIntermediateTopics() {
        Set<String> sinks = this.externalSinkTopics().collect(Collectors.toSet());
        return TopologyInformation.getAllSources(this.nodes)
                .filter(this::isExternalTopic)
                .filter(sinks::contains)
                .collect(Collectors.toList());
    }

    /**
     * Streams the topics of all sink nodes that are not classified as internal.
     */
    private Stream<String> externalSinkTopics() {
        return TopologyInformation.getAllSinks(this.nodes).filter(this::isExternalTopic);
    }

    /**
     * Classifies a topic name as internal.
     *
     * <p>A name is internal if it starts with {@code KSTREAM-} or {@code KTABLE-}, or if it carries the changelog or
     * repartition suffix and equals one of the names derived from the state stores of the topology.
     */
    private boolean isInternalTopic(String topic) {
        if (topic.startsWith("KSTREAM-") || topic.startsWith("KTABLE-")) {
            return true;
        }
        if (topic.endsWith(CHANGELOG_SUFFIX)) {
            return this.getChangelogTopics().anyMatch(topic::equals);
        }
        if (topic.endsWith(REPARTITION_SUFFIX)) {
            return this.getRepartitionTopics().anyMatch(topic::equals);
        }
        return false;
    }

    private boolean isExternalTopic(String topic) {
        return !this.isInternalTopic(topic);
    }

    /**
     * Returns all internal sink topics, each followed by its pseudo topics, prefixed with the streams id.
     */
    private Stream<String> getInternalSinks() {
        return TopologyInformation.getAllSinks(this.nodes)
                .filter(this::isInternalTopic)
                .flatMap(topic -> Stream.concat(Stream.of(topic), TopologyInformation.createPseudoTopics(topic)))
                .map(topic -> String.format("%s-%s", this.streamsId, topic));
    }

    /**
     * Returns one changelog topic name per state store, built as {@code <streamsId>-<store>-changelog}.
     */
    private Stream<String> getChangelogTopics() {
        return TopologyInformation.getAllStores(this.nodes)
                .map(store -> String.format("%s-%s%s", this.streamsId, store, CHANGELOG_SUFFIX));
    }

    /**
     * Returns one repartition topic name per state store, built as {@code <store>-repartition}.
     *
     * <p>NOTE(review): unlike the changelog names, these names are not prefixed with the streams id - confirm that
     * this difference is intended.
     */
    private Stream<String> getRepartitionTopics() {
        return TopologyInformation.getAllStores(this.nodes)
                .map(store -> String.format("%s%s", store, REPARTITION_SUFFIX));
    }
}

