/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.nameservice;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.nameservice.ServiceName;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.MQAdminExt;

/**
 * Dubbo registry implementation that uses a RocketMQ name server as its backing store.
 *
 * <p>The registry works in one of two modes, selected by the {@code route} URL parameter:
 * <ul>
 *   <li><b>non-route mode</b> ({@code isNotRoute == true}): no RocketMQ admin client is created;
 *       registration is a no-op and every subscription is answered with a single synthetic
 *       provider URL whose {@code queueId} is {@code -1}.</li>
 *   <li><b>route mode</b> ({@code isNotRoute == false}): an {@link MQAdminExt} client is started,
 *       topics are created on registration, and a background task re-pulls the topic route of
 *       every subscribed consumer and re-notifies its listener.</li>
 * </ul>
 */
public class NameServiceRegistry
extends FailbackRegistry {
    private static final Logger logger = LoggerFactory.getLogger(NameServiceRegistry.class);
    /** Periodic route poller; only created in route mode, {@code null} otherwise. */
    private ScheduledExecutorService scheduledExecutorService;
    /** Active consumer subscriptions, keyed by consumer URL; refreshed by the poller. */
    private final Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<URL, RegistryInfoWrapper>();
    /** RocketMQ admin client; only created in route mode, {@code null} otherwise. */
    private MQAdminExt mqAdminExt;
    private final boolean isNotRoute;
    // Written by the poller thread and read by threads calling doRegister, hence volatile.
    private volatile ClusterInfo clusterInfo;
    private volatile TopicList topicList;
    private long timeoutMillis;
    private String instanceName;

    /**
     * Creates the registry and, in route mode, starts the admin client and the periodic poller.
     *
     * @param url registry URL; its address is used as the name server address, and the
     *            {@code route}, {@code timeoutMillis} and {@code instanceName} parameters are read
     * @throws RuntimeException if the admin client cannot be started or the initial cluster/topic
     *                          information cannot be fetched
     */
    public NameServiceRegistry(URL url) {
        super(url);
        // NOTE(review): the value of "route" is stored directly as isNotRoute, so route=true
        // (the default) DISABLES routing. This looks inverted relative to the parameter name,
        // but it is kept as-is because flipping it would change the configuration contract.
        this.isNotRoute = url.getParameter("route", true);
        if (this.isNotRoute) {
            return;
        }
        this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
        this.instanceName = url.getParameter("instanceName", "nameservic-registry");
        DefaultMQAdminExt clientConfig = new DefaultMQAdminExt();
        clientConfig.setNamesrvAddr(url.getAddress());
        clientConfig.setInstanceName(this.instanceName);
        this.mqAdminExt = new DefaultMQAdminExtImpl(clientConfig, this.timeoutMillis);
        try {
            this.mqAdminExt.start();
            this.initBeasInfo();
        }
        catch (Exception e) {
            restoreInterrupt(e);
            // Construction is about to fail: release the admin client so it does not leak.
            this.shutdownAdminQuietly();
            String exeptionInfo = String.format("initBeasInfo pullRoute exception , cause %s ", e.getMessage());
            logger.error(exeptionInfo, (Throwable)e);
            throw new RuntimeException(exeptionInfo, e);
        }
        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable, "dubbo-registry-nameservice");
            // Daemon so that a registry that was never destroyed cannot keep the JVM alive.
            thread.setDaemon(true);
            return thread;
        };
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.scheduledExecutorService.scheduleAtFixedRate(this::run, 10000L, 30000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Periodic task: refreshes cluster/topic information, then re-pulls the route of every
     * subscribed consumer and notifies its listener. Never lets an exception escape, because an
     * exception thrown from a fixed-rate task suppresses all subsequent executions.
     */
    private void run() {
        try {
            this.initBeasInfo();
        }
        catch (Exception e) {
            restoreInterrupt(e);
            String exeptionInfo = String.format("ScheduledTask pullRoute exception , cause %s ", e.getMessage());
            logger.error(exeptionInfo, (Throwable)e);
            return;
        }
        for (Map.Entry<URL, RegistryInfoWrapper> subscription : this.consumerRegistryInfoWrapperMap.entrySet()) {
            // Each subscription is refreshed independently so that one failing topic or listener
            // does not prevent the remaining subscriptions from being updated.
            try {
                RegistryInfoWrapper wrapper = subscription.getValue();
                List<URL> urls = new ArrayList<URL>();
                this.pullRoute(wrapper.serviceName, subscription.getKey(), urls);
                wrapper.listener.notify(urls);
            }
            catch (Exception e) {
                String exeptionInfo = String.format("ScheduledTask pullRoute exception , cause %s ", e.getMessage());
                logger.error(exeptionInfo, (Throwable)e);
            }
        }
    }

    /**
     * Fetches the broker cluster information and the list of all topics from the name server.
     *
     * @throws Exception if either admin call fails
     */
    private void initBeasInfo() throws Exception {
        this.clusterInfo = this.mqAdminExt.examineBrokerClusterInfo();
        this.topicList = this.mqAdminExt.fetchAllTopicList();
    }

    /**
     * Builds a provider URL for one queue of the service's topic.
     *
     * @param serviceName service descriptor supplying the interface name and the topic
     * @param url         URL used as the template for the provider URL
     * @param queue       queue id to advertise ({@code -1} is used in non-route mode)
     * @return the template with protocol {@code rocketmq}, this registry's address and the
     *         provider-side parameters applied
     */
    private URL createProviderURL(ServiceName serviceName, URL url, int queue) {
        String serviceInterface = serviceName.getServiceInterface();
        URLBuilder builder = URLBuilder.from(url).setProtocol("rocketmq").setAddress(this.getUrl().getAddress());
        builder.addParameter("interface", serviceInterface);
        builder.addParameter("path", serviceInterface);
        builder.addParameter("bean.name", "ServiceBean:" + serviceInterface);
        builder.addParameter("side", "provider");
        builder.addParameter("category", "providers");
        builder.addParameter("protocol", "rocketmq");
        builder.addParameter("queueId", String.valueOf(queue));
        builder.addParameter("topic", serviceName.getValue());
        return builder.build();
    }

    /** Wraps the given URL in a {@link ServiceName}. */
    private ServiceName createServiceName(URL url) {
        return new ServiceName(url);
    }

    /**
     * Creates the service's topic (2 read / 2 write queues) on every broker address of the
     * cluster, unless routing is disabled or the last fetched topic list already contains it.
     *
     * @param serviceName service whose topic should exist
     * @throws RuntimeException if any broker rejects the topic creation
     */
    private void createTopic(ServiceName serviceName) {
        if (this.isNotRoute) {
            return;
        }
        String topic = serviceName.getValue();
        if (this.topicList.getTopicList().contains(topic)) {
            return;
        }
        try {
            TopicConfig topicConfig = new TopicConfig(topic);
            topicConfig.setReadQueueNums(2);
            topicConfig.setWriteQueueNums(2);
            for (BrokerData brokerData : this.clusterInfo.getBrokerAddrTable().values()) {
                for (String brokerAddr : brokerData.getBrokerAddrs().values()) {
                    this.mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
                }
            }
        }
        catch (Exception e) {
            restoreInterrupt(e);
            String exceptionInfo = String.format("create topic fial, topic name is %s , cause %s", topic, e.getMessage());
            logger.error(exceptionInfo, (Throwable)e);
            throw new RuntimeException(exceptionInfo, e);
        }
    }

    /** Always reports the registry as available. */
    @Override
    public boolean isAvailable() {
        return true;
    }

    /**
     * Registers a provider by making sure its topic exists (no-op in non-route mode).
     *
     * @param url provider URL
     */
    @Override
    public void doRegister(URL url) {
        ServiceName serviceName = this.createServiceName(url);
        this.createTopic(serviceName);
    }

    /** Unregistration is a no-op: nothing is removed from the name server. */
    @Override
    public void doUnregister(URL url) {
    }

    /**
     * Subscribes a consumer and immediately notifies the listener with the current providers.
     *
     * <p>Subscriptions to the {@code configurators} category are ignored. In non-route mode the
     * listener receives one synthetic provider URL. In route mode the subscription is dropped
     * silently when no consumer group consumes the topic; otherwise it is remembered for periodic
     * refresh and the listener receives one URL per readable queue.
     *
     * @param url      consumer URL
     * @param listener listener to notify with provider URLs
     * @throws RuntimeException if querying the name server fails
     */
    @Override
    public void doSubscribe(URL url, NotifyListener listener) {
        if (Objects.equals(url.getCategory(), "configurators")) {
            return;
        }
        ServiceName serviceName = this.createServiceName(url);
        List<URL> urls = new ArrayList<URL>();
        if (this.isNotRoute) {
            // No admin client exists in this mode, so it must not be queried; answer with the
            // static provider URL right away.
            urls.add(this.createProviderURL(serviceName, url, -1));
            listener.notify(urls);
            return;
        }
        try {
            GroupList groupList = this.mqAdminExt.queryTopicConsumeByWho(serviceName.getValue());
            if (Objects.isNull(groupList) || groupList.getGroupList().isEmpty()) {
                return;
            }
        }
        catch (InterruptedException | MQBrokerException | MQClientException | RemotingException e) {
            restoreInterrupt(e);
            String exceptionInfo = String.format("query topic consume fial, topic name is %s , url is %s , cause %s", serviceName.getValue(), url, e.getMessage());
            logger.error(exceptionInfo, e);
            throw new RuntimeException(exceptionInfo, e);
        }
        // Remember the subscription before pulling so the poller keeps refreshing it.
        this.consumerRegistryInfoWrapperMap.put(url, new RegistryInfoWrapper(listener, serviceName));
        this.pullRoute(serviceName, url, urls);
        listener.notify(urls);
    }

    /**
     * Appends one provider URL per readable queue of the service's topic to {@code urls}.
     *
     * @param serviceName service whose topic route is examined
     * @param url         consumer URL used as the template for the provider URLs
     * @param urls        output list that receives the provider URLs
     * @throws RuntimeException if the topic route cannot be examined
     */
    void pullRoute(ServiceName serviceName, URL url, List<URL> urls) {
        try {
            String topic = serviceName.getValue();
            TopicRouteData topicRouteData = this.mqAdminExt.examineTopicRouteInfo(topic);
            for (QueueData queueData : topicRouteData.getQueueDatas()) {
                if (!PermName.isReadable(queueData.getPerm())) {
                    continue;
                }
                for (int queueId = 0; queueId < queueData.getReadQueueNums(); ++queueId) {
                    URL providerUrl = this.createProviderURL(serviceName, url, queueId);
                    urls.add(providerUrl.addParameter("brokerName", queueData.getBrokerName()));
                }
            }
        }
        catch (Exception e) {
            restoreInterrupt(e);
            String exceptionInfo = String.format("query topic route fial, topic name is %s , url is %s , cause %s", serviceName.getValue(), url, e.getMessage());
            logger.error(exceptionInfo, (Throwable)e);
            throw new RuntimeException(exceptionInfo, e);
        }
    }

    /**
     * Stops tracking the consumer so the poller no longer refreshes it.
     *
     * @param url      consumer URL that was subscribed
     * @param listener listener that was subscribed (unused)
     */
    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
        this.consumerRegistryInfoWrapperMap.remove(url);
    }

    /**
     * Destroys the registry and releases the resources created in route mode: the periodic
     * poller is stopped and the admin client is shut down.
     */
    @Override
    public void destroy() {
        try {
            super.destroy();
        }
        finally {
            if (this.scheduledExecutorService != null) {
                this.scheduledExecutorService.shutdownNow();
            }
            this.consumerRegistryInfoWrapperMap.clear();
            this.shutdownAdminQuietly();
        }
    }

    /** Shuts the admin client down if it exists, logging instead of propagating any failure. */
    private void shutdownAdminQuietly() {
        if (this.mqAdminExt == null) {
            return;
        }
        try {
            this.mqAdminExt.shutdown();
        }
        catch (RuntimeException e) {
            logger.error("shutdown mqAdminExt exception , cause " + e.getMessage(), (Throwable)e);
        }
    }

    /** Re-asserts the thread's interrupt flag when the caught failure was an interruption. */
    private static void restoreInterrupt(Throwable failure) {
        if (failure instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Immutable pairing of a consumer's listener with the service it subscribed to. */
    private static final class RegistryInfoWrapper {
        private final NotifyListener listener;
        private final ServiceName serviceName;

        private RegistryInfoWrapper(NotifyListener listener, ServiceName serviceName) {
            this.listener = listener;
            this.serviceName = serviceName;
        }
    }
}

