/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seata.discovery.registry.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.exception.ConfigNotFoundException;
import org.apache.seata.discovery.registry.RegistryHeartBeats;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.seata.discovery.registry.consul.ConsulListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsulRegistryServiceImpl
implements RegistryService<ConsulListener> {
    private static volatile ConsulRegistryServiceImpl instance;
    private static volatile ConsulClient client;
    private static final Logger LOGGER;
    private static final Configuration FILE_CONFIG;
    private static final String FILE_ROOT_REGISTRY = "registry";
    private static final String FILE_CONFIG_SPLIT_CHAR = ".";
    private static final String REGISTRY_TYPE = "consul";
    private static final String SERVER_ADDR_KEY = "serverAddr";
    private static final String REGISTRY_CLUSTER = "cluster";
    private static final String DEFAULT_CLUSTER_NAME = "default";
    private static final String SERVICE_TAG = "services";
    private static final String ACL_TOKEN = "aclToken";
    private static final String FILE_CONFIG_KEY_PREFIX = "registry.consul.";
    private ConcurrentMap<String, List<InetSocketAddress>> clusterAddressMap = new ConcurrentHashMap<String, List<InetSocketAddress>>(8);
    private ConcurrentMap<String, Set<ConsulListener>> listenerMap = new ConcurrentHashMap<String, Set<ConsulListener>>(8);
    private ExecutorService notifierExecutor;
    private ConcurrentMap<String, ConsulNotifier> notifiers = new ConcurrentHashMap<String, ConsulNotifier>(8);
    private static final int THREAD_POOL_NUM = 1;
    private static final int MAP_INITIAL_CAPACITY = 8;
    private static final String DEFAULT_CHECK_INTERVAL = "10s";
    private static final String DEFAULT_CHECK_TIMEOUT = "1s";
    private static final String DEFAULT_DEREGISTER_TIME = "20s";
    private static final int DEFAULT_WATCH_TIMEOUT = 60;

    private ConsulRegistryServiceImpl() {
        this.notifierExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("services-consul-notifier", 1));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    static ConsulRegistryServiceImpl getInstance() {
        if (instance != null) return instance;
        Class<ConsulRegistryServiceImpl> clazz = ConsulRegistryServiceImpl.class;
        synchronized (ConsulRegistryServiceImpl.class) {
            if (instance != null) return instance;
            instance = new ConsulRegistryServiceImpl();
            // ** MonitorExit[var0] (shouldn't be in output)
            return instance;
        }
    }

    @Override
    public void register(InetSocketAddress address) throws Exception {
        NetUtil.validAddress(address);
        this.doRegister(address);
        RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, address, this::doRegister);
    }

    private void doRegister(InetSocketAddress address) {
        this.getConsulClient().agentServiceRegister(this.createService(address), ConsulRegistryServiceImpl.getAclToken());
    }

    @Override
    public void unregister(InetSocketAddress address) throws Exception {
        NetUtil.validAddress(address);
        this.getConsulClient().agentServiceDeregister(this.createServiceId(address), ConsulRegistryServiceImpl.getAclToken());
    }

    @Override
    public void subscribe(String cluster, ConsulListener listener) throws Exception {
        this.listenerMap.computeIfAbsent(cluster, key -> new HashSet()).add(listener);
        Response<List<HealthService>> response = this.getHealthyServices(cluster, -1L, 60L);
        Long index = response.getConsulIndex();
        ConsulNotifier notifier = this.notifiers.computeIfAbsent(cluster, key -> new ConsulNotifier(cluster, index));
        this.notifierExecutor.submit(notifier);
    }

    @Override
    public void unsubscribe(String cluster, ConsulListener listener) throws Exception {
        ConsulNotifier notifier = (ConsulNotifier)this.notifiers.remove(cluster);
        notifier.stop();
    }

    @Override
    public List<InetSocketAddress> lookup(String key) throws Exception {
        String cluster = this.getServiceGroup(key);
        if (cluster == null) {
            String missingDataId = "service.vgroupMapping." + key;
            throw new ConfigNotFoundException("%s configuration item is required", missingDataId);
        }
        return this.lookupByCluster(cluster);
    }

    private List<InetSocketAddress> lookupByCluster(String cluster) throws Exception {
        if (!this.listenerMap.containsKey(cluster)) {
            this.refreshCluster(cluster);
            this.subscribe(cluster, (List<HealthService> services) -> this.refreshCluster(cluster, services));
        }
        return (List)this.clusterAddressMap.get(cluster);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private ConsulClient getConsulClient() {
        if (client != null) return client;
        Class<ConsulRegistryServiceImpl> clazz = ConsulRegistryServiceImpl.class;
        synchronized (ConsulRegistryServiceImpl.class) {
            if (client != null) return client;
            String serverAddr = FILE_CONFIG.getConfig("registry.consul.serverAddr");
            InetSocketAddress inetSocketAddress = NetUtil.toInetSocketAddress(serverAddr);
            client = new ConsulClient(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
            // ** MonitorExit[var1_1] (shouldn't be in output)
            return client;
        }
    }

    private String getClusterName() {
        String clusterConfigName = String.join((CharSequence)FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER);
        return FILE_CONFIG.getConfig(clusterConfigName, DEFAULT_CLUSTER_NAME);
    }

    private String createServiceId(InetSocketAddress address) {
        return this.getClusterName() + "-" + NetUtil.toStringAddress(address);
    }

    private static String getAclToken() {
        String fileConfigKey = String.join((CharSequence)FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, ACL_TOKEN);
        String aclToken = StringUtils.isNotBlank(System.getProperty(ACL_TOKEN)) ? System.getProperty(ACL_TOKEN) : FILE_CONFIG.getConfig(fileConfigKey);
        return StringUtils.isNotBlank(aclToken) ? aclToken : null;
    }

    private NewService createService(InetSocketAddress address) {
        NewService newService = new NewService();
        newService.setId(this.createServiceId(address));
        newService.setName(this.getClusterName());
        newService.setTags(Collections.singletonList(SERVICE_TAG));
        newService.setPort(Integer.valueOf(address.getPort()));
        newService.setAddress(NetUtil.toIpAddress(address));
        newService.setCheck(this.createCheck(address));
        return newService;
    }

    private NewService.Check createCheck(InetSocketAddress address) {
        NewService.Check check = new NewService.Check();
        check.setTcp(NetUtil.toStringAddress(address));
        check.setInterval(DEFAULT_CHECK_INTERVAL);
        check.setTimeout(DEFAULT_CHECK_TIMEOUT);
        check.setDeregisterCriticalServiceAfter(DEFAULT_DEREGISTER_TIME);
        return check;
    }

    private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) {
        return this.getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder().setTag(SERVICE_TAG).setQueryParams(new QueryParams(watchTimeout, index)).setPassing(true).setToken(ConsulRegistryServiceImpl.getAclToken()).build());
    }

    private void refreshCluster(String cluster) {
        if (StringUtils.isBlank(cluster)) {
            return;
        }
        Response<List<HealthService>> response = this.getHealthyServices(cluster, -1L, -1L);
        if (response == null) {
            return;
        }
        this.refreshCluster(cluster, (List)response.getValue());
    }

    private void refreshCluster(String cluster, List<HealthService> services) {
        if (cluster == null || services == null) {
            return;
        }
        List<InetSocketAddress> addresses = services.stream().map(HealthService::getService).map(service -> new InetSocketAddress(service.getAddress(), (int)service.getPort())).collect(Collectors.toList());
        this.clusterAddressMap.put(cluster, addresses);
        this.removeOfflineAddressesIfNecessary(cluster, addresses);
    }

    @Override
    public void close() throws Exception {
        client = null;
    }

    static {
        LOGGER = LoggerFactory.getLogger(ConsulRegistryServiceImpl.class);
        FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
    }

    private class ConsulNotifier
    implements Runnable {
        private String cluster;
        private long consulIndex;
        private boolean running;
        private boolean hasError = false;

        ConsulNotifier(String cluster, long consulIndex) {
            this.cluster = cluster;
            this.consulIndex = consulIndex;
            this.running = true;
        }

        @Override
        public void run() {
            while (this.running) {
                try {
                    this.processService();
                }
                catch (Exception exception) {
                    this.hasError = true;
                    LOGGER.error("consul refresh services error:{}", (Object)exception.getMessage());
                }
            }
        }

        private void processService() {
            Response response = ConsulRegistryServiceImpl.this.getHealthyServices(this.cluster, this.consulIndex, 60L);
            Long currentIndex = response.getConsulIndex();
            if (currentIndex != null && currentIndex > this.consulIndex || this.hasError) {
                this.hasError = false;
                List services = (List)response.getValue();
                this.consulIndex = currentIndex;
                for (ConsulListener listener : (Set)ConsulRegistryServiceImpl.this.listenerMap.get(this.cluster)) {
                    listener.onEvent(services);
                }
            }
        }

        void stop() {
            this.running = false;
        }
    }
}

