/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.registry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PreDestroy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Keeps in-memory views of the master nodes and worker groups known to this master.
 *
 * <p>Master information is read from the {@link RegistryClient}; worker groups are merged from
 * two sources: the registry (kept current by {@link WorkerDataListener}) and the
 * {@link WorkerGroupMapper} (re-read periodically by {@link WorkerNodeInfoAndGroupDbSyncTask}).
 * When both sources define the same group name, the mapper-derived entry wins because it is
 * applied last in {@link #refreshWorkerGroupNodes()}.
 *
 * <p>Thread-safety: state is mutated from registry listener callbacks and from the single
 * scheduler thread. Sets stored in the group maps are never mutated after being published;
 * they are replaced wholesale, so readers may iterate them without holding a lock.
 */
@Service
public class ServerNodeManager implements InitializingBean {

    private static final String MASTER_REGISTRY_PATH = "/nodes/master";
    private static final String WORKER_REGISTRY_PATH = "/nodes/worker";
    private static final String MASTER_LOCK_PATH = "/lock/masters";
    private static final String DEFAULT_WORKER_GROUP = "default";

    /** Minimum number of "/"-separated segments a worker event path must split into. */
    private static final int WORKER_LISTENER_CHECK_LENGTH = 5;

    /** Index of this master in the master priority queue; 0 while unknown. */
    private static volatile int MASTER_SLOT = 0;

    /** Number of master nodes seen at the last successful sync; 0 while unknown. */
    private static volatile int MASTER_SIZE = 0;

    private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class);

    /** Guards {@link #masterNodes}, the priority queue and the slot/size statics. */
    private final Lock masterLock = new ReentrantLock();

    private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = this.workerGroupLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = this.workerGroupLock.writeLock();

    private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = this.workerNodeInfoLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = this.workerNodeInfoLock.writeLock();

    /** Merged view: registry groups overlaid with mapper-derived groups. */
    private final Map<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();

    /** Worker group name to worker nodes, as reported by the registry. */
    private final Map<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();

    /** Worker group name to worker nodes, as derived from {@link WorkerGroupMapper}. */
    private final Map<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();

    private final Set<String> masterNodes = new HashSet<>();

    /** Worker node key to its latest parsed heartbeat; guarded by {@link #workerNodeInfoLock}. */
    private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();

    private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();

    /** Guarded by this object's monitor. */
    private final List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();

    private ScheduledExecutorService executorService;

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private WorkerGroupMapper workerGroupMapper;

    @Autowired
    private AlertDao alertDao;

    @Autowired
    private MasterConfig masterConfig;

    /**
     * @return the index of this master in the master priority queue, or 0 when it is unknown
     */
    public static int getSlot() {
        return MASTER_SLOT;
    }

    /**
     * @return the number of master nodes seen at the last successful sync, or 0 when unknown
     */
    public static int getMasterSize() {
        return MASTER_SIZE;
    }

    /**
     * Loads the initial state, starts the periodic sync task (immediately, then with a
     * 10 second delay between runs) and subscribes to master and worker registry events.
     *
     * @throws Exception declared by the overridden method; propagates whatever {@link #load()},
     *                   the executor or the registry subscription throws
     */
    public void afterPropertiesSet() throws Exception {
        this.load();
        this.executorService =
                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
        this.executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0L, 10L, TimeUnit.SECONDS);
        this.registryClient.subscribe(MASTER_REGISTRY_PATH, new MasterDataListener());
        this.registryClient.subscribe(WORKER_REGISTRY_PATH, new WorkerDataListener());
    }

    /**
     * Reads the master nodes and every worker group with its nodes directly from the registry.
     */
    public void load() {
        this.updateMasterNodes();
        Collection<String> workerGroups = this.registryClient.getWorkerGroupDirectly();
        for (String workerGroup : workerGroups) {
            this.syncWorkerGroupNodesFromRegistry(
                    workerGroup, this.registryClient.getWorkerGroupNodesDirectly(workerGroup), Event.Type.ADD);
        }
    }

    /**
     * Selects the addresses configured on a worker group that are present in the given node map.
     *
     * @param newWorkerNodeInfo worker nodes currently known, keyed by address
     * @param wg                worker group whose comma-separated address list is filtered
     * @return a new mutable set of the matching addresses; empty when the group has no
     *         address list or none of its addresses is a key of {@code newWorkerNodeInfo}
     */
    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
        Set<String> nodes = new HashSet<>();
        String addrList = wg.getAddrList();
        // A group without addresses must not abort the whole sync pass with an NPE.
        if (StringUtils.isEmpty(addrList)) {
            return nodes;
        }
        for (String addr : addrList.split(",")) {
            if (newWorkerNodeInfo.containsKey(addr)) {
                nodes.add(addr);
            }
        }
        return nodes;
    }

    /**
     * Resets the master state and rebuilds it from the registry while holding the registry
     * lock {@code /lock/masters}. If the rebuild fails the error is logged and slot and size
     * stay at 0.
     */
    private void updateMasterNodes() {
        // Reset under the same lock that guards every other mutation of this state.
        this.masterLock.lock();
        try {
            MASTER_SLOT = 0;
            MASTER_SIZE = 0;
            this.masterNodes.clear();
        } finally {
            this.masterLock.unlock();
        }
        try {
            this.registryClient.getLock(MASTER_LOCK_PATH);
            Collection<String> currentNodes = this.registryClient.getMasterNodesDirectly();
            List<Server> masterServers = this.registryClient.getServerList(NodeType.MASTER);
            this.syncMasterNodes(currentNodes, masterServers);
        } catch (Exception e) {
            this.logger.error("update master nodes error", e);
        } finally {
            this.registryClient.releaseLock(MASTER_LOCK_PATH);
        }
    }

    /**
     * Records the given master nodes and recomputes this master's slot and the master count.
     *
     * @param nodes         master node names to add to the known set
     * @param masterServers master servers used to rebuild the priority queue
     */
    private void syncMasterNodes(Collection<String> nodes, List<Server> masterServers) {
        this.masterLock.lock();
        try {
            this.masterNodes.addAll(nodes);
            this.masterPriorityQueue.clear();
            this.masterPriorityQueue.putList(masterServers);
            String masterAddress = this.masterConfig.getMasterAddress();
            int index = this.masterPriorityQueue.getIndex(masterAddress);
            if (index >= 0) {
                MASTER_SIZE = nodes.size();
                MASTER_SLOT = index;
            } else {
                this.logger.warn("current addr:{} is not in active master list", masterAddress);
            }
            this.logger.info("update master nodes, master size: {}, slot: {}, addr: {}",
                    MASTER_SIZE, MASTER_SLOT, masterAddress);
        } finally {
            this.masterLock.unlock();
        }
    }

    /**
     * Applies a registry change for one worker group and republishes the merged view.
     *
     * <p>The group is dropped only for a {@code REMOVE} event that leaves it without nodes.
     * In every other case its node set is replaced by a copy of {@code nodes}, so a group
     * keeps its surviving workers when a single worker goes away.
     *
     * @param workerGroup name of the affected worker group
     * @param nodes       nodes currently in the group; {@code null} is treated as empty
     * @param type        type of the registry event that triggered the sync
     */
    private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes, Event.Type type) {
        try {
            boolean noNodesLeft = nodes == null || nodes.isEmpty();
            if (type == Event.Type.REMOVE && noNodesLeft) {
                if (this.registryWorkerGroupNodes.remove(workerGroup) == null) {
                    this.logger.warn("cannot remove worker group {}, not in active list", workerGroup);
                }
            } else {
                // Publish a fresh set: the previous one may still be iterated by readers.
                Set<String> replacement = new HashSet<>();
                if (nodes != null) {
                    replacement.addAll(nodes);
                }
                this.registryWorkerGroupNodes.put(workerGroup, replacement);
            }
        } finally {
            this.refreshWorkerGroupNodes();
        }
    }

    /**
     * Rebuilds the merged worker group view (registry groups first, then mapper-derived
     * groups) and notifies the registered listeners, all under the worker group write lock.
     * The lock is released even if rebuilding or notifying throws.
     */
    private void refreshWorkerGroupNodes() {
        this.workerGroupWriteLock.lock();
        try {
            try {
                this.workerGroupNodes.clear();
                this.workerGroupNodes.putAll(this.registryWorkerGroupNodes);
                this.workerGroupNodes.putAll(this.dbWorkerGroupNodes);
                this.logger.debug("refresh worker group nodes, current list: {}",
                        Arrays.toString(this.workerGroupNodes.keySet().toArray()));
            } finally {
                // Listeners are notified even when the rebuild failed.
                this.notifyWorkerInfoChangeListeners();
            }
        } finally {
            this.workerGroupWriteLock.unlock();
        }
    }

    /**
     * @return an unmodifiable snapshot of the merged worker group view, taken under the read
     *         lock; later refreshes are not reflected in the returned map
     */
    public Map<String, Set<String>> getWorkerGroupNodes() {
        this.workerGroupReadLock.lock();
        try {
            Map<String, Set<String>> snapshot = new HashMap<>(this.workerGroupNodes);
            return Collections.unmodifiableMap(snapshot);
        } finally {
            this.workerGroupReadLock.unlock();
        }
    }

    /**
     * Looks up the nodes of one worker group.
     *
     * @param workerGroup group name; {@code null} or empty selects the group {@code "default"}
     * @return an unmodifiable view of the group's nodes, or {@code null} when the group is
     *         not known
     */
    public Set<String> getWorkerGroupNodes(String workerGroup) {
        this.workerGroupReadLock.lock();
        try {
            String group = StringUtils.isEmpty(workerGroup) ? DEFAULT_WORKER_GROUP : workerGroup;
            Set<String> nodes = this.workerGroupNodes.get(group);
            return nodes == null ? null : Collections.unmodifiableSet(nodes);
        } finally {
            this.workerGroupReadLock.unlock();
        }
    }

    /**
     * @return an unmodifiable snapshot of the heartbeat of every known worker node, taken
     *         under the read lock; later updates are not reflected in the returned map
     */
    public Map<String, WorkerHeartBeat> getWorkerNodeInfo() {
        this.workerNodeInfoReadLock.lock();
        try {
            Map<String, WorkerHeartBeat> snapshot = new HashMap<>(this.workerNodeInfo);
            return Collections.unmodifiableMap(snapshot);
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }
    }

    /**
     * @param workerNode key of the worker node
     * @return the latest heartbeat stored for that node, or {@code null} when there is none
     */
    public WorkerHeartBeat getWorkerNodeInfo(String workerNode) {
        this.workerNodeInfoReadLock.lock();
        try {
            return this.workerNodeInfo.get(workerNode);
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }
    }

    /**
     * Replaces all stored heartbeats with the ones parsed from the given map.
     *
     * @param newWorkerNodeInfo worker node key to heartbeat JSON
     */
    private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
        this.workerNodeInfoWriteLock.lock();
        try {
            this.workerNodeInfo.clear();
            for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
                this.workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
            }
        } finally {
            this.workerNodeInfoWriteLock.unlock();
        }
    }

    /**
     * Stores the heartbeat of a single worker node.
     *
     * @param node key of the worker node
     * @param info heartbeat to store for it
     */
    private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) {
        this.workerNodeInfoWriteLock.lock();
        try {
            this.workerNodeInfo.put(node, info);
        } finally {
            this.workerNodeInfoWriteLock.unlock();
        }
    }

    /**
     * Replaces the mapper-derived worker groups in one step. The write lock is the one
     * {@link #refreshWorkerGroupNodes()} holds while reading them, so a refresh never
     * observes a half-replaced map.
     *
     * @param latest worker group name to nodes, as computed by the latest sync pass
     */
    private void replaceDbWorkerGroupNodes(Map<String, Set<String>> latest) {
        this.workerGroupWriteLock.lock();
        try {
            this.dbWorkerGroupNodes.clear();
            this.dbWorkerGroupNodes.putAll(latest);
        } finally {
            this.workerGroupWriteLock.unlock();
        }
    }

    /**
     * Registers a listener that is notified after every refresh of the worker group view.
     *
     * @param listener listener to add
     */
    public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) {
        this.workerInfoChangeListeners.add(listener);
    }

    /**
     * Passes snapshots of the worker groups and worker heartbeats to every registered
     * listener. A failing listener is logged and does not stop the remaining ones.
     */
    private void notifyWorkerInfoChangeListeners() {
        List<WorkerInfoChangeListener> listeners;
        // Copy under the monitor used by addWorkerInfoChangeListener to avoid a
        // ConcurrentModificationException while iterating.
        synchronized (this) {
            listeners = new ArrayList<>(this.workerInfoChangeListeners);
        }
        if (listeners.isEmpty()) {
            return;
        }
        Map<String, Set<String>> groupSnapshot = this.getWorkerGroupNodes();
        Map<String, WorkerHeartBeat> nodeInfoSnapshot = this.getWorkerNodeInfo();
        for (WorkerInfoChangeListener listener : listeners) {
            try {
                listener.notify(groupSnapshot, nodeInfoSnapshot);
            } catch (Exception e) {
                this.logger.error("worker info change listener {} failed", listener, e);
            }
        }
    }

    /**
     * Stops the periodic sync task, if it was started.
     */
    @PreDestroy
    public void destroy() {
        // afterPropertiesSet may have failed before the executor was created.
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    /**
     * Rebuilds the master state on ADD and REMOVE events for master paths and sends a
     * server-stopped alert on REMOVE. Failures are logged, never propagated.
     */
    class MasterDataListener implements SubscribeListener {

        public void notify(Event event) {
            String path = event.path();
            Event.Type type = event.type();
            if (!registryClient.isMasterPath(path)) {
                return;
            }
            try {
                if (type == Event.Type.ADD) {
                    logger.info("master node : {} added.", path);
                    updateMasterNodes();
                } else if (type == Event.Type.REMOVE) {
                    logger.info("master node : {} down.", path);
                    updateMasterNodes();
                    // NOTE(review): the meaning of the first argument (1) is defined by AlertDao.
                    alertDao.sendServerStoppedAlert(1, path, "MASTER");
                }
            } catch (Exception ex) {
                logger.error("MasterNodeListener capture data change and get data failed.", ex);
            }
        }
    }

    /**
     * Re-reads the nodes of the affected worker group on every event for a worker path, sends
     * a server-stopped alert on REMOVE and stores the heartbeat carried by UPDATE events.
     * Failures are logged, never propagated.
     */
    class WorkerDataListener implements SubscribeListener {

        public void notify(Event event) {
            String path = event.path();
            Event.Type type = event.type();
            String data = event.data();
            if (!registryClient.isWorkerPath(path)) {
                return;
            }
            try {
                String[] parts = path.split("/");
                if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
                    logger.warn(String.format("worker group path : %s is not valid, ignore", path));
                    return;
                }
                // The last two path segments are the worker group and the worker address.
                String workerGroupName = parts[parts.length - 2];
                String workerAddress = parts[parts.length - 1];
                logger.debug("received subscribe event : {}", event);
                Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
                syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
                if (type == Event.Type.ADD) {
                    logger.info("worker group node : {} added, currentNodes : {}", path, currentNodes);
                } else if (type == Event.Type.REMOVE) {
                    logger.info("worker group node : {} down.", path);
                    // NOTE(review): the meaning of the first argument (1) is defined by AlertDao.
                    alertDao.sendServerStoppedAlert(1, path, "WORKER");
                } else if (type == Event.Type.UPDATE) {
                    syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
                }
            } catch (Exception ex) {
                logger.error("WorkerGroupListener capture data change and get data failed", ex);
            }
        }
    }

    /**
     * Periodic task that reloads all worker heartbeats from the registry and recomputes the
     * worker groups defined through {@link WorkerGroupMapper}, then republishes the merged
     * view.
     */
    class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {

        /**
         * Runs one sync pass. The new groups are collected locally and swapped in at the end,
         * so refreshes triggered concurrently by registry events keep seeing the previous
         * groups while this pass is doing I/O. If the pass fails part-way, the groups
         * collected up to that point are published and the error is logged.
         */
        @Override
        public void run() {
            // A ConcurrentHashMap rejects null keys inside the try block, where it is caught.
            Map<String, Set<String>> latestDbGroups = new ConcurrentHashMap<>();
            try {
                Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
                syncAllWorkerNodeInfo(registryWorkerNodeMap);
                List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
                if (CollectionUtils.isNotEmpty(workerGroupList)) {
                    for (WorkerGroup wg : workerGroupList) {
                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
                        if (workerAddress.isEmpty()) {
                            continue;
                        }
                        latestDbGroups.put(wg.getName(), new HashSet<>(workerAddress));
                    }
                }
            } catch (Exception e) {
                logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
            } finally {
                replaceDbWorkerGroupNodes(latestDbGroups);
                refreshWorkerGroupNodes();
            }
        }
    }
}

