/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically polls the JDBC registry, keeps an in-memory copy of every registry row keyed by its
 * data key, and notifies the listeners subscribed to a key prefix about rows that were added,
 * removed or updated since the previous poll.
 *
 * <p>The listener map and the data snapshot are shared between the single polling thread and the
 * threads calling {@link #addListener}, {@link #removeListener} and {@link #getData}; both are
 * therefore concurrent maps and the per-path listener lists are copy-on-write.
 */
public class SubscribeDataManager implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SubscribeDataManager.class);
    private final JdbcOperator jdbcOperator;
    private final JdbcRegistryProperties registryProperties;
    /** Subscribed key prefix -> listeners registered for that prefix. */
    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService dataSubscribeCheckThreadPool;
    /** Data key -> registry row, as seen by the most recent successful poll. */
    private final Map<String, JdbcRegistryData> jdbcRegistryDataMap = new ConcurrentHashMap<>();

    /**
     * Creates the manager and its single-threaded daemon scheduler. Polling does not begin until
     * {@link #start()} is called.
     *
     * @param registryProperties supplies the refresh interval used by {@link #start()}
     * @param jdbcOperator       used by the polling task to read all registry rows
     */
    public SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
        this.registryProperties = registryProperties;
        this.jdbcOperator = jdbcOperator;
        this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                        .setNameFormat("JdbcRegistrySubscribeDataCheckThread")
                        .setDaemon(true)
                        .build());
    }

    /**
     * Schedules the polling task with a fixed delay; the term refresh interval is used both as the
     * initial delay and as the delay between runs.
     */
    public void start() {
        long refreshIntervalMillis = this.registryProperties.getTermRefreshInterval().toMillis();
        this.dataSubscribeCheckThreadPool.scheduleWithFixedDelay(
                new RegistrySubscribeDataCheckTask(this.dataSubScribeMap, this.jdbcOperator, this.jdbcRegistryDataMap),
                refreshIntervalMillis,
                refreshIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Registers a listener for every data key that starts with {@code path}.
     *
     * @param path              key prefix to subscribe to
     * @param subscribeListener listener to notify
     */
    public void addListener(String path, SubscribeListener subscribeListener) {
        // Copy-on-write: the polling thread iterates this list while callers may still be adding to it.
        this.dataSubScribeMap.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(subscribeListener);
    }

    /**
     * Removes all listeners registered for exactly {@code path}.
     *
     * @param path key prefix whose listeners are dropped
     */
    public void removeListener(String path) {
        this.dataSubScribeMap.remove(path);
    }

    /**
     * Looks up a value in the in-memory snapshot (no database access).
     *
     * @param path data key to look up
     * @return the data value of the row, or {@code null} if the snapshot holds no row for the key
     */
    public String getData(String path) {
        JdbcRegistryData jdbcRegistryData = this.jdbcRegistryDataMap.get(path);
        if (jdbcRegistryData == null) {
            return null;
        }
        return jdbcRegistryData.getDataValue();
    }

    /** Stops the polling scheduler immediately and drops all registered listeners. */
    @Override
    public void close() {
        this.dataSubscribeCheckThreadPool.shutdownNow();
        this.dataSubScribeMap.clear();
    }

    /**
     * One polling pass: loads all registry rows, diffs them against the previous snapshot, refreshes
     * the snapshot and dispatches ADD / REMOVE / UPDATE events to matching listeners.
     */
    static class RegistrySubscribeDataCheckTask implements Runnable {
        private final Map<String, List<SubscribeListener>> dataSubScribeMap;
        private final JdbcOperator jdbcOperator;
        private final Map<String, JdbcRegistryData> jdbcRegistryDataMap;

        @Override
        public void run() {
            try {
                Map<String, JdbcRegistryData> currentJdbcDataMap = this.jdbcOperator.queryAllJdbcRegistryData()
                        .stream()
                        .collect(Collectors.toMap(JdbcRegistryData::getDataKey, Function.identity()));

                List<JdbcRegistryData> addedData = new ArrayList<>();
                List<JdbcRegistryData> deletedData = new ArrayList<>();
                List<JdbcRegistryData> updatedData = new ArrayList<>();

                // Rows present now: new key -> added, changed last-update time -> updated.
                for (Map.Entry<String, JdbcRegistryData> entry : currentJdbcDataMap.entrySet()) {
                    JdbcRegistryData newData = entry.getValue();
                    JdbcRegistryData oldData = this.jdbcRegistryDataMap.get(entry.getKey());
                    if (oldData == null) {
                        addedData.add(newData);
                    } else if (!newData.getLastUpdateTime().equals(oldData.getLastUpdateTime())) {
                        updatedData.add(newData);
                    }
                }
                // Rows in the previous snapshot that are no longer present -> deleted.
                for (Map.Entry<String, JdbcRegistryData> entry : this.jdbcRegistryDataMap.entrySet()) {
                    if (!currentJdbcDataMap.containsKey(entry.getKey())) {
                        deletedData.add(entry.getValue());
                    }
                }

                // Refresh the shared snapshot in place (overwrite, then drop stale keys) rather than
                // clear()+putAll(), so a concurrent getData() never sees a transiently empty map.
                this.jdbcRegistryDataMap.putAll(currentJdbcDataMap);
                this.jdbcRegistryDataMap.keySet().retainAll(currentJdbcDataMap.keySet());

                for (Map.Entry<String, List<SubscribeListener>> entry : this.dataSubScribeMap.entrySet()) {
                    String subscribeKey = entry.getKey();
                    List<SubscribeListener> subscribeListeners = entry.getValue();
                    this.triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD);
                    this.triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE);
                    this.triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
                }
            } catch (Exception e) {
                // Swallow so the scheduler keeps running, but keep the cause in the log.
                log.error("Query data from jdbc registry error", e);
            }
        }

        /**
         * Notifies every listener about each row in {@code dataList} whose data key starts with
         * {@code subscribeKey}.
         *
         * @param dataList           rows that changed in the way described by {@code type}
         * @param subscribeKey       key prefix the listeners subscribed to
         * @param subscribeListeners listeners registered for {@code subscribeKey}
         * @param type               event type to report
         */
        private void triggerListener(List<JdbcRegistryData> dataList, String subscribeKey,
                                     List<SubscribeListener> subscribeListeners, Event.Type type) {
            for (JdbcRegistryData data : dataList) {
                if (!data.getDataKey().startsWith(subscribeKey)) {
                    continue;
                }
                for (SubscribeListener subscribeListener : subscribeListeners) {
                    try {
                        // NOTE(review): the data key is passed as both the first and second Event
                        // argument; confirm against Event whether the first should be subscribeKey.
                        subscribeListener.notify(
                                new Event(data.getDataKey(), data.getDataKey(), data.getDataValue(), type));
                    } catch (Exception e) {
                        // The snapshot is already refreshed, so an aborted pass would lose the
                        // remaining events for good; isolate the failing listener instead.
                        log.error("Notify subscribe listener error, subscribeKey: {}, dataKey: {}, type: {}",
                                subscribeKey, data.getDataKey(), type, e);
                    }
                }
            }
        }

        @Generated
        public RegistrySubscribeDataCheckTask(Map<String, List<SubscribeListener>> dataSubScribeMap, JdbcOperator jdbcOperator, Map<String, JdbcRegistryData> jdbcRegistryDataMap) {
            this.dataSubScribeMap = dataSubScribeMap;
            this.jdbcOperator = jdbcOperator;
            this.jdbcRegistryDataMap = jdbcRegistryDataMap;
        }
    }
}

