/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.streamnative.pulsar.handlers.kop.AdminManager;
import io.streamnative.pulsar.handlers.kop.AdvertisedListener;
import io.streamnative.pulsar.handlers.kop.BrokersChangeHandler;
import io.streamnative.pulsar.handlers.kop.DeletionTopicsHandler;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KopEventManagerStats;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread;
import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KopEventManager {
    private static final Logger log = LoggerFactory.getLogger(KopEventManager.class);
    private static final String kopEventThreadName = "kop-event-thread";
    private static final LinkedBlockingQueue<KopEventWrapper> queue = new LinkedBlockingQueue();
    private final KopEventThread thread = new KopEventThread("kop-event-thread");
    private final Map<String, GroupCoordinator> groupCoordinatorsByTenant;
    private final AdminManager adminManager;
    private final KafkaServiceConfiguration kafkaConfig;
    private final DeletionTopicsHandler deletionTopicsHandler;
    private final BrokersChangeHandler brokersChangeHandler;
    private final MetadataStore metadataStore;
    private KopEventManagerStats eventManagerStats;
    public BiConsumer<String, Long> registerEventLatency = (eventName, createdTime) -> this.eventManagerStats.getStatsLogger().scopeLabel("kop_event", (String)eventName).getOpStatsLogger("KOP_EVENT_LATENCY").registerSuccessfulEvent(MathUtils.elapsedNanos((long)createdTime), TimeUnit.NANOSECONDS);

    public KopEventManager(AdminManager adminManager, MetadataStore metadataStore, StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig, Map<String, GroupCoordinator> groupCoordinatorsByTenant) {
        this.adminManager = adminManager;
        this.deletionTopicsHandler = new DeletionTopicsHandler(this);
        this.brokersChangeHandler = new BrokersChangeHandler(this);
        this.metadataStore = metadataStore;
        this.kafkaConfig = kafkaConfig;
        this.eventManagerStats = new KopEventManagerStats(statsLogger, queue);
        this.groupCoordinatorsByTenant = groupCoordinatorsByTenant;
    }

    public void start() {
        this.registerChildChangeHandler();
        this.thread.start();
    }

    public void close() {
        try {
            this.thread.initiateShutdown();
            this.clearAndPut(this.getShutdownEventThread());
            this.thread.awaitShutdown();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted at shutting down {}", (Object)kopEventThreadName);
        }
    }

    public void put(KopEventWrapper eventWrapper) {
        try {
            queue.put(eventWrapper);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Error put event {} to kop event queue", (Object)eventWrapper.toString(), (Object)e);
        }
    }

    public void clearAndPut(KopEventWrapper eventWrapper) {
        queue.clear();
        this.put(eventWrapper);
    }

    public void registerEventQueuedLatency(KopEventWrapper eventWrapper) {
        this.eventManagerStats.getStatsLogger().scopeLabel("kop_event", eventWrapper.kopEvent.name()).getOpStatsLogger("KOP_EVENT_QUEUED_LATENCY").registerSuccessfulEvent(MathUtils.elapsedNanos((long)eventWrapper.getCreatedTime()), TimeUnit.NANOSECONDS);
    }

    private void registerChildChangeHandler() {
        this.metadataStore.registerListener(this::handleChildChangePathNotification);
        this.metadataStore.getChildren(KopEventManager.getDeleteTopicsPath());
        this.getBrokers((List)this.metadataStore.getChildren(KopEventManager.getBrokersChangePath()).join(), null, "", -1L);
    }

    private void handleChildChangePathNotification(Notification notification) {
        if (notification.getPath().equals("/loadbalance/brokers")) {
            this.brokersChangeHandler.handleChildChange();
        } else if (notification.getPath().equals(KopEventManager.getDeleteTopicsPath())) {
            this.deletionTopicsHandler.handleChildChange();
        }
    }

    private void getBrokers(List<String> pulsarBrokers, BiConsumer<String, Long> registerEventLatency, String name, long startProcessTime) {
        ConcurrentMap kopBrokersMap = Maps.newConcurrentMap();
        AtomicInteger pendingBrokers = new AtomicInteger(pulsarBrokers.size());
        pulsarBrokers.forEach(broker -> this.metadataStore.get(KopEventManager.getBrokersChangePath() + "/" + broker).whenComplete((brokerData, e) -> {
            if (e != null) {
                if (registerEventLatency != null) {
                    registerEventLatency.accept(name, startProcessTime);
                }
                log.error("Get broker {} path data failed which have an error", broker, e);
                return;
            }
            if (brokerData.isPresent()) {
                JsonObject jsonObject = this.parseJsonObject(new String(((GetResult)brokerData.get()).getValue(), StandardCharsets.UTF_8));
                JsonObject protocols = jsonObject.getAsJsonObject("protocols");
                JsonElement element = protocols.get("kafka");
                if (element != null) {
                    String kopBrokerStrs = element.getAsString();
                    Map<String, Set<Node>> kopNodesMap = KopEventManager.getNodes(kopBrokerStrs);
                    kopNodesMap.forEach((listenerName, nodesSet) -> {
                        Set currentNodeSet = kopBrokersMap.computeIfAbsent(listenerName, s -> Sets.newConcurrentHashSet());
                        currentNodeSet.addAll(nodesSet);
                        kopBrokersMap.put(listenerName, currentNodeSet);
                    });
                } else if (log.isDebugEnabled()) {
                    log.debug("Get broker {} path currently not a kop broker, skip it.", broker);
                }
            } else if (log.isDebugEnabled()) {
                log.debug("Get broker {} path data empty.", broker);
            }
            if (pendingBrokers.decrementAndGet() == 0) {
                Map<String, Set<Node>> oldKopBrokers = this.adminManager.getAllBrokers();
                this.adminManager.setBrokers(kopBrokersMap);
                if (registerEventLatency != null) {
                    registerEventLatency.accept(name, startProcessTime);
                }
                log.info("Refresh kop brokers new cache {}, old brokers cache {}", this.adminManager.getAllBrokers(), oldKopBrokers);
            }
        }));
    }

    private JsonObject parseJsonObject(String info) {
        JsonParser parser = new JsonParser();
        return parser.parse(info).getAsJsonObject();
    }

    @VisibleForTesting
    public static Map<String, Set<Node>> getNodes(String kopBrokerStrs) {
        String[] kopBrokerArr;
        HashMap nodesMap = Maps.newHashMap();
        for (String kopBrokerStr : kopBrokerArr = kopBrokerStrs.split(",")) {
            AdvertisedListener advertisedListener = AdvertisedListener.create(kopBrokerStr);
            String listenerName = advertisedListener.getListenerName();
            String host = advertisedListener.getHostname();
            int port = advertisedListener.getPort();
            Set nodeSet = nodesMap.computeIfAbsent(listenerName, s -> new HashSet());
            nodeSet.add(new Node(Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)), host, port));
            nodesMap.put(listenerName, nodeSet);
        }
        return nodesMap;
    }

    public KopEventWrapper getDeleteTopicEvent() {
        return new KopEventWrapper(new DeleteTopicsEvent());
    }

    public KopEventWrapper getBrokersChangeEvent() {
        return new KopEventWrapper(new BrokersChangeEvent());
    }

    public KopEventWrapper getShutdownEventThread() {
        return new KopEventWrapper(new ShutdownEventThread());
    }

    public static String getKopPath() {
        return "/kop";
    }

    public static String getDeleteTopicsPath() {
        return KopEventManager.getKopPath() + "/delete_topics";
    }

    public static String getBrokersChangePath() {
        return "/loadbalance/brokers";
    }

    class KopEventThread
    extends ShutdownableThread {
        public KopEventThread(String name) {
            super(name);
        }

        @Override
        protected void doWork() {
            KopEventWrapper eventWrapper = null;
            try {
                eventWrapper = queue.take();
                KopEventManager.this.registerEventQueuedLatency(eventWrapper);
                if (eventWrapper.kopEvent instanceof ShutdownEventThread) {
                    log.info("Shutting down KopEventThread.");
                } else {
                    eventWrapper.kopEvent.process(KopEventManager.this.registerEventLatency, MathUtils.nowInNano());
                }
            }
            catch (InterruptedException e) {
                log.error("Error processing event {}", (Object)eventWrapper, (Object)e);
            }
        }
    }

    static class KopEventWrapper {
        private final KopEvent kopEvent;
        private final long createdTime;

        public KopEventWrapper(KopEvent kopEvent) {
            this.kopEvent = kopEvent;
            this.createdTime = MathUtils.nowInNano();
        }

        public String toString() {
            return "KopEventWrapper(kopEvent=" + (String)(this.kopEvent == null ? "null" : "'" + this.kopEvent + "'") + ", createdTime='" + this.createdTime + "')";
        }

        public KopEvent getKopEvent() {
            return this.kopEvent;
        }

        public long getCreatedTime() {
            return this.createdTime;
        }
    }

    static interface KopEvent {
        public void process(BiConsumer<String, Long> var1, long var2);

        public String name();
    }

    class DeleteTopicsEvent
    implements KopEvent {
        DeleteTopicsEvent() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void process(BiConsumer<String, Long> registerEventLatency, long startProcessTime) {
            try {
                List topicsDeletions = (List)KopEventManager.this.metadataStore.getChildren(KopEventManager.getDeleteTopicsPath()).get();
                if (log.isDebugEnabled()) {
                    log.debug("Delete topics listener fired for topics {} to be deleted", (Object)topicsDeletions);
                }
                HashMap<String, GroupCoordinator> currentCoordinators = new HashMap<String, GroupCoordinator>(KopEventManager.this.groupCoordinatorsByTenant);
                Set deletedTopics = Sets.newConcurrentHashSet();
                AtomicInteger pendingCoordinators = new AtomicInteger(currentCoordinators.size());
                currentCoordinators.forEach((tenant, groupCoordinator) -> {
                    if (groupCoordinator.isActive()) {
                        HashSet topicsFullNameDeletionsSets = Sets.newHashSet();
                        HashSet kopTopicsSet = Sets.newHashSet();
                        String namespacePrefix = MetadataUtils.constructUserTopicsNamespace(tenant, KopEventManager.this.kafkaConfig);
                        topicsDeletions.forEach(topic -> {
                            KopTopic kopTopic = new KopTopic(TopicNameUtils.getTopicNameWithUrlDecoded(topic), namespacePrefix);
                            kopTopicsSet.add(kopTopic);
                            topicsFullNameDeletionsSets.add(kopTopic.getFullName());
                        });
                        Iterable<GroupMetadata> groupMetadataIterable = groupCoordinator.getGroupManager().currentGroups();
                        HashSet topicPartitionsToBeDeletions = Sets.newHashSet();
                        groupMetadataIterable.forEach(groupMetadata -> topicPartitionsToBeDeletions.addAll(groupMetadata.collectPartitionsWithTopics(topicsFullNameDeletionsSets)));
                        Set<Object> curDeletedTopics = Sets.newHashSet();
                        if (!topicPartitionsToBeDeletions.isEmpty()) {
                            groupCoordinator.handleDeletedPartitions(topicPartitionsToBeDeletions);
                            Set collectDeleteTopics = topicPartitionsToBeDeletions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
                            curDeletedTopics = kopTopicsSet.stream().filter(kopTopic -> collectDeleteTopics.contains(kopTopic.getFullName())).map(KopTopic::getOriginalName).collect(Collectors.toSet());
                        }
                        log.info("Tenant {} GroupMetadata delete topics {}, no matching topics {}", new Object[]{tenant, curDeletedTopics, Sets.difference((Set)topicsFullNameDeletionsSets, (Set)curDeletedTopics)});
                        deletedTopics.addAll(curDeletedTopics);
                    }
                    if (pendingCoordinators.decrementAndGet() == 0) {
                        deletedTopics.forEach(deletedTopic -> KopEventManager.this.metadataStore.delete(KopEventManager.getDeleteTopicsPath() + "/" + deletedTopic, Optional.of(-1L)));
                    }
                });
            }
            catch (InterruptedException | ExecutionException e) {
                log.error("DeleteTopicsEvent process have an error", (Throwable)e);
            }
            finally {
                registerEventLatency.accept(this.name(), startProcessTime);
            }
        }

        @Override
        public String name() {
            return "DeleteTopicsEvent";
        }
    }

    class BrokersChangeEvent
    implements KopEvent {
        BrokersChangeEvent() {
        }

        @Override
        public void process(BiConsumer<String, Long> registerEventLatency, long startProcessTime) {
            KopEventManager.this.metadataStore.getChildren(KopEventManager.getBrokersChangePath()).whenComplete((brokers, e) -> {
                if (e != null) {
                    log.error("BrokersChangeEvent process have an error", e);
                    return;
                }
                KopEventManager.this.getBrokers((List<String>)brokers, registerEventLatency, this.name(), startProcessTime);
            });
        }

        @Override
        public String name() {
            return "BrokersChangeEvent";
        }
    }

    static class ShutdownEventThread
    implements KopEvent {
        ShutdownEventThread() {
        }

        @Override
        public void process(BiConsumer<String, Long> registerEventLatency, long startProcessTime) {
            registerEventLatency.accept(this.name(), startProcessTime);
        }

        @Override
        public String name() {
            return "ShutdownEventThread";
        }
    }
}

