茂名高端模板建站,西安关键词排名首页,营销型网站文案怎么做,数据分析师文章目录 1、客户端注册流程1.1、读取配置1.1.1、用于注册的 HttpClientRegisterRepository1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener 1.2、扫描注解#xff0c;注册元数据和URI1.2.1、构建URI并写入Disruptor1.2.2、构建元数据并写入Disruptor1.2.… 文章目录 1、客户端注册流程1.1、读取配置1.1.1、用于注册的 HttpClientRegisterRepository1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener 1.2、扫描注解注册元数据和URI1.2.1、构建URI并写入Disruptor1.2.2、构建元数据并写入Disruptor1.2.3、Disruptor消费数据并向shenyu-admin注册数据 2、服务端注册流程2.1、读取配置2.1.1、用于监听的ShenyuClientServerRegisterRepository 2.2、注册元数据和URI2.2.1、注册接口接收数据写入Disruptor2.2.2、Disruptor消费数据并持久化 1、客户端注册流程
当客户端启动后根据相关配置读取属性信息然后写入队列。以官方提供的 shenyu-examples-http 为例开始源码分析。
1.1、读取配置
该例子是一个springboot项目所以注册的入口往往在自动装配类中。不妨可以先看下项目的pom文件中引入了什么依赖
dependenciesdependencygroupIdorg.apache.shenyu/groupIdartifactIdshenyu-spring-boot-starter-client-springmvc/artifactIdversion${project.version}/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-webflux/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-logging/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency
/dependencies这里面看到就shenyu-spring-boot-starter-client-springmvc是跟ShenYu相关的所以入口应该就在这个依赖内了看下这个依赖的项目结构 发现就是两个配置类ShenyuSpringMvcClientInfoRegisterConfiguration由于使用了Configuration(proxyBeanMethods false)暂时不用关注重点关注ShenyuSpringMvcClientConfiguration它是shenyu客户端http注册配置类。
/*** shenyu 客户端http注册配置类*/
Configuration
// shenyu客户端通用配置类
ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
ConditionalOnProperty(value shenyu.register.enabled, matchIfMissing true, havingValue true)
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** param clientConfig 客户端注册配置* param shenyuClientRegisterRepository 客户端注册类*/BeanConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}通过Configuration表示这是一个配置类通过ImportAutoConfiguration引入ShenyuClientCommonBeanConfiguration配置类。
/*** shenyu客户端通用配置类创建注册中心客户端通用的bean*/
Configuration
ConditionalOnProperty(value shenyu.register.enabled, matchIfMissing true, havingValue true)
public class ShenyuClientCommonBeanConfiguration {/*** 根据注册中心配置通过SPI方式创建客户端注册类*/Beanpublic ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {return ShenyuClientRegisterRepositoryFactory.newInstance(config);}/*** Shenyu 客户端注册中心配置读取shenyu.register属性配置*/BeanConfigurationProperties(prefix shenyu.register)public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** Shenyu 客户端配置读取shenyu.client属性配置*/BeanConfigurationProperties(prefix shenyu)public ShenyuClientConfig shenyuClientConfig() {return new ShenyuClientConfig();}
}ShenyuClientCommonBeanConfiguration是ShenYu客户端的通用配置类创建了3个通用bean。
ShenyuClientRegisterRepository客户端注册类用于将客户端接口信息注册到注册中心。ShenyuRegisterCenterConfigShenYu客户端注册中心配置类读取shenyu.register属性配置。ShenyuClientConfigShenYu客户端配置类读取shenyu.client属性配置。
1.1.1、用于注册的 HttpClientRegisterRepository
上面生成的ShenyuClientRegisterRepository是用于实现客户端注册的接口会根据注册中心的配置通过SPI方式创建客户端注册类每一个注册方式都对应一个实现类。 目前支持7种注册类型
HttpHttpClientRegisterRepositoryApolloApolloClientRegisterRepositoryZookeeperZookeeperClientRegisterRepositoryEtcdEtcdClientRegisterRepositoryNacosNacosClientRegisterRepositoryConsulConsulClientRegisterRepositoryPolarisPolarisClientRegisterRepository
public final class ShenyuClientRegisterRepositoryFactory {private static final MapString, ShenyuClientRegisterRepository REPOSITORY_MAP new ConcurrentHashMap();/*** 根据注册中心类型实例化注册服务*/public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {// 通过SPI方式创建客户端注册类ShenyuClientRegisterRepository result ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());// 初始化对应客户端注册类比如创建zookeeper clientetcd clientadmin平台的token等result.init(shenyuRegisterCenterConfig);ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);return result;}return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());}
}加载类型通过registerType指定也就是我们在配置文件中指定的类型
shenyu:register:registerType: httpserverLists: http://localhost:9095props:username: adminpassword: 123qweQWE$$这里指定的是http所以这里创建的就是HttpClientRegisterRepository。props是用于连接注册中心的一些额外属性比如用户名密码命名空间等。 创建对应的注册客户端后会调用init方法根据shenyu.register下的配置进行初始化
Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {public HttpClientRegisterRepository() {}public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {init(config);}Overridepublic void init(final ShenyuRegisterCenterConfig config) {// shenyu-admin用户名this.username config.getProps().getProperty(Constants.USER_NAME);// shenyu-admin密码this.password config.getProps().getProperty(Constants.PASS_WORD);// shenyu-admin集群地址this.serverList Lists.newArrayList(Splitter.on(,).split(config.getServerLists()));// 根据上面3个信息请求shenyu-admin获取accessToken用于后面调用注册接口this.accessToken Caffeine.newBuilder()//see org.apache.shenyu.admin.config.properties.JwtProperties#expiredSeconds.expireAfterWrite(24L, TimeUnit.HOURS).build(new CacheLoaderString, String() {Overridepublic Nullable String load(NonNull final String server) throws Exception {try {// 调用shenyu-admin的登录接口/platform/login获取accessTokenOptional? login RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));return login.map(String::valueOf).orElse(null);} catch (Exception e) {LOGGER.error(Login admin url :{} is fail, will retry. cause: {} , server, e.getMessage());return null;}}});}
}这里主要就是去调shenyu-admin的登录接口获取accessToken为后面的发送注册数据做准备。其他注册类型的ShenyuClientRegisterRepository也一样创建各自注册中心的client连接注册中心为发送数据做准备。类注解Join用于SPI的加载。
1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
回到一开始的ShenyuSpringMvcClientConfiguration配置类
/*** shenyu 客户端http注册配置类*/
Configuration
// shenyu客户端通用配置类
ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
ConditionalOnProperty(value shenyu.register.enabled, matchIfMissing true, havingValue true)
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** param clientConfig 客户端注册配置* param shenyuClientRegisterRepository 客户端注册类*/BeanConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}创建了SpringMvcClientEventListener负责客户端 元数据 和 URI 数据的构建和注册。SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener而AbstractContextRefreshedEventListener是一个抽象类它实现了ApplicationListener接口并重写了onApplicationEvent()方法当有Spring事件发生后该方法会执行。每一种后端服务RPC调用协议都对应了一个监听类。
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListenerObject, ShenyuSpringMvcClient {public SpringMvcClientEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {super(clientConfig, shenyuClientRegisterRepository);// client配置Properties props clientConfig.getProps();// 是否是全部接口都注册this.isFull Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));// http协议this.protocol props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);this.addPrefixed Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.ADD_PREFIXED,Boolean.FALSE.toString()));mappingAnnotation.add(ShenyuSpringMvcClient.class);mappingAnnotation.add(RequestMapping.class);}// ...}SpringMvcClientEventListener的构造函数主要就是调用父类AbstractContextRefreshedEventListener的构造函数传入客户端配置和客户端注册类客户端配置指shenyu.client.http下的配置
shenyu:client:http:props:contextPath: /httpappName: http-appNameport: 8189isFull: falsepublic abstract class AbstractContextRefreshedEventListenerT, A extends Annotation implements ApplicationListenerContextRefreshedEvent {protected static final String PATH_SEPARATOR /;// Disruptor 发布器private final ShenyuClientRegisterEventPublisher publisher ShenyuClientRegisterEventPublisher.getInstance();// ...public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 读取 shenyu.client.http 配置信息Properties props clientConfig.getProps();this.appName props.getProperty(ShenyuClientConstants.APP_NAME);this.contextPath Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse();if (StringUtils.isBlank(appName) StringUtils.isBlank(contextPath)) {String errorMsg client register param must config the appName or contextPath;LOG.error(errorMsg);throw new ShenyuClientIllegalArgumentException(errorMsg);}this.ipAndPort props.getProperty(ShenyuClientConstants.IP_PORT);this.host props.getProperty(ShenyuClientConstants.HOST);this.port props.getProperty(ShenyuClientConstants.PORT);// 开始事件发布启动 Disruptorpublisher.start(shenyuClientRegisterRepository);} }取出相关配置信息后就启动 Disruptor 队列ShenyuClientRegisterEventPublisher可以看作是一个生产者用来向队列发送数据
public class ShenyuClientRegisterEventPublisher {private static final ShenyuClientRegisterEventPublisher INSTANCE new ShenyuClientRegisterEventPublisher();private DisruptorProviderManageDataTypeParent providerManage;public static ShenyuClientRegisterEventPublisher getInstance() {return INSTANCE;}/*** Start.** param shenyuClientRegisterRepository shenyuClientRegisterRepository*/public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 注册任务工厂类用于创建注册的任务客户端使用的是RegisterClientExecutorFactory // 而在服务端shenyu-admin用于处理注册任务的是RegisterServerConsumerExecutor// 都是用于消费Disruptor数据的任务RegisterClientExecutorFactory factory new RegisterClientExecutorFactory();// 添加元数据订阅器factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));// 添加URI订阅器factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));// 添加ApiDoc订阅器factory.addSubscribers(new ShenyuClientApiDocExecutorSubscriber(shenyuClientRegisterRepository));providerManage new DisruptorProviderManage(factory);// 启动Disruptor队列并创建消费者providerManage.startup();}/*** 发布事件向Disruptor队列发数据** param data the data*/public void publishEvent(final DataTypeParent data) {DisruptorProviderDataTypeParent provider providerManage.getProvider();provider.onData(data);}
}start方法主要是为队列添加订阅器会由消费者接收到信息后调用这些订阅器。然后启动启动Disruptor队列并创建消费者。
public class DisruptorProviderManageT {public void startup() {this.startup(false);}public void startup(final boolean isOrderly) {OrderlyExecutor executor new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(),DisruptorThreadFactory.create(shenyu_disruptor_consumer_, false), new ThreadPoolExecutor.AbortPolicy());int newConsumerSize this.consumerSize;EventFactoryDataEventT eventFactory;if (isOrderly) {newConsumerSize 1;eventFactory new OrderlyDisruptorEventFactory();} else {eventFactory new DisruptorEventFactory();}DisruptorDataEventT disruptor new Disruptor(eventFactory,size,DisruptorThreadFactory.create(shenyu_disruptor_provider_ consumerFactory.fixName(), false),ProducerType.MULTI,new BlockingWaitStrategy());// 创建消费者SuppressWarnings(all)QueueConsumerT[] consumers new QueueConsumer[newConsumerSize];for (int i 0; i newConsumerSize; i) {consumers[i] new QueueConsumer(executor, consumerFactory);}// 设置消费者disruptor.handleEventsWithWorkerPool(consumers);disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());// 真正调用disruptor的api启动disruptor.start();RingBufferDataEventT ringBuffer disruptor.getRingBuffer();// disruptor的生产者provider new DisruptorProvider(ringBuffer, disruptor, isOrderly);} }这里就是准备Disruptor队列的一些逻辑就不细讲了其中QueueConsumer是Disruptor的消费者后面就是由它接收数据。
1.2、扫描注解注册元数据和URI
上面说到SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener而AbstractContextRefreshedEventListener实现了ApplicationListener接口并重写了onApplicationEvent()方法当有Spring事件发生后该方法会执行。
// 当有上下文刷新事件ContextRefreshedEvent发生时该方法会执行算是客户端的执行入口吧
Override
public void onApplicationEvent(NonNull final ContextRefreshedEvent event) {context event.getApplicationContext();// 获取客户端的接口类比如http就是Controller类dubbo就是DubboService类由子类实现MapString, T beans getBeans(context);if (MapUtils.isEmpty(beans)) {return;}// 保证只注册一次if (!registered.compareAndSet(false, true)) {return;}// 构建URI并写入Disruptor由子类实现publisher.publishEvent(buildURIRegisterDTO(context, beans));// 构建元数据并写入Disruptorbeans.forEach(this::handle);MapString, Object apiModules context.getBeansWithAnnotation(ApiModule.class);apiModules.forEach((k, v) - handleApiDoc(v, beans));
}获取客户端服务的接口类由具体的子类实现http就是Controller类这里对应的子类就是SpringMvcClientEventListener
Override
protected MapString, Object getBeans(final ApplicationContext context) {// Filter outif (Boolean.TRUE.equals(isFull)) {// isFulltrue表示代理整个服务就不需要注解扫描了// 直接构建元数据和URI写入DisruptorgetPublisher().publishEvent(MetaDataRegisterDTO.builder().contextPath(getContextPath()).addPrefixed(addPrefixed).appName(getAppName()).path(PathUtils.decoratorPathWithSlash(getContextPath())).rpcType(RpcTypeEnum.HTTP.getName()).enabled(true).ruleName(getContextPath()).build());LOG.info(init spring mvc client success with isFull mode);// 构建URIpublisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));return Collections.emptyMap();}// 否则获取Controller注解的beanreturn context.getBeansWithAnnotation(Controller.class);
}这里会判断配置文件中的shenyu.client.http.props.isFull如果是true则直接构建一个元数据和URI写入到Disruptor中然后返回一个空集合后续的逻辑就没执行了。如果是false则从spring容器中获取带Controller注解的bean返回。
1.2.1、构建URI并写入Disruptor
构建一个URI数据写入到Disruptor这个也是由子类实现的
// 构建URI
Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,final MapString, Object beans) {try {return URIRegisterDTO.builder().contextPath(getContextPath()) // shneyu得contextPath.appName(getAppName()) // appName.protocol(protocol) // 服务协议.host(super.getHost()) // 服务host.port(Integer.valueOf(getPort())) // 服务端口.rpcType(RpcTypeEnum.HTTP.getName()) // rpc类型.eventType(EventType.REGISTER) // 事件类型.build();} catch (ShenyuException e) {throw new ShenyuException(e.getMessage() please config ${shenyu.client.http.props.port} in xml/yml !);}
}可以看出来URI跟接口类没有关系一个后端服务实例生成一个URI。
1.2.2、构建元数据并写入Disruptor
之后遍历每个接口构建元数据beans.forEach(this::handle)
/*** 构建元数据并写入Disruptor*/
protected void handle(final String beanName, final T bean) {Class? clazz getCorrectedClass(bean);// 获取当前bean的对应shenyu客户端的注解比如http是ShenyuSpringMvcClient // dubbo是ShenyuDubboClientfinal A beanShenyuClient AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());// 获取bean对应的path类上注解的路径由子类实现final String superPath buildApiSuperPath(clazz, beanShenyuClient);// 如果有shenyu客户端注解并且path中包含*则表示要注册整个类的方法只需要构建一个类元数据if (Objects.nonNull(beanShenyuClient) superPath.contains(*)) {// 由具体的子类构建类元数据写入DisruptorhandleClass(clazz, bean, beanShenyuClient, superPath);return;}// 类上没有shenyu客户端注解类上没有注解但方法上有注解也是可以注册的// 或者有注解但是path没有包含*则就要遍历每个方法为每个需要注册的方法构建方法元数据final Method[] methods ReflectionUtils.getUniqueDeclaredMethods(clazz);for (Method method : methods) {// 由具体子类构建方法元数据写入Disruptor并将每个method对应的元数据对象缓存在当前类里handleMethod(bean, clazz, beanShenyuClient, method, superPath);}
}protected void handleClass(final Class? clazz,final T bean,NonNull final A beanShenyuClient,final String superPath) {publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}protected void handleMethod(final T bean,final Class? clazz,Nullable final A beanShenyuClient,final Method method,final String superPath) {// 如果方法上有Shenyu客户端注解就表示该方法需要注册A methodShenyuClient AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());if (Objects.nonNull(methodShenyuClient)) {final MetaDataRegisterDTO metaData buildMetaDataDTO(bean, methodShenyuClient,buildApiPath(method, superPath, methodShenyuClient), clazz, method);publisher.publishEvent(metaData);metaDataMap.put(method, metaData);}
}// 获取接口对应路径如果shenyu注解上没有就用RequestMapping上的路径
// 但是这个只支持第一个路径
Override
protected String buildApiSuperPath(final Class? clazz, Nullable final ShenyuSpringMvcClient beanShenyuClient) {if (Objects.nonNull(beanShenyuClient) StringUtils.isNotBlank(beanShenyuClient.path())) {return beanShenyuClient.path();}RequestMapping requestMapping AnnotationUtils.findAnnotation(clazz, RequestMapping.class);// Only the first path is supported temporarilyif (Objects.nonNull(requestMapping) ArrayUtils.isNotEmpty(requestMapping.path()) StringUtils.isNotBlank(requestMapping.path()[0])) {return requestMapping.path()[0];}return ;
}// springmvc接口上需要有 ShenyuSpringMvcClient 注解
// 并且包含RequestMapping注解表示是一个接口才进行注册
protected void handleMethod(final Object bean, final Class? clazz,Nullable final ShenyuSpringMvcClient beanShenyuClient,final Method method, final String superPath) {final RequestMapping requestMapping AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);ShenyuSpringMvcClient methodShenyuClient AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);methodShenyuClient Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;// 如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解表示是一个接口则进行注册if (Objects.nonNull(methodShenyuClient) Objects.nonNull(requestMapping)) {// 构建元数据final MetaDataRegisterDTO metaData buildMetaDataDTO(bean, methodShenyuClient,// 构建path contextPath 类上的路径 方法上的路径 buildApiPath(method, superPath, methodShenyuClient), clazz, method);// 发布元数据getPublisher().publishEvent(metaData);getMetaDataMap().put(method, metaData);}
}// path contextPath 类上的路径 方法上的路径
// 如果ShenyuSpringMvcClient注解上的路径不为空则方法上的路径ShenyuSpringMvcClient上的value
// 否则方法上的路径RequestMapping上的value
Override
protected String buildApiPath(final Method method, final String superPath,NonNull final ShenyuSpringMvcClient methodShenyuClient) {String contextPath getContextPath();if (StringUtils.isNotBlank(methodShenyuClient.path())) {return pathJoin(contextPath, superPath, methodShenyuClient.path());}final String path getPathByMethod(method);if (StringUtils.isNotBlank(path)) {return pathJoin(contextPath, superPath, path);}return pathJoin(contextPath, superPath);
}1.2.3、Disruptor消费数据并向shenyu-admin注册数据
上面启动Disruptor的时候说到QueueConsumer实现了WorkHandler接口是Disruptor的消费者消费逻辑就在它的onEvent方法中
public class QueueConsumerT implements WorkHandlerDataEventT {private final OrderlyExecutor executor;private final QueueConsumerFactoryT factory;/*** Instantiates a new Queue consumer.** param executor the executor* param factory the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactoryT factory) {this.executor executor;this.factory factory;}Overridepublic void onEvent(final DataEventT t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor orderly(t);// 通过工厂创建队列消费任务 RegisterClientConsumerExecutorQueueConsumerExecutorT queueConsumerExecutor factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}QueueConsumerExecutor是实现了Runnable的消费任务它有两个实现
RegisterClientConsumerExecutor客户端消费者任务RegisterServerConsumerExecutor服务端消费者任务
从名字也可以看出RegisterClientConsumerExecutor负责处理客户端任务shenyu客户端将元数据和URI写入disruptor后由这个消费者任务来消费数据执行实际向注册中心注册的操作。RegisterServerConsumerExecutor负责处理服务端shenyu-admin任务服务端从注册中心监听到元数据和URI后写入disruptor然后由RegisterServerConsumerExecutor任务来消费数据处理数据入库操作和发布事件。 RegisterClientConsumerExecutor的消费逻辑
public final class RegisterClientConsumerExecutorT extends DataTypeParent extends QueueConsumerExecutorT {private final MapDataType, ExecutorTypeSubscriberT subscribers;private RegisterClientConsumerExecutor(final MapDataType, ExecutorTypeSubscriberT executorSubscriberMap) {this.subscribers new EnumMap(executorSubscriberMap);}Overridepublic void run() {// 获取数据final T data getData();// 根据数据类型获取对应的处理器进行处理即在disruptor启动的时候添加的订阅器subscribers.get(data.getType()).executor(Lists.newArrayList(data));}// ...
}根据不同的数据类型使用不同的订阅器执行器去执行这些订阅器是在disruptor启动的时候设置的。目前注册的数据类型有3种元数据URI和API文档。
public enum DataType {/*** Meta data data type enum.*/META_DATA,/*** Uri data type enum.*/URI,/*** Api doc type enum.*/API_DOC,
}所以相对应的订阅器也分为3类分别处理元数据URI和API文档。在客户端和服务端分别有两个所以一共是6个。 元数据处理
public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriberMetaDataRegisterDTO {private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;// .../*** 遍历元数据对数据注册到注册中心*/Overridepublic void executor(final CollectionMetaDataRegisterDTO metaDataRegisterDTOList) {for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {// 调用响应注册中心的客户端注册类注册元数据shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);}}
}遍历数据然后又将数据委托给ShenyuClientRegisterRepository执行。ShenyuClientRegisterRepository是在一开始读取配置的时候就创建了是客户端注册类用来将数据发送到注册中心的类不同的注册方式有不同的实现类该示例使用http方式注册shenyu.register.registerTypehttp的实现类是HttpClientRegisterRepository。HttpClientRegisterRepository并没有直接实现ShenyuClientRegisterRepository接口而是继承FailbackRegistryRepositoryFailbackRegistryRepository实现了ShenyuClientRegisterRepository接口FailbackRegistryRepository本身主要用于对http注册过程中的失败重试。
Override
public void persistInterface(final MetaDataRegisterDTO metadata) {try {this.doPersistInterface(metadata);} catch (Exception ex) {//If a failure occurs, it needs to be added to the retry list.// 如果注册失败则添加到重试列表过一段时间后重新注册logger.warn(Failed to persistInterface {}, cause:{}, metadata, ex.getMessage());this.addFailureMetaDataRegister(metadata);}
}Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {// META_PATH /shenyu-client/register-metadatadoRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}// 发送注册数据到admin
private T void doRegister(final T t, final String path, final String type) {int i 0;// admin集群中的每个都要去注册admin之间没有同步机制么for (String server : serverList) {i;// 拼接上admin的地址和具体的接口路径构成完整的请求地址String concat server.concat(path);try {// 调用admin接口的tokenString accessToken this.accessToken.get(server);if (StringUtils.isBlank(accessToken)) {throw new NullPointerException(accessToken is null);}// 通过工具类发送http请求RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);// considering the situation of multiple clusters, we should continue to execute here} catch (Exception e) {LOGGER.error(Register admin url :{} is fail, will retry. cause:{}, server, e.getMessage());if (i serverList.size()) {throw new RuntimeException(e);}}}
}http注册方式比较简单遍历每个admin服务获取到accessToken后向/shenyu-client/register-metadata接口地址发起http请求将数据发送给admin。 URI处理
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriberURIRegisterDTO {Overridepublic void executor(final CollectionURIRegisterDTO dataList) {for (URIRegisterDTO uriRegisterDTO : dataList) {Stopwatch stopwatch Stopwatch.createStarted();// 这里的逻辑是为了探测客户端是否已经启动while (true) {try (Socket ignored new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {break;} catch (IOException e) {long sleepTime 1000;// maybe the port is delay exposedif (stopwatch.elapsed(TimeUnit.SECONDS) 5) {LOG.error(host:{}, port:{} connection failed, will retry,uriRegisterDTO.getHost(), uriRegisterDTO.getPort());// If the connection fails for a long time, Increase sleep timeif (stopwatch.elapsed(TimeUnit.SECONDS) 180) {sleepTime 10000;}}try {TimeUnit.MILLISECONDS.sleep(sleepTime);} catch (InterruptedException ex) {LOG.error(interrupted when sleep, ex);}}}ShenyuClientShutdownHook.delayOtherHooks();// 向注册中心注册URI数据shenyuClientRegisterRepository.persistURI(uriRegisterDTO);// 优雅停机ShutdownHookManager.get().addShutdownHook(new Thread(() - {final URIRegisterDTO offlineDTO new URIRegisterDTO();BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);offlineDTO.setEventType(EventType.OFFLINE);shenyuClientRegisterRepository.offline(offlineDTO);}), 2);}}
}URI注册逻辑基本相似只是比元数据多了一步探测客户端时候已经启动完成的操作保证客户端启动完成后再注册URI后面的逻辑就跟元数据一样了。 分析到这里就将客户端的注册逻辑分析完了通过读取自定义的注解信息构造元数据和URI将数据发到Disruptor队列然后从队列中消费数据将消费者放到线程池中去执行最终通过发送http请求到admin元数据注册接口是/shenyu-client/register-metadataURI注册接口是/shenyu-client/register-uri。
2、服务端注册流程
2.1、读取配置
从前面分析到admin的两个注册接口分别是/shenyu-client/register-metadata和/shenyu-client/register-uri通过全局搜索发现这两个接口在ShenyuClientHttpRegistryController类。
RequestMapping(/shenyu-client)
Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {/*** 注册元数据*/PostMapping(/register-metadata)ResponseBodypublic String registerMetadata(RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {// 直接将元数据发布到Disruptorpublisher.publish(metaDataRegisterDTO);return ShenyuResultMessage.SUCCESS;}/*** 注册URI*/PostMapping(/register-uri)ResponseBodypublic String registerURI(RequestBody final URIRegisterDTO uriRegisterDTO) {// 直接将URI发布到Disruptorpublisher.publish(uriRegisterDTO);return ShenyuResultMessage.SUCCESS;}
}但是这个类上并没有RestController注解那它就不能做为一个bean被spring扫描到那它是如何创建的呢 不着急想要从注册中心中监听到数据http注册方式可以将admin当作注册中心自然需要有注册中心配置来表明使用哪个注册中心。admin的注册中心配置是RegisterCenterConfiguration我们先看这个配置类
/*** 注册中心配置类*/
Configuration
public class RegisterCenterConfiguration {/*** 读取shenyu.register配置*/BeanConfigurationProperties(prefix shenyu.register)public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** 创建用于服务端的注册类从注册中心中监听数据然后将数据写入Disruptor队列中*/Bean(destroyMethod close)public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,final ListShenyuClientRegisterService shenyuClientRegisterService) {// 从配置中获取注册类型String registerType shenyuRegisterCenterConfig.getRegisterType();// 根据注册类型通过SPI方式创建对应的ShenyuClientServerRegisterRepositoryShenyuClientServerRegisterRepository registerRepository ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);// 创建Disruptor发布者RegisterClientServerDisruptorPublisher publisher RegisterClientServerDisruptorPublisher.getInstance();// 每种客户端类型rpc类型的处理类MapString, ShenyuClientRegisterService registerServiceMap shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));// 启动Disruptor添加元数据和URI的订阅器publisher.start(registerServiceMap);// 初始化注册中心registerRepository.init(publisher, shenyuRegisterCenterConfig);return registerRepository;}
}该配置类创建了2个bean
ShenyuRegisterCenterConfigshenyu-admin注册中心配置读取shenyu.register属性配置。ShenyuClientServerRegisterRepository服务端注册类用于从注册中心中监听数据然后将数据写入Disruptor队列中。
这里的创建Disruptor发布者启动Disruptor等逻辑跟在客户端那边的一样只是类是服务端这边的就不再分析了。
2.1.1、用于监听的ShenyuClientServerRegisterRepository
上面生成的ShenyuClientServerRegisterRepository是用于实现服务端注册的接口会根据注册中心的配置通过SPI方式创建注册类每一个注册方式都对应一个实现类。 目前支持7种注册类型
HttpShenyuClientHttpRegistryControllerApolloApolloClientServerRegisterRepositoryZookeeperZookeeperClientServerRegisterRepositoryEtcdEtcdClientServerRegisterRepositoryNacosNacosClientServerRegisterRepositoryConsulConsulClientServerRegisterRepositoryPolarisPolarisClientServerRegisterRepository 加载类型通过registerType指定也就是我们在配置文件中指定的类型
shenyu:register:registerType: http服务端的注册类型必须跟客户端的注册类型一致这样服务端才可以监听到注册信息。这里要指定的是http所以这里创建的就是ShenyuClientHttpRegistryController这个不就是前面说到的注册接口所在的类么所以回到前面的ShenyuClientHttpRegistryController。
2.2、注册元数据和URI
2.2.1、注册接口接收数据写入Disruptor
RequestMapping(/shenyu-client)
Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {/*** 注册元数据*/PostMapping(/register-metadata)ResponseBodypublic String registerMetadata(RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {// 直接将元数据发布到Disruptorpublisher.publish(metaDataRegisterDTO);return ShenyuResultMessage.SUCCESS;}/*** 注册URI*/PostMapping(/register-uri)ResponseBodypublic String registerURI(RequestBody final URIRegisterDTO uriRegisterDTO) {// 直接将URI发布到Disruptorpublisher.publish(uriRegisterDTO);return ShenyuResultMessage.SUCCESS;}
}两个注册接口获取到数据后就直接调用了publisher.publish()方法把数据发布到Disruptor队列中。
2.2.2、Disruptor消费数据并持久化
QueueConsumer实现了WorkHandler接口是Disruptor的消费者消费逻辑就在它的onEvent方法中
public class QueueConsumerT implements WorkHandlerDataEventT {private final OrderlyExecutor executor;private final QueueConsumerFactoryT factory;/*** Instantiates a new Queue consumer.** param executor the executor* param factory the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactoryT factory) {this.executor executor;this.factory factory;}Overridepublic void onEvent(final DataEventT t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor orderly(t);// 通过工厂创建队列消费任务 RegisterServerConsumerExecutorQueueConsumerExecutorT queueConsumerExecutor factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}分析客户端注册流程的时候说到RegisterServerConsumerExecutor是服务端消费者任务处理数据入库操作和发布事件。 RegisterServerConsumerExecutor消费逻辑
public final class RegisterServerConsumerExecutor extends QueueConsumerExecutorCollectionDataTypeParent {// 每种数据类型的订阅器执行器private final MapDataType, ExecutorSubscriberDataTypeParent subscribers;private RegisterServerConsumerExecutor(final MapDataType, ExecutorTypeSubscriberDataTypeParent executorSubscriberMap) {this.subscribers new HashMap(executorSubscriberMap);}Overridepublic void run() {CollectionDataTypeParent results getData().stream().filter(this::isValidData).collect(Collectors.toList());if (CollectionUtils.isEmpty(results)) {return;}// 选择对应的数据类型的订阅器执行器去执行selectExecutor(results).executor(results);}private ExecutorSubscriberDataTypeParent selectExecutor(final CollectionDataTypeParent list) {final OptionalDataTypeParent first list.stream().findFirst();return subscribers.get(first.orElseThrow(() - new RuntimeException(the data type is not found)).getType());}// ...
}
根据不同的数据类型使用不同的订阅器执行器去执行这些订阅器是在disruptor启动的时候设置的。 服务端的订阅器有3个分别为MetadataExecutorSubscriberURIRegisterExecutorSubscriber和ApiDocExecutorSubscriber分别处理元数据URI和API文档。
元数据的处理
public class MetadataExecutorSubscriber implements ExecutorTypeSubscriberMetaDataRegisterDTO {// 每种客户端类型的注册服务private final MapString, ShenyuClientRegisterService shenyuClientRegisterService;public MetadataExecutorSubscriber(final MapString, ShenyuClientRegisterService shenyuClientRegisterService) {this.shenyuClientRegisterService shenyuClientRegisterService;}Overridepublic DataType getType() {return DataType.META_DATA;}Overridepublic void executor(final CollectionMetaDataRegisterDTO metaDataRegisterDTOList) {// 遍历元数据metaDataRegisterDTOList.forEach(meta - {// 根据客户端类型Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())).ifPresent(shenyuClientRegisterService - {// 加锁保证数据顺序执行防止并发synchronized (shenyuClientRegisterService) {// 处理数据shenyuClientRegisterService.register(meta);}});});}
}ShenyuClientRegisterService是注册方法接口它有多个实现类
AbstractContextPathRegisterService抽象类处理部分公共逻辑AbstractShenyuClientRegisterServiceImpl抽象类处理部分公共逻辑ShenyuClientRegisterDivideServiceImpldivide类处理http注册类型ShenyuClientRegisterDubboServiceImpldubbo类处理dubbo注册类型ShenyuClientRegisterGrpcServiceImplgRPC类处理gRPC注册类型ShenyuClientRegisterBrpcServiceImplbRPC类处理bRPC注册类型ShenyuClientRegisterMotanServiceImplMotan类处理Motan注册类型ShenyuClientRegisterSofaServiceImplSofa类处理Sofa注册类型ShenyuClientRegisterSpringCloudServiceImplSpringCloud类处理SpringCloud注册类型ShenyuClientRegisterTarsServiceImplTars类处理Tars注册类型ShenyuClientRegisterWebSocketServiceImplWebsocket类处理Websocket注册类型
每一种rpc类型都对应一个注册处理类所以本文是使用ShenyuClientRegisterDivideServiceImpl来处理。
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {Resourceprivate ApplicationEventPublisher eventPublisher;// 这几个就是操作数据库的serviceResourceprivate SelectorService selectorService;Resourceprivate MetaDataService metaDataService;Resourceprivate RuleService ruleService;Overridepublic String register(final MetaDataRegisterDTO dto) {// 1、注册选择器可以认为一个服务就是一个选择器// 选择器执行逻辑默认情况是空的需要在控制台另外手动配置// 子类实现String selectorHandler selectorHandler(dto);// 持久化选择器并发布选择器变更事件不存在的时候ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEString selectorId selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);// 2、注册规则可以认为一个元数据就是一个规则根据path判断是否同一个// 规则处理逻辑// 子类实现都是直接创建一个各自rpc类型的默认逻辑String ruleHandler ruleHandler();// 构建规则DTORuleDTO ruleDTO buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);// 持久化规则并发布规则变更事件不存在的时候ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleService.registerDefault(ruleDTO);// 3、注册元数据并发布元数据变更事件已存在发布元数据更新事件不存在发布元数据创建事件// 子类实现registerMetadata(dto);// 4、注册contextPath只有httpspringCloudwebSocket类型才有String contextPath dto.getContextPath();if (StringUtils.isNotEmpty(contextPath)) {registerContextPath(dto);}return ShenyuResultMessage.SUCCESS;}}整个注册处理逻辑可以分为4步
注册选择器构建选择器默认情况下一个服务就是一个选择器。之后将选择器插入数据库并发布选择器变更事件。
Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {// 以contextPath或appName作为选择器名称String contextPath ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());// 根据选择器名和插件名从数据库中查询选择器SelectorDO selectorDO findByNameAndPluginName(contextPath, pluginName);// 如果还不存在就创建一个选择器插入数据库if (Objects.isNull(selectorDO)) {// 构建选择器DTOSelectorDTO selectorDTO SelectorUtil.buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());selectorDTO.setHandle(selectorHandler);// 注册选择器并发布事件 ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEreturn registerDefault(selectorDTO);}return selectorDO.getId();
}注册规则可以认为一个元数据就是一个规则根据path判断是否同一个。 构建规则
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {// 构建规则DTORuleDTO ruleDTO RuleDTO.builder().selectorId(selectorId).name(ruleName).matchMode(MatchModeEnum.AND.getCode()).enabled(Boolean.TRUE).loged(Boolean.TRUE).matchRestful(Boolean.FALSE).sort(1).handle(ruleHandler).build();// 将{xxx}替换成**String conditionPath this.rewritePath(path);RuleConditionDTO ruleConditionDTO RuleConditionDTO.builder().paramType(ParamTypeEnum.URI.getName()).paramName(/).paramValue(conditionPath).build();// 设置规则条件if (conditionPath.endsWith(AdminConstants.URI_SLASH_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.STARTS_WITH.getAlias());} else if (conditionPath.endsWith(AdminConstants.URI_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.PATH_PATTERN.getAlias());} else if (conditionPath.indexOf(*) 1) {ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());} else {ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());}ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));return ruleDTO;
}保存规则
Override
public String registerDefault(final RuleDTO ruleDTO) {// 选择器下已经存在同名的规则则直接返回什么也不干if (Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()))) {return ;}RuleDO ruleDO RuleDO.buildRuleDO(ruleDTO);if (StringUtils.isEmpty(ruleDTO.getId())) {// 插入规则ruleMapper.insertSelective(ruleDO);// 插入规则条件addCondition(ruleDO, ruleDTO.getRuleConditions());}// 发布规则变更事件 ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleEventPublisher.onRegister(ruleDO, ruleDTO.getRuleConditions());return ruleDO.getId();
}具体的规则设计建议去看官方文档。 3. 注册元数据直接将注册上来的元数据保存
Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {if (dto.isRegisterMetaData()) {MetaDataService metaDataService getMetaDataService();// 根据路径查询元数据时候已存在MetaDataDO exist metaDataService.findByPath(dto.getPath());// 已存在就更新发布元数据更新事件不存在就插入发布元数据创建事件metaDataService.saveOrUpdateMetaData(exist, dto);}
}4、注册ContextPath只有httpspringCloudwebSocket类型才有。处理的逻辑在AbstractContextPathRegisterService中。
public abstract class AbstractContextPathRegisterService extends AbstractShenyuClientRegisterServiceImpl {Overridepublic void registerContextPath(final MetaDataRegisterDTO dto) {// 持久化contextPath插件下的选择器并发布选择器变更事件String contextPathSelectorId getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), );// 创建规则处理逻辑ContextMappingRuleHandle handle new ContextMappingRuleHandle();handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));handle.setAddPrefixed(dto.getAddPrefixed());// 注册contextPath插件默认的规则contextPath就是规则名并发布规则变更事件getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));}
}● URI的处理 URI数据是由URIRegisterExecutorSubscriber订阅器处理
public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriberURIRegisterDTO {Overridepublic void executor(final CollectionURIRegisterDTO dataList) {if (CollectionUtils.isEmpty(dataList)) {return;}// 根据rpc类型分类final MapString, ListURIRegisterDTO groupByRpcType dataList.stream().filter(data - StringUtils.isNotBlank(data.getRpcType())).collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));for (Map.EntryString, ListURIRegisterDTO entry : groupByRpcType.entrySet()) {// 根据不同rpc类型使用对应的shenyuClientRegisterService处理final String rpcType entry.getKey();Optional.ofNullable(shenyuClientRegisterService.get(rpcType)).ifPresent(service - {final ListURIRegisterDTO list entry.getValue();// 再以contextPath/appName分类MapString, ListURIRegisterDTO listMap buildData(list);listMap.forEach((selectorName, uriList) - {final ListURIRegisterDTO register new LinkedList();final ListURIRegisterDTO offline new LinkedList();for (URIRegisterDTO d : uriList) {final EventType eventType d.getEventType();// 判断是注册类型还是下线类型if (Objects.isNull(eventType) || EventType.REGISTER.equals(eventType)) {// eventType is null, should be old versionsregister.add(d);} else if (EventType.OFFLINE.equals(eventType)) {offline.add(d);}}if (CollectionUtils.isNotEmpty(register)) {// 注册URIservice.registerURI(selectorName, register);}if (CollectionUtils.isNotEmpty(offline)) {// 下线URIservice.offline(selectorName, offline);}});});}}private MapString, ListURIRegisterDTO buildData(final CollectionURIRegisterDTO dataList) {MapString, ListURIRegisterDTO resultMap new HashMap(8);for (URIRegisterDTO dto : dataList) {String contextPath dto.getContextPath();String key StringUtils.isNotEmpty(contextPath) ? contextPath : dto.getAppName();if (StringUtils.isNotEmpty(key)) {if (resultMap.containsKey(key)) {ListURIRegisterDTO existList resultMap.get(key);existList.add(dto);resultMap.put(key, existList);} else {resultMap.put(key, Lists.newArrayList(dto));}}}return resultMap;}
}
调到FallbackShenyuClientRegisterService的registerURI()方法
Override
public String registerURI(final String selectorName, final ListURIRegisterDTO uriList) {String result;String key key(selectorName);try {this.removeFallBack(key);result this.doRegisterURI(selectorName, uriList);logger.info(Register success: {},{}, selectorName, uriList);} catch (Exception ex) {logger.warn(Register exception: cause:{}, ex.getMessage());result ;this.addFallback(key, new FallbackHolder(selectorName, uriList));}return result;
}FallbackShenyuClientRegisterService是用来异常处理的然后调用doRegisterURI()做真正处理。
Override
public String doRegisterURI(final String selectorName, final ListURIRegisterDTO uriList) {if (CollectionUtils.isEmpty(uriList)) {return ;}// 查询对应的选择器SelectorDO selectorDO selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));if (Objects.isNull(selectorDO)) {throw new ShenyuException(doRegister Failed to execute,wait to retry.);}// 过滤port或host为空的URIListURIRegisterDTO validUriList uriList.stream().filter(dto - Objects.nonNull(dto.getPort()) StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());// 由URI构建处理选择器中的handler信息更新选择器中的handler// 应该就是相当于添加上服务实例信息String handler buildHandle(validUriList, selectorDO);if (handler ! null) {selectorDO.setHandle(handler);SelectorData selectorData selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));selectorData.setHandle(handler);// 更新数据库selectorService.updateSelective(selectorDO);// 发布选择器变更事件eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));}return ShenyuResultMessage.SUCCESS;
}总结就是admin拿到URI数据后更新选择器中的handler信息然后写入到数据库最后发布事件。 更新的就是这里的信息 至此服务端注册流程也就分析完了主要通过对外提供的接口接受客户端的注册信息然后写入到Disruptor队列再从中消费数据根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler。
参考资料 官方博客