/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.streams.runtime;

import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.jboss.logging.Logger;

/**
 * Tracks the source topics (exact names and regular-expression patterns) a Kafka Streams
 * topology reads from, and checks through the Kafka {@link Admin} client whether they exist.
 *
 * <p>The check is enabled only when the configured topics timeout is strictly positive.
 * When enabled, the expected topics come from the runtime configuration if any topic or
 * pattern is configured there, otherwise they are extracted from the topology itself.
 */
public class KafkaStreamsTopologyManager {
    private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyManager.class.getName());

    /** Pause between two consecutive polls of the broker while waiting for topics. */
    private static final long RETRY_DELAY_MILLIS = 1000L;

    private final Admin adminClient;
    private final List<String> sourceTopics;
    private final List<Pattern> sourcePatterns;
    private final Duration topicsTimeout;
    // volatile: written by close() and read by the polling loop in waitForTopicsToBeCreated().
    private volatile boolean closed = false;

    /**
     * Creates a manager and resolves the topics and patterns to check.
     *
     * @param adminClient admin client used to list the topics known to the broker
     * @param topology topology inspected for source topics when none are configured explicitly
     * @param runtimeConfig supplies the topics timeout and the optional explicit topics/patterns
     * @throws IllegalArgumentException if the check is enabled but neither a topic nor a
     *         pattern could be determined
     */
    public KafkaStreamsTopologyManager(Admin adminClient, Topology topology, KafkaStreamsRuntimeConfig runtimeConfig) {
        this.adminClient = adminClient;
        this.topicsTimeout = runtimeConfig.topicsTimeout();

        List<String> topics;
        List<Pattern> patterns;
        if (isTopicsCheckEnabled()) {
            if (runtimeConfig.topics().isEmpty() && runtimeConfig.topicPatterns().isEmpty()) {
                // Nothing configured explicitly: derive the expected topics from the topology.
                Set<String> topologyTopics = new HashSet<>();
                Set<Pattern> topologyPatterns = new HashSet<>();
                extractSources(topology, topologyTopics, topologyPatterns);
                topics = List.copyOf(topologyTopics);
                patterns = List.copyOf(topologyPatterns);
                LOGGER.infof("Kafka Streams will wait for topics: %s and topics matching patterns: %s to be created", topics, patterns);
            } else {
                topics = List.copyOf(runtimeConfig.topics().orElse(Collections.emptyList()));
                patterns = runtimeConfig.topicPatterns().orElse(Collections.emptyList()).stream()
                        .map(Pattern::compile)
                        .toList();
            }
            if (topics.isEmpty() && patterns.isEmpty()) {
                throw new IllegalArgumentException("No topics or topic patterns specified; cannot wait for topics to be created, in order to disable topics creation check set `quarkus.kafka-streams.topics-check-timeout=0`");
            }
        } else {
            LOGGER.info("Kafka Streams will not wait for topics to be created");
            topics = Collections.emptyList();
            patterns = Collections.emptyList();
        }
        this.sourceTopics = topics;
        this.sourcePatterns = patterns;
    }

    /**
     * @return {@code true} when the configured topics timeout is strictly greater than zero
     */
    public boolean isTopicsCheckEnabled() {
        return this.topicsTimeout.compareTo(Duration.ZERO) > 0;
    }

    /** Marks this manager as closed, which makes {@link #waitForTopicsToBeCreated()} stop polling. */
    void close() {
        this.closed = true;
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Collects the source topics and source topic patterns of a topology.
     *
     * <p>Sources of global stores and of every sub-topology are added to the given sets.
     * Topic names that are also written to by a sink node of the topology are then removed
     * from {@code topics}.
     *
     * @param topo topology to describe
     * @param topics set receiving the exact source topic names (modified in place)
     * @param patterns set receiving the source topic patterns (modified in place)
     */
    public static void extractSources(Topology topo, Set<String> topics, Set<Pattern> patterns) {
        Set<String> sinkTopics = new HashSet<>();
        TopologyDescription description = topo.describe();

        for (TopologyDescription.GlobalStore globalStore : description.globalStores()) {
            collectSource(globalStore.source(), topics, patterns);
        }
        for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
            for (TopologyDescription.Node node : subtopology.nodes()) {
                if (node instanceof TopologyDescription.Sink sink) {
                    if (sink.topic() != null) {
                        sinkTopics.add(sink.topic());
                    }
                } else if (node instanceof TopologyDescription.Source source) {
                    collectSource(source, topics, patterns);
                }
            }
        }
        topics.removeAll(sinkTopics);
    }

    /** Adds the pattern and/or the topic names declared by one source node to the given sets. */
    private static void collectSource(TopologyDescription.Source source, Set<String> topics, Set<Pattern> patterns) {
        if (source.topicPattern() != null) {
            patterns.add(source.topicPattern());
        }
        if (source.topicSet() != null) {
            topics.addAll(source.topicSet());
        }
    }

    /**
     * @return the exact topic names this manager checks for (unmodifiable)
     */
    public List<String> getSourceTopics() {
        return this.sourceTopics;
    }

    /**
     * @return the topic patterns this manager checks for (unmodifiable)
     */
    public List<Pattern> getSourcePatterns() {
        return this.sourcePatterns;
    }

    /**
     * Queries the broker once and reports what is still missing.
     *
     * <p>The result contains every expected topic name the broker does not list, followed by
     * the pattern string of every expected pattern that matches none of the listed topics.
     * If the topic list cannot be obtained, the failure is logged and every expected topic
     * and pattern is reported, since none of them could be confirmed.
     *
     * @return the missing topic names and unmatched pattern strings; empty when the check is
     *         disabled
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     */
    public Set<String> getMissingTopics() throws InterruptedException {
        if (!isTopicsCheckEnabled()) {
            return Collections.emptySet();
        }
        try {
            return findMissing(listExistingTopics());
        } catch (ExecutionException | TimeoutException e) {
            LOGGER.error("Failed to get topic names from broker", e);
            // Nothing could be verified: treat all expected topics and patterns as missing.
            return findMissing(Collections.emptySet());
        }
    }

    /**
     * Blocks until every expected topic exists and every expected pattern matches at least
     * one existing topic, or until this manager is closed. Returns immediately when the
     * check is disabled.
     *
     * <p>The broker is polled repeatedly with a fixed pause between attempts. Failures to
     * list topics are logged and retried.
     *
     * @throws InterruptedException if the thread is interrupted while querying the broker or
     *         while pausing between attempts
     */
    public void waitForTopicsToBeCreated() throws InterruptedException {
        if (!isTopicsCheckEnabled()) {
            return;
        }
        Set<String> lastMissing = null;
        while (!this.closed) {
            try {
                Set<String> missing = findMissing(listExistingTopics());
                if (missing.isEmpty()) {
                    LOGGER.debugf("All expected topics %s and topics matching patterns %s exist", this.sourceTopics, this.sourcePatterns);
                    return;
                }
                // Warn only when the set of missing topics changes; repeat at debug level so
                // a slow topic creation does not flood the log.
                if (missing.equals(lastMissing)) {
                    LOGGER.debug("Waiting for topic(s) to be created: " + missing);
                } else {
                    LOGGER.warn("Waiting for topic(s) to be created: " + missing);
                    lastMissing = missing;
                }
            } catch (ExecutionException | TimeoutException e) {
                LOGGER.error("Failed to get topic names from broker", e);
            }
            // Pause only before a retry: not after success, and not when interrupted.
            Thread.sleep(RETRY_DELAY_MILLIS);
        }
    }

    /** Fetches the names of the topics known to the broker, waiting at most the topics timeout. */
    private Set<String> listExistingTopics() throws InterruptedException, ExecutionException, TimeoutException {
        return this.adminClient.listTopics().names().get(this.topicsTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Computes the expected topic names absent from {@code existingTopics}, followed by the
     * pattern strings of the expected patterns matching none of {@code existingTopics}.
     */
    private Set<String> findMissing(Set<String> existingTopics) {
        Set<String> missing = new LinkedHashSet<>(this.sourceTopics);
        missing.removeAll(existingTopics);
        for (Pattern pattern : this.sourcePatterns) {
            if (existingTopics.stream().noneMatch(pattern.asPredicate())) {
                missing.add(pattern.pattern());
            }
        }
        return missing;
    }
}

