大岭山镇做网站,网站开发与设计岗位职责,邯郸市递加网络有限公司,帮朋友免费做网站前言 说到服务信息#xff0c;我们还是得回到NamingService#xff0c;因为这是和NacosServer进行服务注册的核心组件#xff0c;内部提供了注册、获取Nacos实例的能力。至于其他组件#xff0c;如Ribbon#xff0c;在调用时需要所有实例信息来进行负载#xff0c;那肯定… 前言 说到服务信息我们还是得回到NamingService因为这是和NacosServer进行服务注册的核心组件内部提供了注册、获取Nacos实例的能力。至于其他组件如Ribbon在调用时需要所有实例信息来进行负载那肯定就是通过NamingService的能力来获取到所有的实例。
NamingService
在NamingService中获取实例主要有两类方法一类是getAllInstances、另一类是selectInstances它们最主要的区别就是selectInstances增加了对实例是否健康的过滤的支持。
既然如此那我们直接来就看看selectInstances的逻辑
Override
public ListInstance selectInstances(String serviceName, String groupName, ListString clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;if (subscribe) {serviceInfo hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ,));} else {serviceInfo hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ,));}return selectInstances(serviceInfo, healthy);
}上述代码有两个逻辑
1、传入的subscribe如果是true就代表是订阅模式就走hostReactor.getServiceInfo查询反之走hostReactor.getServiceInfoDirectlyFromServer查询。 2、根据实例的健康状态进行过滤返回。
那我们就先走进HostReactor对它提供的两个能力进行分析
HostReactor
这个类是在实例化NamingService时在构造函数中实例化的一个对象。
而在HostReactor实例化时其构造函数会创建一个定时线程池核心线程数量可以通过spring.cloud.nacos.discovery.namingPollingThreadCount进行控制默认是当前机器核心数的一半最少为1。
也会实例化一个FailoverReactor是一个容灾备份反应器其内部实例化了一个单线程的定时线程池内部由两个延迟定时任务组成 SwitchRefresher 每隔5秒去检查文件系统中是否有cacheDir /failover00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00文件如果有那么就标记为容灾模式。 cacheDir的取值如下 private void initCacheDir() {cacheDir System.getProperty(com.alibaba.nacos.naming.cache.dir);if (StringUtils.isEmpty(cacheDir)) {cacheDir System.getProperty(user.home) /nacos/naming/ namespace;}
}FailoverFileReader 如果是容灾模式就从文件中读取Service信息。 DiskFileWriter 初始延迟30分钟后每隔1天将serviceInfoMap(服务信息)写入到cacheDir /failover文件夹下。
整个FailoverReactor的示意图如下 在了解了HostReactor的基本情况后我们来对上面调用的两个方法进行分析。
getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug(failover-mode: failoverReactor.isFailoverSwitch());String key ServiceInfo.getKey(serviceName, clusters);// 如果是容灾模式就从failoverReactor中获取文件系统中缓存的ServiceInfo信息if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 从HostReactor维护的serviceInfoMap中取ServiceInfoServiceInfo serviceObj getServiceInfo0(serviceName, clusters);// 如果HostReactor中没有if (null serviceObj) {serviceObj new ServiceInfo(serviceName, clusters);// 存入serviceInfoMapserviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());// 查询服务端的这个Service的信息如果有则使用和接收到服务端Service推送一致的更新处理updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {// 如果原来的serviceInfoMap有数据但updatingMap又存在说明可能存在了并发问题则需要锁住serviceObj一段时间等待执行完成if (UPDATE_HOLD_INTERVAL 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error([getServiceInfo] serviceName: serviceName , clusters: clusters, e);}}}}// 用一个定时线程池去执行Servcie的主动更新scheduleUpdateIfAbsent(serviceName, clusters);// 从本地serviceInfoMap中获取ServiceInforeturn serviceInfoMap.get(serviceObj.getKey());
}在现在就去更新服务信息的方法updateServiceNow中最终会调用的processServiceJson方法方法太长直接说逻辑
public ServiceInfo processServiceJson(String json) {....
}1、如果是以前就存在于本地serviceInfoMap中的数据就分别计算出其中新增修改删除的实例如果是修改的实例需要更新心跳信息。
2、更新本地serviceInfoMap。
3、使用EventDispatcher发布通知事件。
4、ServiceInfo写入本地磁盘。
在定时更新服务信息的方法scheduleUpdateIfAbsent中
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) ! null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) ! null) {return;}ScheduledFuture? future addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}
}如果futureMap中不存在这个Service的任务才会使用addTask进行添加。addTask中就是将UpdateTask延迟1s执行一次。具体后续调用是在UpdateTask中实现的。
Override
public void run() {long delayTime DEFAULT_DELAY;try {ServiceInfo serviceObj serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));if (serviceObj null) {updateService(serviceName, clusters);return;}if (serviceObj.getLastRefTime() lastRefTime) {updateService(serviceName, clusters);serviceObj serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));} else {// if serviceName already updated by push, we should not override it// since the push data may be different from pull through force pushrefreshOnly(serviceName, clusters);}lastRefTime serviceObj.getLastRefTime();if (!eventDispatcher.isSubscribed(serviceName, clusters) !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {// abort the update taskNAMING_LOGGER.info(update task is stopped, service: serviceName , clusters: clusters);return;}if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}delayTime serviceObj.getCacheMillis();resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn([NA] failed to update serviceName: serviceName, e);} finally {executor.schedule(this, Math.min(delayTime failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}上述代码简化一下就是以下几个点
1、如果本地serviceInfoMap中没有这个Service就去服务端查询并更新。
2、如果通过这样的拉模式下最后修改的时间是大于这个Service本身的修改时间的才进行更新。
3、下一次执行的时间是根据失败次数来定的比如第一次失败那就是delayTime左移一位失败几次就左移几次最多左移6次且最大延迟60s执行。
getServiceInfoDirectlyFromServer
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)throws NacosException {String result serverProxy.queryList(serviceName, clusters, 0, false);if (StringUtils.isNotEmpty(result)) {return JacksonUtils.toObj(result, ServiceInfo.class);}return null;
}这个方法就是直接请求服务端拿到ServiceInfo的数据。 思考 看到这里我们会不会有个疑问那就是HostReactor是通过定时执行去更新服务信息的那如果在时间间隔内有其他Servcie信息的更新呢那我们岂不是得等到下一次任务执行时才能得到更新后的信息
Nacos是考虑到了的通过用定时任务通过HTTP去拉数据和接收服务端通过UDP推送的数据一拉一推来保证数据的实时性。
HostReactor中的PushReceiver就是客户端侧对服务端侧推数据的处理器。
PushReceiver
HostReactor在实例化时其构造方法中也会实例化一个PushReceiver其内部是一个单线程的定时线程池死循环用来接收来自服务端的信息以及向服务端发送ACK确认信息。 核心代码如下
Override
public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer new byte[UDP_MSS];DatagramPacket packet new DatagramPacket(buffer, buffer.length);udpSocket.receive(packet);String json new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info(received push data: json from packet.getAddress().toString());PushPacket pushPacket JacksonUtils.toObj(json, PushPacket.class);String ack;if (dom.equals(pushPacket.type) || service.equals(pushPacket.type)) {hostReactor.processServiceJson(pushPacket.data);// send ack to serverack {\type\: \push-ack\ , \lastRefTime\:\ pushPacket.lastRefTime \, \data\: \\};} else if (dump.equals(pushPacket.type)) {// dump data to serverack {\type\: \dump-ack\ , \lastRefTime\: \ pushPacket.lastRefTime \, \data\: \ StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) \};} else {// do nothing send ack onlyack {\type\: \unknown-ack\ , \lastRefTime\:\ pushPacket.lastRefTime \, \data\: \\};}udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,packet.getSocketAddress()));} catch (Exception e) {NAMING_LOGGER.error([NA] error while receiving push data, e);}}
}上述代码主要有以下几个逻辑
1、死循环接收udp的数据。
2、如果是dom或者service类型的消息会交由HostReactor进行处理。
3、向服务端发送ack确认信息。
总结
Nacos是通过定时任务使用HTTP拉数据和接收服务端通过UDP推送的数据来实现更新服务信息的目的。
今天的内容中还涉及到了Nacos的容灾处理可以通过在磁盘中配置达到开启本地容灾的模式。在获取实例时就会去本地磁盘中的备份文件中去找服务实例的数据。