/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.TopicOwnershipListener;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamespaceBundleOwnershipListenerImpl
implements NamespaceBundleOwnershipListener {
    private static final Logger log = LoggerFactory.getLogger(NamespaceBundleOwnershipListenerImpl.class);
    private final List<TopicOwnershipListener> topicOwnershipListeners = new CopyOnWriteArrayList<TopicOwnershipListener>();
    private final NamespaceService namespaceService;
    private final String brokerUrl;

    public void addTopicOwnershipListener(TopicOwnershipListener listener) {
        this.topicOwnershipListeners.add(listener);
        this.namespaceService.getOwnedServiceUnits().stream().filter(this).forEach(this::onLoad);
    }

    public void onLoad(NamespaceBundle bundle) {
        log.info("[{}] Load bundle: {}", (Object)this.brokerUrl, (Object)bundle);
        ((CompletableFuture)this.getOwnedPersistentTopicList(bundle).thenAccept(topics -> this.topicOwnershipListeners.forEach(listener -> {
            if (!listener.test(bundle.getNamespaceObject())) {
                return;
            }
            topics.forEach(topic -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Trigger load callback for {}", new Object[]{this.brokerUrl, listener.name(), topic});
                }
                listener.whenLoad(TopicName.get((String)topic));
            });
        }))).exceptionally(ex -> {
            log.error("[{}] Failed to get owned topic list of {}", new Object[]{this.brokerUrl, bundle, ex});
            return null;
        });
    }

    public void unLoad(NamespaceBundle bundle) {
        log.info("[{}] Unload bundle: {}", (Object)this.brokerUrl, (Object)bundle);
        ((CompletableFuture)this.getOwnedPersistentTopicList(bundle).thenAccept(topics -> this.topicOwnershipListeners.forEach(listener -> {
            if (!listener.test(bundle.getNamespaceObject())) {
                return;
            }
            topics.forEach(topic -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Trigger unload callback for {}", new Object[]{this.brokerUrl, listener.name(), topic});
                }
                listener.whenUnload(TopicName.get((String)topic));
            });
        }))).exceptionally(ex -> {
            log.error("[{}] Failed to get owned topic list of {}", new Object[]{this.brokerUrl, bundle, ex});
            return null;
        });
    }

    public boolean test(NamespaceBundle bundle) {
        return true;
    }

    private CompletableFuture<List<String>> getOwnedPersistentTopicList(NamespaceBundle bundle) {
        NamespaceName namespaceName = bundle.getNamespaceObject();
        CompletionStage topicsFuture = this.namespaceService.getListOfPersistentTopics(namespaceName).thenApply(topics -> topics.stream().filter(topic -> bundle.includes(TopicName.get((String)topic))).collect(Collectors.toList()));
        CompletionStage partitionsFuture = this.namespaceService.getPartitions(namespaceName, TopicDomain.persistent).thenApply(topics -> topics.stream().filter(topic -> bundle.includes(TopicName.get((String)topic))).collect(Collectors.toList()));
        return ((CompletableFuture)topicsFuture).thenCombine(partitionsFuture, (topics, partitions) -> {
            for (String partition : partitions) {
                if (topics.contains(partition)) continue;
                topics.add(partition);
            }
            return topics;
        });
    }

    public NamespaceBundleOwnershipListenerImpl(NamespaceService namespaceService, String brokerUrl) {
        this.namespaceService = namespaceService;
        this.brokerUrl = brokerUrl;
    }
}

