亲宝软件园·资讯

展开

Nacos客户端配置中心缓存动态更新

mic 人气:0

Nacos 作为配置中心,当应用程序去访问Nacos动态获取配置源之后,会缓存到本地内存以及磁盘中。
由于Nacos作为动态配置中心,意味着后续配置变更之后需要让所有相关的客户端感知,并更新本地内存!

那么这个功能是在哪里实现的呢? 以及它是采用什么样的方式来实现配置的更新的呢? 我们一起来探索一下源码的实现!

客户端配置缓存更新

当客户端拿到配置后,需要动态刷新,从而保证数据和服务器端是一致的,这个过程是如何实现的呢?在这一小节中我们来做一个详细分析。

Nacos采用长轮训机制来实现数据变更的同步,原理如下!

整体工作流程如下:

长轮训任务启动入口

在NacosConfigService的构造方法中,当这个类被实例化以后,有做一些事情

public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties); //
    this.configFilterChainManager = new ConfigFilterChainManager(properties);
    //初始化网络通信组件
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start(); 
    //初始化ClientWorker
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

ClientWorker

在上述初始化代码中,我们重点需要关注ClientWorker这个类,它的构造方法如下

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager; //初始化配置过滤管理器
    // Initialize the timeout parameter
    init(properties); //初始化配置
    //初始化一个定时调度的线程池,重写了threadfactory方法
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
     //初始化一个定时调度的线程池,从里面的name名字来看,似乎和长轮训有关系。而这个长轮训应该是和nacos服务端的长轮训
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    //设置定时任务的执行频率,并且调用checkConfigInfo这个方法,猜测是定时去检测配置是否发生了变化
        //首次执行延迟时间为1毫秒、延迟时间为10毫秒
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

可以看到 ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:

第一个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 方法,从方法名上可以知道是每 10 ms 检查一次配置信息。

第二个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。

checkConfigInfo

ClientWorker构造初始化中,启动了一个定时任务去执行checkConfigInfo()方法,这个方法主要是定时检查本地配置和服务器上的配置的变更情况,这个方法定义如下.

public void checkConfigInfo() {
    // Dispatch tasks.
    int listenerSize = cacheMap.size(); //
    // Round up the longingTaskCount.
     // 向上取整为批数,监听的配置数量除以3000,得到一个整数,代表长轮训任务的数量
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
     //currentLongingTaskCount表示当前的长轮训任务数量,如果小于计算的结果,则可以继续创建
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

这个方法主要的目的是用来检查服务端的配置信息是否发生了变化。如果有变化,则触发listener通知

cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用来存储监听变更的缓存集合。key是根据dataID/group/tenant(租户) 拼接的值。Value是对应存储在nacos服务器上的配置文件的内容。

默认情况下,每个长轮训LongPullingRunnable任务默认处理3000个监听配置集。如果超过3000, 则需要启动多个LongPollingRunnable去执行。

currentLongingTaskCount保存已启动的LongPullingRunnable任务数

executorService就是在ClientWorker构造方法中初始化的线程池

LongPollingRunnable.run

LongPollingRunnable长轮训任务的实现逻辑,代码比较长,我们分段来分析。

第一部分主要有两个逻辑

class LongPollingRunnable implements Runnable {
    private final int taskId; //表示当前任务批次id
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 遍历CacheMap,把CacheMap中和当前任务id相同的缓存,保存到cacheDatas
            // 通过checkLocalConfig方法
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) { //这里表示数据有变化,需要通知监听器
                            cacheData.checkListenerMd5(); //通知所有针对当前配置设置了监听的监听器
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }
           //省略部分
        } catch (Throwable e) {
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出现异常,到下一次taskPenaltyTime后重新执行任务
        }
    }
}

checkLocalConfig

检查本地配置,这里面有三种情况

private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
    // 没有 -> 有
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        
        LOGGER.warn(
                "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }
     // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
    // If use local config info, then it doesn't notify business listener and notify after getting from server.
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
        return;
    }
     // 有变更
    if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
            .lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        LOGGER.warn(
                "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

checkListenerMd5

遍历用户自己添加的监听器,如果发现数据的md5值不同,则发送通知

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}

检查服务端配置

在LongPollingRunnable.run中,先通过本地配置的读取和检查来判断数据是否发生变化从而实现变化的通知

接着,当前的线程还需要去远程服务器上获得最新的数据,检查哪些数据发生了变化

// check server config
//从服务端获取发生变化的数据的DataID列表,保存在List<String>集合中
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
//遍历发生了变更的配置项
for (String groupKey : changedGroupKeys) {
    String[] key = GroupKey.parseKey(groupKey);
    String dataId = key[0];
    String group = key[1];
    String tenant = null;
    if (key.length == 3) {
        tenant = key[2];
    }
    try {
        //逐项根据这些配置项获取配置信息
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
        //把配置信息保存到CacheData中
        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
        cache.setContent(response.getContent());
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        if (null != response.getConfigType()) {
            cache.setType(response.getConfigType());
        }
        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
    } catch (NacosException ioe) {
        String message = String
            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ioe);
    }
}
//再遍历CacheData这个集合,找到发生变化的数据进行通知
for (CacheData cacheData : cacheDatas) {
    if (!cacheData.isInitializing() || inInitializingCacheList
        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
        cacheData.checkListenerMd5();
        cacheData.setInitializing(false);
    }
}
inInitializingCacheList.clear();
 //继续传递当前线程进行轮询
executorService.execute(this);

checkUpdateDataIds

这个方法主要是向服务器端发起检查请求,判断自己本地的配置和服务端的配置是否一致。

/**
 * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) { //把需要检查的配置项,拼接成一个字符串
        if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的缓存
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {//
                // cacheData 首次出现在cacheMap中&首次check更新
                inInitializingCacheList
                    .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

checkUpdateConfigStr

从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    //拼接参数和header
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {//判断可能发生变更的字符串是否为空,如果是,则直接返回。
        return Collections.emptyList();
    }
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        // 设置readTimeoutMs,也就是本次请求等待响应的超时时间,默认是30s
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        //发起远程调用
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        if (result.ok()) { //如果响应成功
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData()); //解析并更新数据,返回的是确实发生了数据变更的字符串:tenant/group/dataid。
        } else {//如果响应失败
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

客户端缓存配置长轮训机制总结

整体实现的核心点就一下几个部分

对本地缓存的配置做任务拆分,每一个批次是3000条

针对每3000条创建一个线程去执行

先把每一个批次的缓存和本地磁盘文件中的数据进行比较,

先以tenent/groupId/dataId拼接成字符串,发送到服务端进行检查,返回发生了变更的配置

客户端收到变更配置列表,再逐项遍历发送到服务端获取配置内容。

服务端配置更新的推送

分析完客户端之后,随着好奇心的驱使,服务端是如何处理客户端的请求的?那么同样,我们需要思考几个问题

客户端发起的请求地址是:/v1/cs/configs/listener,于是找到这个接口进行查看,代码如下。

//# ConfigController.java
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    Map<String, String> clientMd5Map;
    try {
        //解析客户端传递过来的可能发生变化的配置项目,转化为Map集合(key=dataId,value=md5)
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    // 开始执行长轮训。
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig

这个方法主要是用来做长轮训和短轮询的判断

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    // 判断当前请求是否支持长轮训。()
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    //如果是短轮询,走下面的请求,下面的请求就是把客户端传过来的数据和服务端的数据逐项进行比较,保存到changeGroups中。
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    // Before 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    Loggers.AUTH.info("new content:" + newResult);
    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

addLongPollingClient

把客户端的请求,保存到长轮训的执行引擎中。

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    //获取客户端长轮训的超时时间
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); 
    //不允许断开的标记
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    //应用名称
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    //
    String tag = req.getHeader("Vipserver-Tag");
    //延期时间,默认为500ms
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // 提前500ms返回一个响应,避免客户端出现超时
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        //通过md5判断客户端请求过来的key是否有和服务器端有不一致的,如果有,则保存到changedGroups中。
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) { //如果发现有变更,则直接把请求返回给客户端
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag为true,说明不需要挂起客户端,所以直接返回。
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    //获取请求端的ip
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called by http thread, or send response.
    //把当前请求转化为一个异步请求(意味着此时tomcat线程被释放,也就是客户端的请求,需要通过asyncContext来手动触发返回,否则一直挂起)
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L); //设置异步请求超时时间,
    //执行长轮训请求
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

ClientLongPolling

接下来我们来分析一下,clientLongPolling到底做了什么操作。或者说我们可以先猜测一下应该会做什么事情

基于这些猜想,我们可以看看它的实现过程

从代码粗粒度来看,它的实现似乎和我们的猜想一致,在run方法中,通过scheduler.schedule实现了一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会通过MD5Util.compareMd5来进行计算

那另外一个,当数据发生变化以后,肯定不能等到29.5s之后才通知呀,那怎么办呢?我们发现有一个allSubs的东西,它似乎和发布订阅有关系。那是不是有可能当前的clientLongPolling订阅了数据变化的事件呢?

class ClientLongPolling implements Runnable {
    @Override
    public void run() {
        //构建一个异步任务,延后29.5s执行
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() { //如果达到29.5s,说明这个期间没有做任何配置修改,则自动触发执行
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this); //移除订阅关系
                    if (isFixedPolling()) { //如果是固定间隔的长轮训
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        //比较变更的key
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {//如果大于0,表示有变更,直接响应
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null); //否则返回null
                        }
                    } else {
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }
            }
        }, timeoutTime, TimeUnit.MILLISECONDS);
        allSubs.add(this);  //把当前线程添加到订阅事件队列中
    }
}

allSubs

allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列似乎和配置变更有某种关联关系。

那么这里必须要实现的是,当用户在nacos 控制台修改了配置之后,必须要从这个订阅关系中取出关注的客户端长连接,然后把变更的结果返回。于是我们去看LongPollingService的构造方法查找订阅关系

/**
 * 长轮询订阅关系
 */
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);

LongPollingService

在LongPollingService的构造方法中,使用了一个NotifyCenter订阅了一个事件,其中不难发现,如果这个事件的实例是LocalDataChangeEvent,也就是服务端数据发生变更的时间,就会执行一个DataChangeTask的线程。

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    //注册LocalDataChangeEvent订阅事件
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) { //如果触发了LocalDataChangeEvent,则执行下面的代码
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
}

DataChangeTask

数据变更事件线程,代码如下

class DataChangeTask implements Runnable {
    @Override
    public void run() {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey); //
            //遍历所有订阅事件表
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling
                //判断当前的ClientLongPolling中,请求的key是否包含当前修改的groupKey
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta方式且betaIps不包含当前客户端ip,直接返回
                        continue;
                    }
                    // If published tag is not in the tag list, then it skipped.
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果配置了tag标签且不包含当前客户端的tag,直接返回
                        continue;
                    }
					//
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships. 移除当前客户端的订阅关系
                    LogUtil.CLIENT_LOG
                            .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                    RequestUtil
                                            .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                    "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey)); //响应客户端请求。
                }
            }
        } catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
        }
    }
}

原理总结

加载全部内容

相关教程
猜你喜欢
用户评论