/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.brpc.naming.consul;

import com.baidu.brpc.client.channel.ServiceInstance;
import com.baidu.brpc.exceptions.RpcException;
import com.baidu.brpc.naming.BrpcURL;
import com.baidu.brpc.naming.FailbackNamingService;
import com.baidu.brpc.naming.NamingService;
import com.baidu.brpc.naming.NotifyListener;
import com.baidu.brpc.naming.RegisterInfo;
import com.baidu.brpc.naming.consul.client.ConsulClientExt;
import com.baidu.brpc.protocol.SubscribeInfo;
import com.baidu.brpc.utils.CustomThreadFactory;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import io.netty.util.internal.ConcurrentSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Naming service backed by a Consul agent.
 *
 * <p>Registration creates a Consul service carrying the {@code brpc-java} tag and a TTL
 * check; a scheduled task then reports every registered check as passing at a fixed
 * period. Subscription performs one health lookup and afterwards runs a {@link WatchTask}
 * that repeatedly queries Consul with the last seen index and forwards membership changes
 * to the subscriber's {@link NotifyListener}.
 */
public class ConsulNamingService extends FailbackNamingService implements NamingService {
    private static final Logger log = LoggerFactory.getLogger(ConsulNamingService.class);

    /** Tag set on every registered service and required by every health lookup. */
    private static final String SERVICE_TAG = "brpc-java";
    /** Prefix prepended to the service id to form the id passed to {@code agentCheckPass}. */
    private static final String CHECK_ID_PREFIX = "service:";
    /** Initial delay and period of the heartbeat task, in milliseconds. */
    private static final long HEARTBEAT_PERIOD_MS = 2000L;
    /** Wait time handed to {@link QueryParams}; presumably seconds per the consul-api contract. */
    private static final long WATCH_WAIT_TIME = 600L;
    /** Pause between watch attempts after a failure, so an unreachable agent is not hammered. */
    private static final long WATCH_RETRY_DELAY_MS = 1000L;

    private BrpcURL url;
    private ConsulClient client;
    /** TTL of the registered check, in seconds (URL parameter {@code consulInterval}, default 30). */
    private int consulInterval;
    /** URL parameter {@code lookupInterval} (default 20000); read here but not used by this class. */
    private int lookupInterval;
    private ConcurrentMap<SubscribeInfo, WatchTask> watchTaskMap = new ConcurrentHashMap<SubscribeInfo, WatchTask>();
    /** Check ids ({@code "service:" + serviceId}) that the heartbeat task keeps passing. */
    private Set<String> instanceIds = new ConcurrentSet<String>();
    private ScheduledExecutorService heartbeatExecutor;
    private ExecutorService watchExecutor;

    /**
     * Creates the Consul client from the URL and starts the heartbeat task.
     *
     * @param url naming URL; {@code getHostPorts()} must have the form {@code host:port}, and
     *            an optional {@code token} query parameter is passed to the client
     * @throws RpcException if the client cannot be built from the URL
     */
    public ConsulNamingService(BrpcURL url) {
        super(url);
        this.url = url;
        try {
            // NOTE(review): if the URL's string form includes the token, this logs it - confirm.
            log.info("construct consul with url: {}", url);
            String[] hostPorts = url.getHostPorts().split(":");
            Object tokenValue = url.getQueryMap().get("token");
            String token = tokenValue != null ? tokenValue.toString() : null;
            ConsulRawClient rawClient = new ConsulRawClient(hostPorts[0], Integer.parseInt(hostPorts[1]));
            this.client = new ConsulClientExt(rawClient, token);
        } catch (Exception e) {
            throw new RpcException(3, "wrong configuration of url, should be like test.bj:port", e);
        }
        this.consulInterval = url.getIntParameter("consulInterval", 30);
        this.lookupInterval = url.getIntParameter("lookupInterval", 20000);
        this.heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new CustomThreadFactory("consul-heartbeat"));
        this.heartbeatExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (String instanceId : instanceIds) {
                    // An exception escaping run() would make the scheduler cancel every
                    // future heartbeat, so a failure is contained to the single instance.
                    try {
                        client.agentCheckPass(instanceId);
                        log.debug("Sending consul heartbeat for: {}", instanceId);
                    } catch (Exception e) {
                        log.warn("consul heartbeat failed for: {}", instanceId, e);
                    }
                }
            }
        }, HEARTBEAT_PERIOD_MS, HEARTBEAT_PERIOD_MS, TimeUnit.MILLISECONDS);
        // Every WatchTask occupies its thread until it is stopped, so a bounded pool of one
        // would leave every subscription after the first waiting in the queue.
        this.watchExecutor = Executors.newCachedThreadPool(new CustomThreadFactory("consul-watch"));
    }

    /**
     * Stops all watchers and the heartbeat task and forgets the registered check ids.
     */
    @Override
    public void destroy() {
        super.destroy();
        // Raise the stop flags first: the interrupt sent by shutdownNow() alone may be
        // swallowed by the blocking lookup and would not end the watch loops.
        for (WatchTask watchTask : this.watchTaskMap.values()) {
            watchTask.stop();
        }
        this.watchTaskMap.clear();
        this.heartbeatExecutor.shutdown();
        this.watchExecutor.shutdownNow();
        this.instanceIds.clear();
    }

    /**
     * Looks up the currently passing instances of a service.
     *
     * @param subscribeInfo identifies the service and whether lookup failures are tolerated
     * @return the instances found; an empty list when the lookup fails and
     *         {@code isIgnoreFailOfNamingService()} is true
     * @throws RpcException when the lookup fails and failures are not ignored
     */
    public List<ServiceInstance> lookup(SubscribeInfo subscribeInfo) {
        try {
            Response<List<HealthService>> consulServices = this.lookupHealthService(subscribeInfo.getServiceId(), -1L);
            List<ServiceInstance> instances = this.convert(consulServices);
            log.info("lookup {} instances from consul", instances.size());
            return instances;
        } catch (Exception ex) {
            log.warn("lookup endpoint list failed from {}, msg={}", this.url, ex.getMessage());
            if (!subscribeInfo.isIgnoreFailOfNamingService()) {
                throw new RpcException("lookup endpoint list failed from consul failed", ex);
            }
            return new ArrayList<ServiceInstance>();
        }
    }

    /**
     * Starts watching a service and reporting membership changes to {@code listener}.
     *
     * <p>The instances returned by the initial lookup form the baseline for the first diff;
     * they are not themselves reported to the listener. If a watcher is already registered
     * for an equal {@code subscribeInfo}, it is kept and no second watcher is started.
     *
     * @param subscribeInfo identifies the service to watch
     * @param listener      receives the added and removed instances on every change
     */
    public void doSubscribe(SubscribeInfo subscribeInfo, NotifyListener listener) {
        String serviceName = subscribeInfo.getServiceId();
        Response<List<HealthService>> response = this.lookupHealthService(serviceName, -1L);
        List<ServiceInstance> instances = this.convert(response);
        log.info("lookup {} instances from consul", instances.size());
        WatchTask watchTask = new WatchTask(serviceName, instances, response.getConsulIndex(), listener);
        // Register before submitting so that a running watcher is always reachable from
        // doUnsubscribe()/destroy(); an untracked watcher could never be stopped.
        if (this.watchTaskMap.putIfAbsent(subscribeInfo, watchTask) != null) {
            log.warn("already subscribed to {}, keep the existing consul watch", serviceName);
            return;
        }
        this.watchExecutor.submit(watchTask);
    }

    /**
     * Stops the watcher registered for {@code subscribeInfo}, if any.
     *
     * @param subscribeInfo the subscription to cancel
     */
    public void doUnsubscribe(SubscribeInfo subscribeInfo) {
        WatchTask watchTask = this.watchTaskMap.remove(subscribeInfo);
        if (watchTask != null) {
            watchTask.stop();
        }
        log.info("unsubscribe success from {}", this.url);
    }

    /**
     * Registers the service with the Consul agent and starts heartbeating its check.
     *
     * @param registerInfo service id, host and port to register
     */
    public void doRegister(RegisterInfo registerInfo) {
        NewService newService = this.getConsulNewService(registerInfo);
        this.client.agentServiceRegister(newService);
        this.instanceIds.add(CHECK_ID_PREFIX + newService.getId());
        log.info("register success to {}", this.url);
    }

    /**
     * Deregisters the service from the Consul agent and stops heartbeating its check.
     *
     * @param registerInfo service id, host and port that were registered
     */
    public void doUnregister(RegisterInfo registerInfo) {
        NewService newService = this.getConsulNewService(registerInfo);
        this.client.agentServiceDeregister(newService.getId());
        this.instanceIds.remove(CHECK_ID_PREFIX + newService.getId());
    }

    /**
     * Builds the Consul service description for a registration: name, generated id,
     * address, port, the {@code brpc-java} tag, and a TTL check of {@code consulInterval}
     * seconds that deregisters the service after it has been critical for {@code 3m}.
     */
    private NewService getConsulNewService(RegisterInfo registerInfo) {
        NewService newService = new NewService();
        newService.setName(registerInfo.getServiceId());
        newService.setId(this.generateInstanceId(registerInfo));
        newService.setAddress(registerInfo.getHost());
        newService.setPort(Integer.valueOf(registerInfo.getPort()));
        newService.setTags(Arrays.asList(SERVICE_TAG));
        NewService.Check check = new NewService.Check();
        check.setTtl(this.consulInterval + "s");
        check.setDeregisterCriticalServiceAfter("3m");
        newService.setCheck(check);
        return newService;
    }

    /**
     * Builds the instance id used in Consul.
     *
     * @param registerInfo source of the service id, host and port
     * @return {@code serviceId:host:port}
     */
    public String generateInstanceId(RegisterInfo registerInfo) {
        return registerInfo.getServiceId() + ":" + registerInfo.getHost() + ":" + registerInfo.getPort();
    }

    /**
     * Queries Consul for the passing instances of a service that carry the
     * {@code brpc-java} tag.
     *
     * @param serviceName     name of the service to query
     * @param lastConsulIndex index passed to {@link QueryParams}; callers in this class pass
     *                        {@code -1} for the first lookup and the last seen index afterwards
     * @return the raw Consul response, including the index of the result
     */
    public Response<List<HealthService>> lookupHealthService(String serviceName, long lastConsulIndex) {
        QueryParams queryParams = new QueryParams(WATCH_WAIT_TIME, lastConsulIndex);
        HealthServicesRequest request = HealthServicesRequest.newBuilder()
                .setTag(SERVICE_TAG)
                .setQueryParams(queryParams)
                .setPassing(true)
                .build();
        return this.client.getHealthServices(serviceName, request);
    }

    /**
     * Converts a Consul health response into brpc service instances.
     *
     * @param consulServices response to convert; may be {@code null} or carry no value
     * @return one instance (ip, port, service name) per entry; never {@code null}
     */
    public List<ServiceInstance> convert(Response<List<HealthService>> consulServices) {
        List<ServiceInstance> serviceInstances = new ArrayList<ServiceInstance>();
        if (consulServices == null || consulServices.getValue() == null) {
            return serviceInstances;
        }
        for (HealthService consulService : consulServices.getValue()) {
            ServiceInstance serviceInstance = new ServiceInstance();
            serviceInstance.setIp(consulService.getService().getAddress());
            serviceInstance.setPort(consulService.getService().getPort().intValue());
            serviceInstance.setServiceName(consulService.getService().getService());
            serviceInstances.add(serviceInstance);
        }
        return serviceInstances;
    }

    /**
     * Watches one service until {@link #stop()} is called or the thread is interrupted:
     * queries Consul with the last seen index and, whenever the index advances, notifies
     * the listener with the instances added and removed since the previous result.
     */
    private class WatchTask implements Runnable {
        private final String serviceName;
        private final NotifyListener listener;
        private List<ServiceInstance> lastInstances;
        private long lastConsulIndex;
        private volatile boolean stopWatch = false;

        /**
         * @param serviceName     service to watch
         * @param lastInstances   baseline for the first diff; {@code null} means empty
         * @param lastConsulIndex index of the baseline; {@code null} is treated as {@code -1}
         * @param listener        receiver of the add/delete notifications
         */
        public WatchTask(String serviceName, List<ServiceInstance> lastInstances, Long lastConsulIndex, NotifyListener listener) {
            this.serviceName = serviceName;
            this.lastInstances = lastInstances != null ? lastInstances : new ArrayList<ServiceInstance>();
            // A response without an index header yields null; unboxing it on every
            // iteration would otherwise fail the watch loop forever.
            this.lastConsulIndex = lastConsulIndex != null ? lastConsulIndex.longValue() : -1L;
            this.listener = listener;
        }

        @Override
        public void run() {
            while (!this.stopWatch && !Thread.currentThread().isInterrupted()) {
                try {
                    Response<List<HealthService>> response = ConsulNamingService.this.lookupHealthService(this.serviceName, this.lastConsulIndex);
                    if (this.stopWatch) {
                        // Unsubscribed while the query was in flight: do not notify any more.
                        return;
                    }
                    Long reportedIndex = response.getConsulIndex();
                    if (reportedIndex == null) {
                        // Without an index the next query cannot wait for a change; pause
                        // instead of polling in a hot loop.
                        if (!this.pauseBeforeRetry()) {
                            return;
                        }
                        continue;
                    }
                    long currentIndex = reportedIndex.longValue();
                    if (currentIndex < this.lastConsulIndex) {
                        // The index went backwards; restart from zero as recommended for
                        // Consul blocking queries, otherwise later changes would be missed.
                        this.lastConsulIndex = 0L;
                        continue;
                    }
                    if (currentIndex == this.lastConsulIndex) {
                        continue;
                    }
                    List<ServiceInstance> currentInstances = ConsulNamingService.this.convert(response);
                    Collection<ServiceInstance> addList = CollectionUtils.subtract(currentInstances, this.lastInstances);
                    Collection<ServiceInstance> deleteList = CollectionUtils.subtract(this.lastInstances, currentInstances);
                    this.listener.notify(addList, deleteList);
                    this.lastInstances = currentInstances;
                    this.lastConsulIndex = currentIndex;
                } catch (Exception e) {
                    if (this.stopWatch || Thread.currentThread().isInterrupted()) {
                        // Failure caused by shutdown; nothing to report.
                        return;
                    }
                    log.error("consul change watch error", e);
                    if (!this.pauseBeforeRetry()) {
                        return;
                    }
                }
            }
        }

        /** Requests the watch loop to end; takes effect once the current query returns. */
        public void stop() {
            this.stopWatch = true;
        }

        /**
         * Sleeps for the retry delay.
         *
         * @return {@code false} if the thread was interrupted and the loop should end
         */
        private boolean pauseBeforeRetry() {
            try {
                TimeUnit.MILLISECONDS.sleep(WATCH_RETRY_DELAY_MS);
                return true;
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}

