亲宝软件园·资讯

展开

SpringCloud微服务剔除下线功能实现原理分析

卡布奇诺-海晨 人气:0

一、前言

上一篇SpringCloud微服务续约源码解析已经分析了心跳机制是什么、底层实现、客户端发送心跳的主要代码、注册中心处理心跳的过程,这节跟它是紧密关联的。联系的枢纽就是lastUpdateTimestamp最后更新时间戳,它是Lease租约类的一个用volatile关键字修饰的对其他线程透明可见的字段。那么Eureka是如何使用该字段判断服务是否过期的?然后进行服务的剔除下线?需要借助什么机制?该机制是什么时候能触发的?带着这些问题,我们下面来探究一番:

二、微服务剔除下线源码解析

EurekaBootStrap是Eureka项目里面的,用于启动Eureka服务器的类:

Eureka 服务器使用类路径中eureka.server.props指定的EurekaServerConfig进行配置。Eureka客户端组件也是通过使用eureka.client.props指定的配置 EurekaInstanceConfig初始化的。如果服务器在AWS云中运行,则eureka服务器将其绑定到指定的弹性ip。

1、EurekaBootStrap#contextInitialized()

    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            initEurekaEnvironment();
            // 初始化注册中心上下文
            initEurekaServerContext();
            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

它这里也使用了事件机制,但是不是基于Spring的,感兴趣的可以去了解下。初始化注册中心上下文,即下面的处理逻辑:

1.1、初始化注册中心上下文

    protected void initEurekaServerContext() throws Exception {
        EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
        logger.info("Initializing the eureka client...");
        logger.info(eurekaServerConfig.getJsonCodecName());
        ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
        ApplicationInfoManager applicationInfoManager = null;
        if (eurekaClient == null) {
            EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                    ? new CloudInstanceConfig()
                    : new MyDataCenterInstanceConfig();
            applicationInfoManager = new ApplicationInfoManager(
                    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
            EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
            eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
        } else {
            applicationInfoManager = eurekaClient.getApplicationInfoManager();
        }
        PeerAwareInstanceRegistry registry;
        if (isAws(applicationInfoManager.getInfo())) {
            registry = new AwsInstanceRegistry(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
            awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
            awsBinder.start();
        } else {
            registry = new PeerAwareInstanceRegistryImpl(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
        }
        PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                applicationInfoManager
        );
        serverContext = new DefaultEurekaServerContext(
                eurekaServerConfig,
                serverCodecs,
                registry,
                peerEurekaNodes,
                applicationInfoManager
        );
        EurekaServerContextHolder.initialize(serverContext);
        serverContext.initialize();
        logger.info("Initialized server context");
        // Copy registry from neighboring eureka node
        int registryCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, registryCount);
        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

做一些初始化工作,重点关注registry.openForTraffic(applicationInfoManager, registryCount);的调用,进入下面处理逻辑:

1.2、openForTraffic()逻辑

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // 更新每30秒发生一次,一分钟应该是2倍。
        this.expectedNumberOfClientsSendingRenews = count;
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        // 更改服务实例状态为UP
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // 调用父类初始化
        super.postInit();
    }

更改服务实例状态为UP,调用父类初始化。

1.3、postInit()执行任务

    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

终于来到剔除任务了,前面说了什么,就是一些初始化的工作。它这里的执行器是Timer,跟Nacos不一样,区别的话感兴趣的就自行去搞个明白。我们进入下面的分析:

1.4、剔除任务

EvictionTask是TimerTask类型任务。

class EvictionTask extends TimerTask {
        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
        /**
         * 计算一个补偿时间,该时间定义为自上一次迭代以来该任务的实际执行时间,与配置的执行时间量相比较。
         * 这对于时间变化(例如由于时钟偏差或 gc)导致实际的驱逐任务根据配置的周期在所需时间之后执行的情况
         * 非常有用。
         */
        long getCompensationTimeMs() {
            long currNanos = getCurrentTimeNano();
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }
        long getCurrentTimeNano() {  // for testing
            return System.nanoTime();
        }
    }

主要逻辑:

计算一个补偿时间,该时间定义为自上一次迭代以来该任务的实际执行时间,与配置的执行时间量相比较。这对于时间变化(例如由于时钟偏差或 gc)导致实际的驱逐任务根据配置的周期在所需时间之后执行的情况非常有用。

调用evict(compensationTimeMs)剔除处理,下面分析:

2、服务剔除下线

2.1、AbstractInstanceRegistry#evict()逻辑

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        if (!isLeaseExpirationEnabled()) {
            // DS: 租约到期目前已禁用。
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        // 我们首先收集所有过期的物品,以随机的顺序驱逐它们。对于大型驱逐集,如果我们不这样做,
        // 我们可能会在自我保护启动之前删除整个应用程序。通过随机化,影响应该均匀地分布在所有应用程序中。
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    // 判断租约是否过期
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        // 添加到过期续租集合
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // 为了补偿 GC 暂停或漂移的本地时间,我们需要使用当前的注册表大小作为触发自我保存的基础。
        // 没有这个,我们就会清除整个注册表。
        // 获取注册表租约总数
        int registrySize = (int) getLocalRegistrySize();
        // 计算注册表租约的阈值 (总数乘以 续租百分比 默认85%),得出要续租的数量
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // 理论要剔除的数量 = 总数-要续租的数量
        int evictionLimit = registrySize - registrySizeThreshold;
        // 实际剔除的数量 =  min(实际租期到期服务实例个数,理论剔除数量)
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        // 将要剔除数量大于0,把它们下线处理,从本地注册表移除掉以保证高可用
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // 选择一个随机的项目(Knuth 洗牌算法)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
 
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                // 注册表: {}/{}的租约已过期
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // 服务下线
                internalCancel(appName, id, false);
            }
        }
    }

主要逻辑:

2.1、判断是否过期

    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

如果是cancel()处理前面的值就大于0,一般是判断后面部分逻辑:如果当前系统时间戳小于后面的时间戳之和,则没有过期;否则大于就是过期了。

duration的值也可以通过配置文件更改,通过yml配置文件中eureka:instance:lease-expiration-duration-in-seconds:指定,不过必须大于eureka:instance:lease-renewal-interval-in-seconds默认值或指定值。设置duration太长可能意味着即使实例不存在,流量也可能被路由到该实例。将此值设置得太小可能意味着,由于临时网络故障,该实例可能会从流量中删除。因此duration的值要设置为至少高于eureka:instance:lease-renewal-interval-in-seconds中默认的或指定的值。

2.2、从本地列表异常下线处理

cancel(String,String,boolean)方法被PeerAwareInstanceRegistry重写了,因此每个取消请求都被复制到对等点。然而,对于在远程对等点中被视为有效取消的过期,这是不需要的,因此自我保存模式不会启用。

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        // 加锁
        read.lock();
        try {
            CANCEL.increment(isReplication);
            // 根据appName从本地注册表获取租约服务实例
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // 根据唯一ID从本地移除服务实例,下线
                leaseToCancel = gMap.remove(id);
            }
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                // 下线失败,因为租约信息中不存在该服务实例
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                // 通过更新剔除时间取消租约。
                leaseToCancel.cancel();
                // 从租约获取服务实例
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // 使特定应用程序的缓存失效
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            }
        } finally {
            // 释放锁
            read.unlock();
        }
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
        return true;
    }

主要逻辑:

加载全部内容

相关教程
猜你喜欢
用户评论