/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.kafka.precheck;

import com.alibaba.ververica.connectors.common.precheck.SinkConnectivityChecker;
import com.alibaba.ververica.connectors.common.precheck.SourceConnectivityChecker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.TopicListing;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connectivity pre-check for Kafka tables, usable for both sources and sinks.
 *
 * <p>The checker builds a Kafka {@link AdminClient} from the {@code properties.*} entries of the
 * table options, lists the topics of the cluster and compares them with the topics described by
 * {@link KafkaConnectorOptions#TOPIC} or {@link KafkaConnectorOptions#TOPIC_PATTERN}.
 *
 * <p>The instance owns the admin client; call {@link #close()} when the check is done.
 */
public class KafkaConnectivityChecker implements SourceConnectivityChecker, SinkConnectivityChecker {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectivityChecker.class);

    /** Prefix of table options that are forwarded (with the prefix stripped) to the Kafka client. */
    public static final String PROPERTIES_PREFIX = "properties.";

    private final AdminClient adminClient;
    private final KafkaTopicsDescriptor topicsDescriptor;

    /**
     * Creates a checker from the raw table options.
     *
     * @param tableOptions table options; must contain the bootstrap servers key
     * @throws IllegalArgumentException if the bootstrap servers key is missing
     */
    KafkaConnectivityChecker(Map<String, String> tableOptions) {
        validateOptions(tableOptions, "checking connectivity", KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS.key());
        Configuration tableConfigs = Configuration.fromMap(tableOptions);
        List<String> topics = tableConfigs.getOptional(KafkaConnectorOptions.TOPIC).orElse(null);
        Pattern topicPattern = tableConfigs.getOptional(KafkaConnectorOptions.TOPIC_PATTERN).map(Pattern::compile).orElse(null);
        // Resolve the topic options BEFORE creating the admin client: if anything here throws
        // (for example an invalid regular expression), no client has been created yet, so there
        // is nothing left open that the caller could never close.
        this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
        this.adminClient = getAdminClient(tableOptions, "Kafka Connectivity Checker");
    }

    /**
     * Lists the topics of the cluster and verifies them against the configured topics.
     *
     * @param timeoutMs timeout handed to the list-topics request, in milliseconds; values outside
     *     the {@code int} range are clamped to the nearest {@code int} bound
     * @return a future that completes normally when the check passes, and exceptionally when the
     *     listing fails or a fixed topic is not present in the listing
     */
    @Override
    public CompletableFuture<Void> checkConnectivity(long timeoutMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Clamp on both sides so the narrowing cast can never wrap around.
        int safeTimeoutMs = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMs));
        this.adminClient
                .listTopics(new ListTopicsOptions().timeoutMs(safeTimeoutMs))
                .listings()
                .whenComplete((topicListings, exception) -> {
                    if (exception != null) {
                        future.completeExceptionally(exception);
                        return;
                    }
                    try {
                        checkTopicListing(topicListings, this.topicsDescriptor, future);
                    } catch (RuntimeException e) {
                        // Nothing else completes `future`; without this a failure inside the
                        // callback would leave the caller waiting forever.
                        future.completeExceptionally(e);
                    }
                });
        return future;
    }

    /**
     * Creates an admin client from the {@code properties.*} table options.
     *
     * @param tableOptions table options to take the client properties from
     * @param clientId value set as {@code client.id}, overriding any value from the options
     */
    private static AdminClient getAdminClient(Map<String, String> tableOptions, String clientId) {
        Properties adminClientProps = getKafkaProperties(tableOptions);
        adminClientProps.setProperty("client.id", clientId);
        return AdminClient.create(adminClientProps);
    }

    /**
     * Collects every option whose key starts with {@link #PROPERTIES_PREFIX} and returns them with
     * the prefix removed. Returns empty properties when no such option exists.
     */
    private static Properties getKafkaProperties(Map<String, String> tableOptions) {
        Properties kafkaProperties = new Properties();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            String key = option.getKey();
            if (key.startsWith(PROPERTIES_PREFIX)) {
                kafkaProperties.put(key.substring(PROPERTIES_PREFIX.length()), option.getValue());
            }
        }
        return kafkaProperties;
    }

    /**
     * Completes {@code future} according to whether the listed topics satisfy the descriptor.
     *
     * <p>In fixed-topic mode every configured topic must appear in the listing; otherwise the
     * future fails with an {@link IllegalArgumentException} naming the missing topics. In pattern
     * mode the future always completes normally, even when no topic matches.
     */
    private static void checkTopicListing(Collection<TopicListing> topicListings, KafkaTopicsDescriptor topicsDescriptor, CompletableFuture<Void> future) {
        LOG.trace("Topics retrieved from Kafka cluster: {}", topicListings);
        List<String> matchedTopics = topicListings.stream()
                .map(TopicListing::name)
                .filter(topicsDescriptor::isMatchingTopic)
                .collect(Collectors.toList());
        if (topicsDescriptor.isFixedTopics()) {
            // Compute the missing names directly instead of comparing list sizes: a size
            // comparison reports a failure (with an empty name list) as soon as the configured
            // topics contain a duplicate.
            List<String> missingTopics = topicsDescriptor.getFixedTopics().stream()
                    .distinct()
                    .filter(topic -> !matchedTopics.contains(topic))
                    .collect(Collectors.toList());
            if (!missingTopics.isEmpty()) {
                future.completeExceptionally(new IllegalArgumentException("Unknown topic names: " + String.join(", ", missingTopics)));
                return;
            }
            LOG.debug("Successfully retrieved topics with fixed topic mode: {}", matchedTopics);
        } else {
            LOG.debug("Successfully retrieved topics with topic pattern mode: {}", matchedTopics);
        }
        future.complete(null);
    }

    /**
     * Verifies that all required keys are present in the table options.
     *
     * @param tableOptions options to inspect
     * @param operation description of the operation, used in the error message
     * @param requiredKeys keys that must be present
     * @throws IllegalArgumentException listing every missing key, if any is absent
     */
    private static void validateOptions(Map<String, String> tableOptions, String operation, String... requiredKeys) {
        List<String> missingKeys = new ArrayList<>();
        for (String requiredKey : requiredKeys) {
            if (!tableOptions.containsKey(requiredKey)) {
                missingKeys.add(requiredKey);
            }
        }
        if (!missingKeys.isEmpty()) {
            throw new IllegalArgumentException(String.format("Missing these keys for %s: %s", operation, String.join(", ", missingKeys)));
        }
    }

    /** Closes the underlying admin client. */
    @Override
    public void close() {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }
}

