/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.auth.util.AuthHeaderUtil;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.notify.HttpClientManager;
import com.alibaba.nacos.config.server.service.notify.NotifyTask;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.InetUtils;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AsyncNotifyService {
    /** Logger shared by this service and its nested task/callback classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);
    /** Asynchronous HTTP client obtained from {@link HttpClientManager}; handed to every {@code AsyncTask}. */
    private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();
    /** Corresponds to the 500 ms base term of the retry delay computed by {@code getDelayTime}. */
    private static final int MIN_RETRY_INTERVAL = 500;
    /** Corresponds to the 1000 ms factor that {@code getDelayTime} multiplies by the squared fail count. */
    private static final int INCREASE_STEPS = 1000;
    /** Corresponds to the threshold (6) up to which {@code getDelayTime} keeps incrementing a task's fail count. */
    private static final int MAX_COUNT = 6;
    /** Field-injected; used by {@code AsyncRpcTask} to dump a change locally when the target member is this node. */
    @Autowired
    private DumpService dumpService;
    /** Field-injected; used by {@code AsyncRpcTask} to send the cluster sync request to another member. */
    @Autowired
    private ConfigClusterRpcClientProxy configClusterRpcClientProxy;
    /** Cluster member registry, assigned in the constructor; queried for membership, health and the self member. */
    private ServerMemberManager memberManager;

    /**
     * Creates the service and subscribes it to {@link ConfigDataChangeEvent}.
     *
     * <p>For each received event one notification task is created per cluster member returned by
     * {@code memberManager.allMembers()}: members for which
     * {@code MemberUtil.isSupportedLongCon} is false get an HTTP task, all others get an RPC task.
     * Each non-empty batch is then submitted through {@code ConfigExecutor.executeAsyncNotify}.
     *
     * @param memberManager cluster member registry used to enumerate the notification targets
     */
    @Autowired
    public AsyncNotifyService(final ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, (int)NotifyCenter.ringBufferSize);
        NotifyCenter.registerSubscriber((Subscriber)new Subscriber(){

            @Override
            public void onEvent(Event event) {
                // Only config data changes are relevant here; ignore everything else.
                if (!(event instanceof ConfigDataChangeEvent)) {
                    return;
                }
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);

                // Typed collection: the loop below iterates the elements as Member.
                Collection<Member> ipList = memberManager.allMembers();
                Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                for (Member member : ipList) {
                    if (MemberUtil.isSupportedLongCon(member)) {
                        rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                    } else {
                        httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta));
                    }
                }

                // HTTP batch is submitted before the RPC batch, as before.
                if (!httpQueue.isEmpty()) {
                    ConfigExecutor.executeAsyncNotify(new AsyncTask(AsyncNotifyService.this.nacosAsyncRestTemplate, httpQueue));
                }
                if (!rpcQueue.isEmpty()) {
                    ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                }
            }

            @Override
            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }

    /**
     * Schedules a single HTTP notification task to run again later.
     *
     * <p>The delay comes from {@code getDelayTime(task)} (which may also bump the task's fail
     * count) and is passed to {@code ConfigExecutor.scheduleAsyncNotify} in milliseconds.
     *
     * @param task the HTTP notification to schedule
     */
    private void asyncTaskExecute(NotifySingleTask task) {
        final int backoffMillis = AsyncNotifyService.getDelayTime(task);
        final Queue<NotifySingleTask> singleTaskQueue = new LinkedList<NotifySingleTask>();
        singleTaskQueue.add(task);
        ConfigExecutor.scheduleAsyncNotify(
            new AsyncTask(this.nacosAsyncRestTemplate, singleTaskQueue), backoffMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules a single RPC notification task to run again later.
     *
     * <p>The delay comes from {@code getDelayTime(task)} (which may also bump the task's fail
     * count) and is passed to {@code ConfigExecutor.scheduleAsyncNotify} in milliseconds.
     *
     * @param task the RPC notification to schedule
     */
    private void asyncTaskExecute(NotifySingleRpcTask task) {
        final int backoffMillis = AsyncNotifyService.getDelayTime(task);
        final Queue<NotifySingleRpcTask> singleTaskQueue = new LinkedList<NotifySingleRpcTask>();
        singleTaskQueue.add(task);
        ConfigExecutor.scheduleAsyncNotify(
            new AsyncRpcTask(singleTaskQueue), backoffMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Computes the delay before the next attempt of a task and advances its fail count.
     *
     * <p>The delay is {@code MIN_RETRY_INTERVAL + failCount^2 * INCREASE_STEPS} milliseconds, using
     * the fail count read <em>before</em> it is advanced. The count is only incremented while it
     * is at most {@code MAX_COUNT}, so (assuming the task's accessors are plain getters/setters)
     * the delay stops growing once the count has reached {@code MAX_COUNT + 1}.
     *
     * @param task the task about to be (re)scheduled; its fail count may be incremented
     * @return the delay in milliseconds
     */
    private static int getDelayTime(NotifyTask task) {
        int failCount = task.getFailCount();
        int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
        if (failCount <= MAX_COUNT) {
            task.setFailCount(failCount + 1);
        }
        return delay;
    }

    static class NotifySingleTask
    extends NotifyTask {
        private String target;
        private String url;
        private boolean isBeta;
        private static final String URL_PATTERN = "http://{0}{1}/v1/cs/communication/dataChange?dataId={2}&group={3}";
        private static final String URL_PATTERN_TENANT = "http://{0}{1}/v1/cs/communication/dataChange?dataId={2}&group={3}&tenant={4}";
        private int failCount;

        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) {
            this(dataId, group, tenant, lastModified, target, false);
        }

        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target, boolean isBeta) {
            this(dataId, group, tenant, null, lastModified, target, isBeta);
        }

        public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified, String target, boolean isBeta) {
            super(dataId, group, tenant, lastModified);
            this.target = target;
            this.isBeta = isBeta;
            try {
                dataId = URLEncoder.encode(dataId, "UTF-8");
                group = URLEncoder.encode(group, "UTF-8");
            }
            catch (UnsupportedEncodingException e) {
                LOGGER.error("URLEncoder encode error", (Throwable)e);
            }
            this.url = StringUtils.isBlank((CharSequence)tenant) ? MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group) : MessageFormat.format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant);
            if (StringUtils.isNotEmpty((String)tag)) {
                this.url = this.url + "&tag=" + tag;
            }
            this.failCount = 0;
        }

        @Override
        public void setFailCount(int count) {
            this.failCount = count;
        }

        @Override
        public int getFailCount() {
            return this.failCount;
        }

        public String getTargetIP() {
            return this.target;
        }
    }

    class AsyncRpcNotifyCallBack
    implements RequestCallBack<ConfigChangeClusterSyncResponse> {
        private NotifySingleRpcTask task;

        public AsyncRpcNotifyCallBack(NotifySingleRpcTask task) {
            this.task = task;
        }

        public Executor getExecutor() {
            return ConfigExecutor.getConfigSubServiceExecutor();
        }

        public long getTimeout() {
            return 1000L;
        }

        public void onResponse(ConfigChangeClusterSyncResponse response) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            if (response.isSuccess()) {
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "ok", delayed, this.task.member.getAddress());
            } else {
                LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), response.getErrorCode()});
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "error", delayed, this.task.member.getAddress());
                AsyncNotifyService.this.asyncTaskExecute(this.task);
                LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
                MetricsMonitor.getConfigNotifyException().increment();
            }
        }

        public void onException(Throwable ex) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), ex});
            ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "exception", delayed, this.task.member.getAddress());
            AsyncNotifyService.this.asyncTaskExecute(this.task);
            LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }
    }

    class AsyncNotifyCallBack
    implements Callback<String> {
        private NotifySingleTask task;

        public AsyncNotifyCallBack(NotifySingleTask task) {
            this.task = task;
        }

        public void onReceive(RestResult<String> result) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            if (result.ok()) {
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "ok", delayed, this.task.target);
            } else {
                LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), result.getCode()});
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "error", delayed, this.task.target);
                AsyncNotifyService.this.asyncTaskExecute(this.task);
                LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
                MetricsMonitor.getConfigNotifyException().increment();
            }
        }

        public void onError(Throwable ex) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), ex});
            ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "exception", delayed, this.task.target);
            AsyncNotifyService.this.asyncTaskExecute(this.task);
            LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }

        public void onCancel() {
            LogUtil.NOTIFY_LOG.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), "CANCELED"});
            AsyncNotifyService.this.asyncTaskExecute(this.task);
            LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }
    }

    /**
     * Notification of one config change to one cluster member over RPC.
     * Carries the target member plus the beta flag and tag on top of what {@code NotifyTask} holds.
     */
    static class NotifySingleRpcTask
    extends NotifyTask {
        /** Member that should receive the notification. */
        private Member member;
        /** Whether the change concerns a beta config. */
        private boolean isBeta;
        /** Tag of the changed config; stored as passed in. */
        private String tag;

        public NotifySingleRpcTask(String dataId, String group, String tenant, String tag, long lastModified, boolean isBeta, Member member) {
            super(dataId, group, tenant, lastModified);
            this.tag = tag;
            this.isBeta = isBeta;
            this.member = member;
        }
    }

    class AsyncRpcTask
    implements Runnable {
        private Queue<NotifySingleRpcTask> queue;

        public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!this.queue.isEmpty()) {
                NotifySingleRpcTask task = this.queue.poll();
                ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
                syncRequest.setDataId(task.getDataId());
                syncRequest.setGroup(task.getGroup());
                syncRequest.setBeta(task.isBeta);
                syncRequest.setLastModified(task.getLastModified());
                syncRequest.setTag(task.tag);
                syncRequest.setTenant(task.getTenant());
                Member member = task.member;
                if (AsyncNotifyService.this.memberManager.getSelf().equals((Object)member)) {
                    if (syncRequest.isBeta()) {
                        AsyncNotifyService.this.dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getLastModified(), NetUtils.localIP(), true);
                        continue;
                    }
                    AsyncNotifyService.this.dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                    continue;
                }
                if (!AsyncNotifyService.this.memberManager.hasMember(member.getAddress())) continue;
                boolean unHealthNeedDelay = AsyncNotifyService.this.memberManager.isUnHealth(member.getAddress());
                if (unHealthNeedDelay) {
                    ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), "unhealth", 0L, member.getAddress());
                    AsyncNotifyService.this.asyncTaskExecute(task);
                    continue;
                }
                if (!MemberUtil.isSupportedLongCon((Member)member)) {
                    AsyncNotifyService.this.asyncTaskExecute(new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, task.getLastModified(), member.getAddress(), task.isBeta));
                    continue;
                }
                try {
                    AsyncNotifyService.this.configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                }
                catch (Exception e) {
                    MetricsMonitor.getConfigNotifyException().increment();
                    AsyncNotifyService.this.asyncTaskExecute(task);
                }
            }
        }
    }

    /**
     * Runnable that drains a queue of {@link NotifySingleTask}s and issues one asynchronous HTTP
     * GET per task.
     *
     * <p>Tasks whose target is no longer a known member are dropped; tasks whose target is
     * reported unhealthy are traced as "unhealth" and re-scheduled instead of being sent.
     */
    class AsyncTask
    implements Runnable {
        private Queue<NotifySingleTask> queue;
        private NacosAsyncRestTemplate restTemplate;

        public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
            this.restTemplate = restTemplate;
            this.queue = queue;
        }

        @Override
        public void run() {
            this.executeAsyncInvoke();
        }

        /** Processes queued tasks until the queue is empty. */
        private void executeAsyncInvoke() {
            while (!this.queue.isEmpty()) {
                NotifySingleTask current = this.queue.poll();
                String address = current.getTargetIP();
                if (AsyncNotifyService.this.memberManager.hasMember(address)) {
                    if (AsyncNotifyService.this.memberManager.isUnHealth(address)) {
                        ConfigTraceService.logNotifyEvent(current.getDataId(), current.getGroup(), current.getTenant(), null, current.getLastModified(), InetUtils.getSelfIP(), "unhealth", 0L, current.target);
                        AsyncNotifyService.this.asyncTaskExecute(current);
                    } else {
                        this.send(current);
                    }
                }
            }
        }

        /** Builds the request headers for the task and fires the asynchronous GET. */
        private void send(NotifySingleTask current) {
            Header requestHeader = Header.newInstance();
            requestHeader.addParam("lastModified", String.valueOf(current.getLastModified()));
            requestHeader.addParam("opHandleIp", InetUtils.getSelfIP());
            if (current.isBeta) {
                requestHeader.addParam("isBeta", "true");
            }
            AuthHeaderUtil.addIdentityToHeader(requestHeader);
            this.restTemplate.get(current.url, requestHeader, Query.EMPTY, String.class, (Callback)new AsyncNotifyCallBack(current));
        }
    }
}

