/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.AdvertisedListener;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.LookupClient;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves which broker owns a topic and which Kafka advertised listener of that
 * broker a client should connect to.
 *
 * <p>Lookups go through the supplied {@link LookupClient}. Results for topics whose
 * owner advertises the same Kafka listeners as this broker are kept in the static
 * {@link #LOOKUP_CACHE}, which is shared by every instance of this class.
 */
public class KopBrokerLookupManager {
    private static final Logger log = LoggerFactory.getLogger(KopBrokerLookupManager.class);

    /** Protocol key under which brokers publish their Kafka advertised listeners. */
    private static final String KAFKA_PROTOCOL = "kafka";

    private final LookupClient lookupClient;
    private final MetadataStoreCacheLoader metadataStoreCacheLoader;
    private final String selfAdvertisedListeners;
    private final PulsarService pulsar;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Topic name -> completed lookup future, populated by the advertised listener resolution. */
    public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> LOOKUP_CACHE =
            new ConcurrentHashMap<>();

    /**
     * Creates a lookup manager.
     *
     * @param conf          configuration providing the broker lookup timeout and this broker's
     *                      Kafka advertised listeners
     * @param pulsarService the broker service used for metadata and namespace queries
     * @param lookupClient  client used to look up the broker address of a topic
     * @throws Exception if the {@link MetadataStoreCacheLoader} cannot be created
     */
    public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService, LookupClient lookupClient) throws Exception {
        this.pulsar = pulsarService;
        this.lookupClient = lookupClient;
        this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarService.getPulsarResources(), conf.getBrokerLookupTimeoutMs());
        this.selfAdvertisedListeners = conf.getKafkaAdvertisedListeners();
    }

    /**
     * Finds the Kafka advertised address of the broker that owns {@code topic}.
     *
     * @param topic              the topic to look up
     * @param advertisedEndPoint the endpoint whose listener name should be matched, or
     *                           {@code null} to use the broker's first advertised listener
     * @return a future holding the advertised address, or an empty {@link Optional} when the
     *         broker or its listener could not be resolved (the topic's cache entry is
     *         removed in that case)
     */
    public CompletableFuture<Optional<InetSocketAddress>> findBroker(String topic, @Nullable EndPoint advertisedEndPoint) {
        return getTopicBroker(topic).thenApply(internalListenerAddress ->
                resolveAdvertisedAddress(topic, internalListenerAddress, advertisedEndPoint));
    }

    /**
     * Translates a broker's internal listener address into its Kafka advertised address.
     * Every failure path evicts the topic from {@link #LOOKUP_CACHE} and yields an empty result.
     */
    private Optional<InetSocketAddress> resolveAdvertisedAddress(String topic,
                                                                 @Nullable InetSocketAddress internalListenerAddress,
                                                                 @Nullable EndPoint advertisedEndPoint) {
        if (internalListenerAddress == null) {
            log.error("[{}] failed get pulsar address, returned null.", topic);
            removeTopicManagerCache(topic);
            return Optional.empty();
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Found broker's internal listener address: {}", topic, internalListenerAddress);
        }
        try {
            String listener = getAdvertisedListener(internalListenerAddress, topic, advertisedEndPoint);
            if (listener == null) {
                log.error("Failed to find the advertised listener for {} ", topic);
                removeTopicManagerCache(topic);
                return Optional.empty();
            }
            if (log.isDebugEnabled()) {
                log.debug("Found listener {} for topic {}", listener, topic);
            }
            AdvertisedListener advertisedListener = AdvertisedListener.create(listener);
            return Optional.of(new InetSocketAddress(advertisedListener.getHostname(), advertisedListener.getPort()));
        } catch (IllegalStateException | NumberFormatException e) {
            log.error("Failed to find the advertised listener: {}", e.getMessage());
            removeTopicManagerCache(topic);
            return Optional.empty();
        }
    }

    /**
     * Returns the internal listener address of the broker that owns {@code topicName},
     * served from {@link #LOOKUP_CACHE} when an entry is present.
     *
     * @param topicName the topic to look up
     * @return the cached or freshly requested lookup future; a future completed with
     *         {@code null} once this manager has been closed
     */
    public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName) {
        if (closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("Return null for getTopicBroker({}) since channel closing", topicName);
            }
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("Handle Lookup for topic {}", topicName);
        }
        CompletableFuture<InetSocketAddress> cached = LOOKUP_CACHE.get(topicName);
        return cached != null ? cached : lookupBroker(topicName);
    }

    /** Asks the lookup client for the topic's broker, unless this manager is closed. */
    private CompletableFuture<InetSocketAddress> lookupBroker(String topic) {
        // Checked again here because close() may have run since the caller's own check.
        if (closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("Return null for getTopic({}) since channel closing", topic);
            }
            return CompletableFuture.completedFuture(null);
        }
        return lookupClient.getBrokerAddress(TopicName.get(topic));
    }

    /**
     * Checks whether {@code topic} exists, either as a partitioned topic (partition count
     * other than zero) or, when the partition count is zero, according to
     * {@link #internalCheckTopicExists(TopicName)}.
     *
     * @param topic the topic to check
     * @return a future that completes with {@code true} if the topic exists; it completes with
     *         {@code false} (never exceptionally) when the existence could not be determined
     */
    public CompletableFuture<Boolean> isTopicExists(String topic) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        TopicName topicName = TopicName.get(topic);
        pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).whenComplete((metadata, ex) -> {
            if (ex != null) {
                log.error("Fetch partitioned topic metadata has exception.", ex);
                // A failed query is no evidence that the topic exists.
                future.complete(false);
                return;
            }
            if (metadata.partitions == 0) {
                internalCheckTopicExists(topicName).thenAccept(future::complete).exceptionally(throwable -> {
                    log.error("Check topic exists has exception.", throwable);
                    future.complete(false);
                    return null;
                });
                return;
            }
            future.complete(true);
        });
        return future;
    }

    /**
     * Delegates the existence check to the namespace service. Kept {@code protected} so
     * subclasses can override it.
     *
     * @param topicName the topic to check
     * @return a future holding whether the topic exists
     */
    protected CompletableFuture<Boolean> internalCheckTopicExists(TopicName topicName) {
        return pulsar.getNamespaceService().checkTopicExists(topicName);
    }

    /**
     * Finds the Kafka advertised listener of the broker reachable at {@code internalListenerAddress}.
     *
     * <p>When that broker advertises the same listeners as this one, the lookup result is
     * stored in {@link #LOOKUP_CACHE}.
     *
     * @return the listener matching {@code advertisedEndPoint}'s listener name, falling back to the
     *         first advertised listener; {@code null} when no available broker matches the address
     * @throws IllegalStateException if the matching broker publishes no Kafka protocol data
     */
    private String getAdvertisedListener(InetSocketAddress internalListenerAddress, String topic, @Nullable EndPoint advertisedEndPoint) {
        List<LoadManagerReport> availableBrokers = metadataStoreCacheLoader.getAvailableBrokers();
        if (log.isDebugEnabled()) {
            availableBrokers.forEach(report -> log.debug(
                    "Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, pulsarUrlTls: {}, webUrl: {}, webUrlTls: {} kafka: {}",
                    topic, report.getPulsarServiceUrl(), report.getPulsarServiceUrlTls(),
                    report.getWebServiceUrl(), report.getWebServiceUrlTls(), report.getProtocol(KAFKA_PROTOCOL)));
        }
        String hostAndPort = internalListenerAddress.getHostName() + ":" + internalListenerAddress.getPort();
        Optional<LoadManagerReport> serviceLookupData = availableBrokers.stream()
                .filter(report -> lookupDataContainsAddress(report, hostAndPort))
                .findAny();
        if (!serviceLookupData.isPresent()) {
            log.error("No node for broker {} under loadBalance", internalListenerAddress);
            return null;
        }
        return serviceLookupData.get().getProtocol(KAFKA_PROTOCOL).map(kafkaAdvertisedListeners -> {
            if (kafkaAdvertisedListeners.equals(selfAdvertisedListeners)) {
                // Same advertised listeners as this broker: remember the lookup result.
                LOOKUP_CACHE.put(topic, CompletableFuture.completedFuture(internalListenerAddress));
            }
            // orElseGet: only compute the fallback listener when the named one was not found.
            return Optional.ofNullable(advertisedEndPoint)
                    .map(endPoint -> EndPoint.findListener(kafkaAdvertisedListeners, endPoint.getListenerName()))
                    .orElseGet(() -> EndPoint.findFirstListener(kafkaAdvertisedListeners));
        }).orElseThrow(() -> new IllegalStateException("No kafkaAdvertisedListeners found in broker " + internalListenerAddress));
    }

    /**
     * Tells whether either Pulsar service URL of {@code data} ends with {@code hostAndPort}.
     * NOTE(review): this is a plain suffix match, so a host name that is a suffix of another
     * broker's host name would also match - confirm this cannot happen in deployments.
     */
    private static boolean lookupDataContainsAddress(ServiceLookupData data, String hostAndPort) {
        return StringUtils.endsWith(data.getPulsarServiceUrl(), hostAndPort)
                || StringUtils.endsWith(data.getPulsarServiceUrlTls(), hostAndPort);
    }

    /**
     * Removes the cached lookup result of a single topic.
     *
     * @param topicName the topic whose cache entry should be dropped
     */
    public static void removeTopicManagerCache(String topicName) {
        LOOKUP_CACHE.remove(topicName);
    }

    /** Removes every cached lookup result. */
    public static void clear() {
        LOOKUP_CACHE.clear();
    }

    /**
     * Closes this manager: clears the shared lookup cache and closes the metadata store
     * cache loader. Only the first call has any effect.
     */
    public void close() {
        if (!closed.compareAndSet(false, true)) {
            // Already closed by an earlier call.
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Closing KopBrokerLookupManager");
        }
        clear();
        try {
            metadataStoreCacheLoader.close();
        } catch (IOException e) {
            log.error("Close metadataStoreCacheLoader failed.", e);
        }
    }
}

