当前位置: 首页 > news >正文

html怎么做成网站dede网站演示

html怎么做成网站,dede网站演示,秦皇岛网站建设找汉狮,黑龙江省建设网站首页Sentinel源码分析 1.Sentinel的基本概念 Sentinel实现限流、隔离、降级、熔断等功能#xff0c;本质要做的就是两件事情#xff1a; 统计数据#xff1a;统计某个资源的访问数据#xff08;QPS、RT等信息#xff09; 规则判断#xff1a;判断限流规则、隔离规则、降级规… Sentinel源码分析 1.Sentinel的基本概念 Sentinel实现限流、隔离、降级、熔断等功能本质要做的就是两件事情 统计数据统计某个资源的访问数据QPS、RT等信息 规则判断判断限流规则、隔离规则、降级规则、熔断规则是否满足 这里的资源就是希望被Sentinel保护的业务例如项目中定义的controller方法就是默认被Sentinel保护的资源。 1.1.ProcessorSlotChain 实现上述功能的核心骨架是一个叫做ProcessorSlotChain的类。这个类基于责任链模式来设计将不同的功能限流、降级、系统保护封装为一个个的Slot请求进入后逐个执行即可。 其工作流如图 责任链中的Slot也分为两大类 统计数据构建部分statistic NodeSelectorSlot负责构建簇点链路中的节点DefaultNode将这些节点形成链路树 ClusterBuilderSlot负责构建某个资源的ClusterNodeClusterNode可以保存资源的运行信息响应时间、QPS、block 数目、线程数、异常数等以及来源信息origin名称 StatisticSlot负责统计实时调用数据包括运行信息、来源信息等 规则判断部分rule checking AuthoritySlot负责授权规则来源控制 SystemSlot负责系统保护规则 ParamFlowSlot负责热点参数限流规则 FlowSlot负责限流规则 DegradeSlot负责降级规则 1.2.Node Sentinel中的簇点链路是由一个个的Node组成的Node是一个接口包括下面的实现 controller里面的每一个方法都是不同的入口节点方法比如两个controller方法调用service中的同一个方法那么service的该资源方法就会创建两个不同的defaultNode节点。 可以认为DefaultNode类型是链路模式使用的而ClusterNode类型则是非链路模式使用。 所有的节点都可以记录对资源的访问统计数据所以都是StatisticNode的子类。 按照作用分为两类Node DefaultNode代表链路树中的每一个资源一个资源出现在不同链路中时会创建不同的DefaultNode节点。而树的入口节点叫EntranceNode是一种特殊的DefaultNode ClusterNode代表资源一个资源不管出现在多少链路中只会有一个ClusterNode。记录的是当前资源被访问的所有统计数据之和。 DefaultNode记录的是资源在当前链路中的访问数据用来实现基于链路模式的限流规则。ClusterNode记录的是资源在所有链路中的访问数据实现默认模式、关联模式的限流规则。 例如我们在一个SpringMVC项目中有两个业务 业务1controller中的资源/order/query访问了service中的资源/goods 业务2controller中的资源/order/save访问了service中的资源/goods 创建的链路图如下 1.3.Entry 默认情况下Sentinel会将controller中的方法作为被保护资源那么问题来了我们该如何将自己的一段代码标记为一个Sentinel的资源呢 Sentinel中的资源用Entry来表示。声明Entry的API示例 // 资源名可使用任意有业务语义的字符串比如方法名、接口名或其它可唯一标识的字符串。 try (Entry entry SphU.entry(resourceName)) {// 被保护的业务逻辑// do something here... } catch (BlockException ex) {// 资源访问阻止被限流或被降级// 在此处进行相应的处理操作 } 1.3.1.自定义资源 例如我们在order-service服务中将OrderService的queryOrderById()方法标记为一个资源。 1首先在order-service中引入sentinel依赖 !--sentinel-- dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId /dependency 2然后配置Sentinel地址 spring:cloud:sentinel:transport:dashboard: localhost:8089 # 这里我的sentinel用了8089的端口 3修改OrderService类的queryOrderById方法 代码这样来实现 public Order queryOrderById(Long orderId) {// 创建Entry标记资源资源名为resource1try (Entry entry SphU.entry(resource1)) {// 1.查询订单这里是假数据Order order Order.build(101L, 4999L, 小米 MIX4, 1, 1L, null);// 2.查询用户基于Feign的远程调用User user userClient.findById(order.getUserId());// 3.设置order.setUser(user);// 4.返回return order;}catch (BlockException e){log.error(被限流或降级, e);return null;} } 4访问 打开浏览器访问order服务http://localhost:8080/order/101 然后打开sentinel控制台查看簇点链路 1.3.2.基于注解标记资源 在之前学习Sentinel的时候我们知道可以通过给方法添加SentinelResource注解的形式来标记资源。 这个是怎么实现的呢 来看下我们引入的Sentinel依赖包 其中的spring.factories声明需要就是自动装配的配置类内容如下 我们来看下SentinelAutoConfiguration这个类 可以看到在这里声明了一个BeanSentinelResourceAspect ​/*** Aspect for methods with {link SentinelResource} annotation.** author Eric Zhao*/ Aspect public class SentinelResourceAspect extends AbstractSentinelAspectSupport {// 切点是添加了 SentinelResource注解的类Pointcut(annotation(com.alibaba.csp.sentinel.annotation.SentinelResource))public void sentinelResourceAnnotationPointcut() {}// 环绕增强Around(sentinelResourceAnnotationPointcut())public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {// 获取受保护的方法Method originMethod resolveMethod(pjp);// 获取 SentinelResource注解SentinelResource annotation originMethod.getAnnotation(SentinelResource.class);if (annotation null) {// Should not go through here.throw new IllegalStateException(Wrong state for SentinelResource annotation);}// 获取注解上的资源名称String resourceName getResourceName(annotation.value(), originMethod);EntryType entryType annotation.entryType();int resourceType annotation.resourceType();Entry entry null;try {// 创建资源 Entryentry SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());// 执行受保护的方法Object result pjp.proceed();return result;} catch (BlockException ex) {return handleBlockException(pjp, annotation, ex);} catch (Throwable ex) {Class? extends Throwable[] exceptionsToIgnore annotation.exceptionsToIgnore();// The ignore list will be checked first.if (exceptionsToIgnore.length 0 exceptionBelongsTo(ex, exceptionsToIgnore)) {throw ex;}if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {traceException(ex);return handleFallback(pjp, annotation, ex);} ​// No fallback function can handle the exception, so throw it out.throw ex;} finally {if (entry ! null) {entry.exit(1, pjp.getArgs());}}} } ​ 简单来说SentinelResource注解就是一个标记而Sentinel基于AOP思想对被标记的方法做环绕增强完成资源Entry的创建。 1.4.Context 上一节我们发现簇点链路中除了controller方法、service方法两个资源外还多了一个默认的入口节点 sentinel_spring_web_context是一个EntranceNode类型的节点 这个节点是在初始化Context的时候由Sentinel帮我们创建的。 1.4.1.什么是Context 那么什么是Context呢 Context 代表调用链路上下文贯穿一次调用链路中的所有资源 Entry基于ThreadLocal。 Context 维持着入口节点entranceNode、本次调用链路的 curNode当前资源节点、调用来源origin等信息。 后续的Slot都可以通过Context拿到DefaultNode或者ClusterNode从而获取统计数据完成规则判断 Context初始化的过程中会创建EntranceNodecontextName就是EntranceNode的名称 对应的API如下 // 创建context包含两个参数context名称、 来源名称 ContextUtil.enter(contextName, originName); 1.4.2.Context的初始化 那么这个Context又是在何时完成初始化的呢 1.4.2.1.自动装配 来看下我们引入的Sentinel依赖包 其中的spring.factories声明需要就是自动装配的配置类内容如下 我们先看SentinelWebAutoConfiguration这个类 这个类实现了WebMvcConfigurer我们知道这个是SpringMVC自定义配置用到的类可以配置HandlerInterceptor 可以看到这里配置了一个SentinelWebInterceptor的拦截器。 SentinelWebInterceptor的声明如下 发现它继承了AbstractSentinelInterceptor这个类。 HandlerInterceptor拦截器会拦截一切进入controller的方法执行preHandle前置拦截方法而Context的初始化就是在这里完成的。 1.4.2.2.AbstractSentinelInterceptor HandlerInterceptor拦截器会拦截一切进入controller的方法执行preHandle前置拦截方法而Context的初始化就是在这里完成的。 我们来看看这个类的preHandle实现 Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)throws Exception {try {// 获取资源名称一般是controller方法的RequestMapping路径例如/order/{orderId}String resourceName getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;}// 从request中获取请求来源将来做 授权规则 判断时会用String origin parseOrigin(request);// 获取 contextName默认是sentinel_spring_web_contextString contextName getContextName(request);// 创建 ContextContextUtil.enter(contextName, origin);// 创建资源名称就是当前请求的controller方法的映射路径Entry entry SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);return true;} catch (BlockException e) {try {handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;} } 1.4.2.3.ContextUtil 创建Context的方法就是ContextUtil.enter(contextName, origin); 我们进入该方法 public static Context enter(String name, String origin) {if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {throw new ContextNameDefineException(The Constants.CONTEXT_DEFAULT_NAME cant be permit to defined!);}return trueEnter(name, origin); } 进入trueEnter方法 protected static Context trueEnter(String name, String origin) {// 尝试获取contextContext context contextHolder.get();// 判空if (context null) {// 如果为空开始初始化MapString, DefaultNode localCacheNameMap contextNameNodeMap;// 尝试获取入口节点DefaultNode node localCacheNameMap.get(name);if (node null) {LOCK.lock();try {node contextNameNodeMap.get(name);if (node null) {// 入口节点为空初始化入口节点 EntranceNodenode new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);// 添加入口节点到 ROOTConstants.ROOT.addChild(node);// 将入口节点放入缓存MapString, DefaultNode newMap new HashMap(contextNameNodeMap.size() 1);newMap.putAll(contextNameNodeMap);newMap.put(name, node);contextNameNodeMap newMap;}} finally {LOCK.unlock();}}// 创建Context参数为入口节点 和 contextNamecontext new Context(node, name);// 设置请求来源 origincontext.setOrigin(origin);// 放入ThreadLocalcontextHolder.set(context);}// 返回return context; } 执行逻辑如下图 2.ProcessorSlotChain执行流程 接下来我们跟踪源码验证下ProcessorSlotChain的执行流程。 2.1.入口 首先回到一切的入口AbstractSentinelInterceptor类的preHandle方法 还有SentinelResourceAspect的环绕增强方法 可以看到任何一个资源必定要执行SphU.entry()这个方法: public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)throws BlockException {return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); } 继续进入Env.sph.entryWithType(name, resourceType, trafficType, 1, args);Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,Object[] args) throws BlockException {// 将 资源名称等基本信息 封装为一个 StringResourceWrapper对象StringResourceWrapper resource new StringResourceWrapper(name, entryType, resourceType);// 继续return entryWithPriority(resource, count, prioritized, args); } 进入entryWithPriority方法 private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {// 获取 ContextContext context ContextUtil.getContext();if (context null) {// Using default context.context InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);} 、 // 获取 Slot执行链同一个资源会创建一个执行链放入缓存ProcessorSlotObject chain lookProcessChain(resourceWrapper);// 创建 Entry并将 resource、chain、context 记录在 Entry中Entry e new CtEntry(resourceWrapper, chain, context);try {// 执行 slotChainchain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info(Sentinel unexpected exception, e1);}return e; } 在这段代码中会获取ProcessorSlotChain对象然后基于chain.entry()开始执行slotChain中的每一个Slot. 而这里创建的是其实现类DefaultProcessorSlotChain. 获取ProcessorSlotChain以后会保存到一个Map中key是ResourceWrapper值是ProcessorSlotChain. 所以一个资源只会有一个ProcessorSlotChain. 2.2.DefaultProcessorSlotChain 我们进入DefaultProcessorSlotChain的entry方法 Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)throws Throwable {// first就是责任链中的第一个 slotfirst.transformEntry(context, resourceWrapper, t, count, prioritized, args); } 这里的first类型是AbstractLinkedProcessorSlot 看下继承关系 因此first一定是这些实现类中的一个按照最早讲的责任链顺序first应该就是 NodeSelectorSlot。 不过既然是基于责任链模式所以这里只要记住下一个slot就可以了也就是next next确实是NodeSelectSlot类型。 而NodeSelectSlot的next一定是ClusterBuilderSlot依次类推 责任链就建立起来了。 2.3.NodeSelectorSlot NodeSelectorSlot负责构建簇点链路中的节点DefaultNode将这些节点形成链路树。 核心代码 Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)throws Throwable {// 尝试获取 当前资源的 DefaultNodeDefaultNode node map.get(context.getName());if (node null) {synchronized (this) {node map.get(context.getName());if (node null) {// 如果为空为当前资源创建一个新的 DefaultNodenode new DefaultNode(resourceWrapper, null);HashMapString, DefaultNode cacheMap new HashMapString, DefaultNode(map.size());cacheMap.putAll(map);// 放入缓存中注意这里的 key是contextName// 这样不同链路进入相同资源就会创建多个 DefaultNodecacheMap.put(context.getName(), node);map cacheMap;// 当前节点加入上一节点的 child中这样就构成了调用链路树((DefaultNode) context.getLastNode()).addChild(node);}}}// context中的curNode当前节点设置为新的 nodecontext.setCurNode(node);// 执行下一个 slotfireEntry(context, resourceWrapper, node, count, prioritized, args); } 这个Slot完成了这么几件事情 为当前资源创建 DefaultNode 将DefaultNode放入缓存中key是contextName这样不同链路入口的请求将会创建多个DefaultNode相同链路则只有一个DefaultNode 将当前资源的DefaultNode设置为上一个资源的childNode 将当前资源的DefaultNode设置为Context中的curNode当前节点 下一个slot就是ClusterBuilderSlot 2.4.ClusterBuilderSlot ClusterBuilderSlot负责构建某个资源的ClusterNode核心代码 Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,int count, boolean prioritized, Object... args)throws Throwable {// 判空注意ClusterNode是共享的成员变量也就是说一个资源只有一个ClusterNode与链路无关if (clusterNode null) {synchronized (lock) {if (clusterNode null) {// 创建 cluster node.clusterNode new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMapResourceWrapper, ClusterNode newMap new HashMap(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);// 放入缓存可以是nodeId也就是resource名称newMap.put(node.getId(), clusterNode);clusterNodeMap newMap;}}}// 将资源的 DefaultNode与 ClusterNode关联node.setClusterNode(clusterNode);// 记录请求来源 origin 将 origin放入 entryif (!.equals(context.getOrigin())) {Node originNode node.getClusterNode().getOrCreateOriginNode(context.getOrigin());context.getCurEntry().setOriginNode(originNode);}// 继续下一个slotfireEntry(context, resourceWrapper, node, count, prioritized, args); } 2.5.StatisticSlot StatisticSlot负责统计实时调用数据包括运行信息访问次数、线程数、来源信息等。 StatisticSlot是实现限流的关键其中基于滑动时间窗口算法维护了计数器统计进入某个资源的请求次数。 核心代码 Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {try {// 放行到下一个 slot做限流、降级等判断fireEntry(context, resourceWrapper, node, count, prioritized, args);// 请求通过了, 线程计数器 1 用作线程隔离node.increaseThreadNum();// 请求计数器 1 用作限流node.addPassRequest(count);if (context.getCurEntry().getOriginNode() ! null) {// 如果有 origin来源计数器也都要 1context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() EntryType.IN) {// 如果是入口资源还要给全局计数器 1.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// 请求通过后的回调.for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (Throwable e) {// 各种异常处理就省略了。。。context.getCurEntry().setError(e);throw e;} } 另外需要注意的是所有的计数1动作都包括两部分以node.addPassRequest(count);为例 Override public void addPassRequest(int count) {// DefaultNode的计数器代表当前链路的 计数器super.addPassRequest(count);// ClusterNode计数器代表当前资源的 总计数器this.clusterNode.addPassRequest(count); } 具体计数方式我们后续再看。 接下来进入规则校验的相关slot了依次是 AuthoritySlot负责授权规则来源控制 SystemSlot负责系统保护规则 ParamFlowSlot负责热点参数限流规则 FlowSlot负责限流规则 DegradeSlot负责降级规则 2.6.AuthoritySlot 负责请求来源origin的授权规则判断如图 核心API Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {// 校验黑白名单checkBlackWhiteAuthority(resourceWrapper, context);// 进入下一个 slotfireEntry(context, resourceWrapper, node, count, prioritized, args); } 黑白名单校验的逻辑 void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {// 获取授权规则MapString, SetAuthorityRule authorityRules AuthorityRuleManager.getAuthorityRules();if (authorityRules null) {return;}SetAuthorityRule rules authorityRules.get(resource.getName());if (rules null) {return;}// 遍历规则并判断for (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) {// 规则不通过直接抛出异常throw new AuthorityException(context.getOrigin(), rule);}} } 再看下AuthorityRuleChecker.passCheck(rule, context)方法 static boolean passCheck(AuthorityRule rule, Context context) {// 得到请求来源 originString requester context.getOrigin();// 来源为空或者规则为空都直接放行if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {return true;}// rule.getLimitApp()得到的就是 白名单 或 黑名单 的字符串这里先用 indexOf方法判断int pos rule.getLimitApp().indexOf(requester);boolean contain pos -1;if (contain) {// 如果包含 origin还要进一步做精确判断把名单列表以,分割逐个判断boolean exactlyMatch false;String[] appArray rule.getLimitApp().split(,);for (String app : appArray) {if (requester.equals(app)) {exactlyMatch true;break;}}contain exactlyMatch;}// 如果是黑名单并且包含origin则返回falseint strategy rule.getStrategy();if (strategy RuleConstant.AUTHORITY_BLACK contain) {return false;}// 如果是白名单并且不包含origin则返回falseif (strategy RuleConstant.AUTHORITY_WHITE !contain) {return false;}// 其它情况返回truereturn true; } 2.7.SystemSlot SystemSlot是对系统保护的规则校验 核心API Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 系统规则校验SystemRuleManager.checkSystem(resourceWrapper);// 进入下一个 slotfireEntry(context, resourceWrapper, node, count, prioritized, args); } 来看下SystemRuleManager.checkSystem(resourceWrapper);的代码 public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {if (resourceWrapper null) {return;}// Ensure the checking switch is on.if (!checkSystemStatus.get()) {return;}// 只针对入口资源做校验其它直接返回if (resourceWrapper.getEntryType() ! EntryType.IN) {return;}// 全局 QPS校验double currentQps Constants.ENTRY_NODE null ? 0.0 : Constants.ENTRY_NODE.successQps();if (currentQps qps) {throw new SystemBlockException(resourceWrapper.getName(), qps);}// 全局 线程数 校验int currentThread Constants.ENTRY_NODE null ? 0 : Constants.ENTRY_NODE.curThreadNum();if (currentThread maxThread) {throw new SystemBlockException(resourceWrapper.getName(), thread);}// 全局平均 RT校验double rt Constants.ENTRY_NODE null ? 0 : Constants.ENTRY_NODE.avgRt();if (rt maxRt) {throw new SystemBlockException(resourceWrapper.getName(), rt);}// 全局 系统负载 校验if (highestSystemLoadIsSet getCurrentSystemAvgLoad() highestSystemLoad) {if (!checkBbr(currentThread)) {throw new SystemBlockException(resourceWrapper.getName(), load);}}// 全局 CPU使用率 校验if (highestCpuUsageIsSet getCurrentCpuUsage() highestCpuUsage) {throw new SystemBlockException(resourceWrapper.getName(), cpu);} } 2.8.ParamFlowSlot ParamFlowSlot就是热点参数限流如图 是针对进入资源的请求针对不同的请求参数值分别统计QPS的限流方式。 这里的单机阈值就是最大令牌数量maxCount 这里的统计窗口时长就是统计时长duration 含义是每隔duration时间长度内最多生产maxCount个令牌上图配置的含义是每1秒钟生产2个令牌。 核心API Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,int count, boolean prioritized, Object... args) throws Throwable {// 如果没有设置热点规则直接放行if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {fireEntry(context, resourceWrapper, node, count, prioritized, args);return;}// 热点规则判断checkFlow(resourceWrapper, count, args);// 进入下一个 slotfireEntry(context, resourceWrapper, node, count, prioritized, args); } 2.8.1.令牌桶 热点规则判断采用了令牌桶算法来实现参数限流为每一个不同参数值设置令牌桶Sentinel的令牌桶有两部分组成 这两个Map的key都是请求的参数值value却不同其中 tokenCounters用来记录剩余令牌数量 timeCounters用来记录上一个请求的时间 当一个携带参数的请求到来后基本判断流程是这样的 2.9.FlowSlot FlowSlot是负责限流规则的判断如图 包括 三种流控模式直接模式、关联模式、链路模式 三种流控效果快速失败、warm up、排队等待 三种流控模式从底层数据统计角度分为两类 对进入资源的所有请求ClusterNode做限流统计直接模式、关联模式 对进入资源的部分链路DefaultNode做限流统计链路模式 三种流控效果从限流算法来看分为两类 滑动时间窗口算法快速失败、warm up 漏桶算法排队等待效果 2.9.1.核心流程 核心API如下 Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 限流规则检测checkFlow(resourceWrapper, context, node, count, prioritized);// 放行fireEntry(context, resourceWrapper, node, count, prioritized, args); } checkFlow方法void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {// checker是 FlowRuleChecker 类的一个对象checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); } 跟入FlowRuleChecker public void checkFlow(FunctionString, CollectionFlowRule ruleProvider, ResourceWrapper resource,Context context, DefaultNode node,int count, boolean prioritized) throws BlockException {if (ruleProvider null || resource null) {return;}// 获取当前资源的所有限流规则CollectionFlowRule rules ruleProvider.apply(resource.getName());if (rules ! null) {for (FlowRule rule : rules) {// 遍历逐个规则做校验if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}} 这里的FlowRule就是限流规则接口其中的几个成员变量刚好对应表单参数 public class FlowRule extends AbstractRule {/*** 阈值类型 (0: 线程, 1: QPS).*/private int grade RuleConstant.FLOW_GRADE_QPS;/*** 阈值.*/private double count;/*** 三种限流模式.** {link RuleConstant#STRATEGY_DIRECT} 直连模式;* {link RuleConstant#STRATEGY_RELATE} 关联模式;* {link RuleConstant#STRATEGY_CHAIN} 链路模式.*/private int strategy RuleConstant.STRATEGY_DIRECT;/*** 关联模式关联的资源名称.*/private String refResource;/*** 3种流控效果.* 0. 快速失败, 1. warm up, 2. 排队等待, 3. warm up 排队等待*/private int controlBehavior RuleConstant.CONTROL_BEHAVIOR_DEFAULT;// 预热时长private int warmUpPeriodSec 10;/*** 队列最大等待时间.*/private int maxQueueingTimeMs 500;// 。。。 略 } 校验的逻辑定义在FlowRuleChecker的canPassCheck方法中 public boolean canPassCheck(/*NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {// 获取限流资源名称String limitApp rule.getLimitApp();if (limitApp null) {return true;}// 校验规则return passLocalCheck(rule, context, node, acquireCount, prioritized); } 进入passLocalCheck() private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node,int acquireCount, boolean prioritized) {// 基于限流模式判断要统计的节点 // 如果是直连模式关联模式对ClusterNode统计如果是链路模式则对DefaultNode统计Node selectedNode selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode null) {return true;}// 判断规则return rule.getRater().canPass(selectedNode, acquireCount, prioritized); } 这里对规则的判断先要通过FlowRule#getRater()获取流量控制器TrafficShapingController然后再做限流。 而TrafficShapingController有3种实现 DefaultController快速失败默认的方式基于滑动时间窗口算法 WarmUpController预热模式基于滑动时间窗口算法只不过阈值是动态的 RateLimiterController排队等待模式基于漏桶算法 最终的限流判断都在TrafficShapingController的canPass方法中。 2.9.2.滑动时间窗口 滑动时间窗口的功能分两部分来看 一是时间区间窗口的QPS计数功能这个是在StatisticSlot中调用的 二是对滑动窗口内的时间区间窗口QPS累加这个是在FlowRule中调用的 先来看时间区间窗口的QPS计数功能。 2.9.2.1.时间窗口请求量统计 回顾2.5章节中的StatisticSlot部分有这样一段代码 就是在统计通过该节点的QPS我们跟入看看这里进入了DefaultNode内部 发现同时对DefaultNode和ClusterNode在做QPS统计我们知道DefaultNode和ClusterNode都是StatisticNode的子类这里调用addPassRequest()方法最终都会进入StatisticNode中。 随便跟入一个 这里有秒、分两种纬度的统计对应两个计数器。找到对应的成员变量可以看到 两个计数器都是ArrayMetric类型并且传入了两个参数 // intervalInMs是滑动窗口的时间间隔默认为 1 秒 // sampleCount: 时间窗口的分隔数量默认为 2就是把 1秒分为 2个小时间窗 public ArrayMetric(int sampleCount, int intervalInMs) {this.data new OccupiableBucketLeapArray(sampleCount, intervalInMs); } 如图 接下来我们进入ArrayMetric类的addPass方法 Override public void addPass(int count) {// 获取当前时间所在的时间窗WindowWrapMetricBucket wrap data.currentWindow();// 计数器 1wrap.value().addPass(count); } 那么计数器如何知道当前所在的窗口是哪个呢 这里的data是一个LeapArray LeapArray的四个属性 public abstract class LeapArrayT {// 小窗口的时间长度默认是500ms 值 intervalInMs / sampleCountprotected int windowLengthInMs;// 滑动窗口内的 小窗口 数量默认为 2protected int sampleCount;// 滑动窗口的时间间隔默认为 1000msprotected int intervalInMs;// 滑动窗口的时间间隔单位为秒默认为 1private double intervalInSecond; } LeapArray是一个环形数组因为时间是无限的数组长度不可能无限因此数组中每一个格子放入一个时间窗window当数组放满后角标归0覆盖最初的window。 因为滑动窗口最多分成sampleCount数量的小窗口因此数组长度只要大于sampleCount那么最近的一个滑动窗口内的2个小窗口就永远不会被覆盖就不用担心旧数据被覆盖的问题了。 我们跟入data.currentWindow();方法 public WindowWrapT currentWindow(long timeMillis) {if (timeMillis 0) {return null;}// 计算当前时间对应的数组角标int idx calculateTimeIdx(timeMillis);// 计算当前时间所在窗口的开始时间.long windowStart calculateWindowStart(timeMillis);/** 先根据角标获取数组中保存的 oldWindow 对象可能是旧数据需要判断.** (1) oldWindow 不存在, 说明是第一次创建新 window并存入然后返回即可* (2) oldWindow的 starTime 本次请求的 windowStar, 说明正是要找的窗口直接返回.* (3) oldWindow的 starTime 本次请求的 windowStar, 说明是旧数据需要被覆盖创建 * 新窗口覆盖旧窗口*/while (true) {WindowWrapT old array.get(idx);if (old null) {// 创建新 windowWindowWrapT window new WindowWrapT(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));// 基于CAS写入数组避免线程安全问题if (array.compareAndSet(idx, null, window)) {// 写入成功返回新的 windowreturn window;} else {// 写入失败说明有并发更新等待其它人更新完成即可Thread.yield();}} else if (windowStart old.windowStart()) {return old;} else if (windowStart old.windowStart()) {if (updateLock.tryLock()) {try {// 获取并发锁覆盖旧窗口并返回return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// 获取锁失败等待其它线程处理就可以了Thread.yield();}} else if (windowStart old.windowStart()) {// 这种情况不应该存在写这里只是以防万一。return new WindowWrapT(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}} } 找到当前时间所在窗口WindowWrap后只要调用WindowWrap对象中的add方法计数器1即可。 这里只负责统计每个窗口的请求量不负责拦截。限流拦截要看FlowSlot中的逻辑。 2.9.2.2.滑动窗口QPS计算 在2.9.1小节我们讲过FlowSlot的限流判断最终都由TrafficShapingController接口中的canPass方法来实现。该接口有三个实现类 DefaultController快速失败默认的方式基于滑动时间窗口算法 WarmUpController预热模式基于滑动时间窗口算法只不过阈值是动态的 RateLimiterController排队等待模式基于漏桶算法 因此我们跟入默认的DefaultController中的canPass方法来分析 Override public boolean canPass(Node node, int acquireCount, boolean prioritized) {// 计算目前为止滑动窗口内已经存在的请求量int curCount avgUsedTokens(node);// 判断已使用请求量 需要的请求量1 是否大于 窗口的请求阈值if (curCount acquireCount count) {// 大于说明超出阈值返回falseif (prioritized grade RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime TimeUtil.currentTimeMillis();waitInMs node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);// PriorityWaitException indicates that the request will pass after waiting for {link waitInMs}.throw new PriorityWaitException(waitInMs);}}return false;}// 小于等于说明在阈值范围内返回truereturn true; } 因此判断的关键就是int curCount avgUsedTokens(node); private int avgUsedTokens(Node node) {if (node null) {return DEFAULT_AVG_USED_TOKENS;}return grade RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); } 因为我们采用的是限流走node.passQps()逻辑 // 这里又进入了 StatisticNode类 Override public double passQps() {// 请求量 ÷ 滑动窗口时间间隔 得到的就是QPSreturn rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec(); } 那么rollingCounterInSecond.pass()是如何得到请求量的呢 // rollingCounterInSecond 本质是ArrayMetric之前说过 Override public long pass() {// 获取当前窗口data.currentWindow();long pass 0;// 获取 当前时间的 滑动窗口范围内 的所有小窗口ListMetricBucket list data.values();// 遍历for (MetricBucket window : list) {// 累加求和pass window.pass();}// 返回return pass; } 来看看data.values()如何获取 滑动窗口范围内 的所有小窗口 // 此处进入LeapArray类中public ListT values(long timeMillis) {if (timeMillis 0) {return new ArrayListT();}// 创建空集合大小等于 LeapArray长度int size array.length();ListT result new ArrayListT(size);// 遍历 LeapArrayfor (int i 0; i size; i) {// 获取每一个小窗口WindowWrapT windowWrap array.get(i);// 判断这个小窗口是否在 滑动窗口时间范围内1秒内if (windowWrap null || isWindowDeprecated(timeMillis, windowWrap)) {// 不在范围内则跳过continue;}// 在范围内则添加到集合中result.add(windowWrap.value());}// 返回集合return result; } 那么isWindowDeprecated(timeMillis, windowWrap)又是如何判断窗口是否符合要求呢 public boolean isWindowDeprecated(long time, WindowWrapT windowWrap) {// 当前时间 - 窗口开始时间 是否大于 滑动窗口的最大间隔1秒// 也就是说我们要统计的时 距离当前时间1秒内的 小窗口的 count之和return time - windowWrap.windowStart() intervalInMs; } 2.9.3.漏桶 上一节我们讲过FlowSlot的限流判断最终都由TrafficShapingController接口中的canPass方法来实现。该接口有三个实现类 DefaultController快速失败默认的方式基于滑动时间窗口算法 WarmUpController预热模式基于滑动时间窗口算法只不过阈值是动态的 RateLimiterController排队等待模式基于漏桶算法 因此我们跟入默认的RateLimiterController中的canPass方法来分析 Override public boolean canPass(Node node, int acquireCount, boolean prioritized) {// Pass when acquire count is less or equal than 0.if (acquireCount 0) {return true;}// 阈值小于等于 0 阻止请求if (count 0) {return false;}// 获取当前时间long currentTime TimeUtil.currentTimeMillis();// 计算两次请求之间允许的最小时间间隔long costTime Math.round(1.0 * (acquireCount) / count * 1000);// 计算本次请求 允许执行的时间点 最近一次请求的可执行时间 两次请求的最小间隔long expectedTime costTime latestPassedTime.get();// 如果允许执行的时间点小于当前时间说明可以立即执行if (expectedTime currentTime) {// 更新上一次的请求的执行时间latestPassedTime.set(currentTime);return true;} else {// 不能立即执行需要计算 预期等待时长// 预期等待时长 两次请求的最小间隔 最近一次请求的可执行时间 - 当前时间long waitTime costTime latestPassedTime.get() - TimeUtil.currentTimeMillis();// 如果预期等待时间超出阈值则拒绝请求if (waitTime maxQueueingTimeMs) {return false;} else {// 预期等待时间小于阈值更新最近一次请求的可执行时间加上costTimelong oldTime latestPassedTime.addAndGet(costTime);try {// 保险起见再判断一次预期等待时间是否超过阈值waitTime oldTime - TimeUtil.currentTimeMillis();if (waitTime maxQueueingTimeMs) {// 如果超过则把刚才 加 的时间再 减回来latestPassedTime.addAndGet(-costTime);// 拒绝return false;}// in race condition waitTime may 0if (waitTime 0) {// 预期等待时间在阈值范围内休眠要等待的时间醒来后继续执行Thread.sleep(waitTime);}return true;} catch (InterruptedException e) {}}}return false; } 与我们之前分析的漏桶算法基本一致 2.10.DegradeSlot 最后一关就是降级规则判断了。 Sentinel的降级是基于状态机来实现的 对应的实现在DegradeSlot类中核心API Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {// 熔断降级规则判断performChecking(context, resourceWrapper);// 继续下一个slotfireEntry(context, resourceWrapper, node, count, prioritized, args); } 继续进入performChecking方法 void performChecking(Context context, ResourceWrapper r) throws BlockException {// 获取当前资源上的所有的断路器 CircuitBreakerListCircuitBreaker circuitBreakers DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {// 遍历断路器逐个判断if (!cb.tryPass(context)) {throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}} } 2.10.1.CircuitBreaker 我们进入CircuitBreaker的tryPass方法中 Override public boolean tryPass(Context context) {// 判断状态机状态if (currentState.get() State.CLOSED) {// 如果是closed状态直接放行return true;}if (currentState.get() State.OPEN) {// 如果是OPEN状态断路器打开// 继续判断OPEN时间窗是否结束如果是则把状态从OPEN切换到 HALF_OPEN返回truereturn retryTimeoutArrived() fromOpenToHalfOpen(context);}// OPEN状态并且时间窗未到返回falsereturn false; } 有关时间窗的判断在retryTimeoutArrived()方法 protected boolean retryTimeoutArrived() {// 当前时间 大于 下一次 HalfOpen的重试时间return TimeUtil.currentTimeMillis() nextRetryTimestamp; } OPEN到HALF_OPEN切换在fromOpenToHalfOpen(context)方法 protected boolean fromOpenToHalfOpen(Context context) {// 基于CAS修改状态从 OPEN到 HALF_OPENif (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {// 状态变更的事件通知notifyObservers(State.OPEN, State.HALF_OPEN, null);// 得到当前资源Entry entry context.getCurEntry();// 给资源设置监听器在资源Entry销毁时资源业务执行完毕时触发entry.whenTerminate(new BiConsumerContext, Entry() {Overridepublic void accept(Context context, Entry entry) {// 判断 资源业务是否异常if (entry.getBlockError() ! null) {// 如果异常则再次进入OPEN状态currentState.compareAndSet(State.HALF_OPEN, State.OPEN);notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);}}});return true;}return false; } 这里出现了从OPEN到HALF_OPEN、从HALF_OPEN到OPEN的变化但是还有几个没有 从CLOSED到OPEN 从HALF_OPEN到CLOSED 2.10.2.触发断路器 请求经过所有插槽 后一定会执行exit方法而在DegradeSlot的exit方法中 会调用CircuitBreaker的onRequestComplete方法。而CircuitBreaker有两个实现 我们这里以异常比例熔断为例来看进入ExceptionCircuitBreaker的onRequestComplete方法 Override public void onRequestComplete(Context context) {// 获取资源 EntryEntry entry context.getCurEntry();if (entry null) {return;}// 尝试获取 资源中的 异常Throwable error entry.getError();// 获取计数器同样采用了滑动窗口来计数SimpleErrorCounter counter stat.currentWindow().value();if (error ! null) {// 如果出现异常则 error计数器 1counter.getErrorCount().add(1);}// 不管是否出现异常total计数器 1counter.getTotalCount().add(1);// 判断异常比例是否超出阈值handleStateChangeWhenThresholdExceeded(error); } 来看阈值判断的方法 private void handleStateChangeWhenThresholdExceeded(Throwable error) {// 如果当前已经是OPEN状态不做处理if (currentState.get() State.OPEN) {return;}// 如果已经是 HALF_OPEN 状态判断是否需求切换状态if (currentState.get() State.HALF_OPEN) {if (error null) {// 没有异常则从 HALF_OPEN 到 CLOSEDfromHalfOpenToClose();} else {// 有一次再次进入OPENfromHalfOpenToOpen(1.0d);}return;}// 说明当前是CLOSE状态需要判断是否触发阈值ListSimpleErrorCounter counters stat.values();long errCount 0;long totalCount 0;// 累加计算 异常请求数量、总请求数量for (SimpleErrorCounter counter : counters) {errCount counter.errorCount.sum();totalCount counter.totalCount.sum();}// 如果总请求数量未达到阈值什么都不做if (totalCount minRequestAmount) {return;}double curCount errCount;if (strategy DEGRADE_GRADE_EXCEPTION_RATIO) {// 计算请求的异常比例curCount errCount * 1.0d / totalCount;}// 如果比例超过阈值切换到 OPENif (curCount threshold) {transformToOpen(curCount);} }
http://www.zqtcl.cn/news/504243/

相关文章:

  • 推广网站怎么建设和维护strange wordpress主题
  • 安徽省建设厅网站打不开湘潭做网站找磐石网络一流
  • 沈阳做网站哪好网站建设后续说明
  • 给个网站最新的2021在网站的标题上怎么做图标
  • h5做网站用什么框架seo推广计划
  • 亿企搜网站建设百度网盘怎么领取免费空间
  • 天津网站排名提升如何用h5做网站
  • 外贸公司有必要建设网站吗赣州做网站哪家好
  • 功能型网站设计深圳网站优化效果
  • 郑州定制网站开发规模以上工业企业总产值
  • 锡林浩特市长安网站 建设初步方案廊坊百度推广排名优化
  • 搭建论坛网站的流程企业网络推广软件
  • 中国化工建设网站家居装修设计
  • 铜陵公司做网站大淘客网站建设app
  • 网站面包屑导航织梦做网站的教程
  • 建湖网站建设价格小程序商城哪个平台好
  • 网站域名 被别人备案买房的人都哭了吧
  • 自己做网站 套模板工具磨床东莞网站建设
  • 怎么上传图片到公司网站在深圳注册公司需要什么资料
  • 网站建设的公司哪家好用一段话来解释网站建设
  • 没有文字的网站怎么优化wordpress自定义文章类型模板
  • 东营网站设计制作网站建设匠人匠心科技
  • 海外如何淘宝网站建设2022新闻大事件摘抄
  • 仿win8 网站淘宝客网站开发视频教程
  • 宣威做网站建设的公司哈尔滨网站建设公司名字
  • 学网页设计在哪学关键词优化公司前十排名
  • 菏泽定制网站建设推广无固定ip 建设网站
  • wordpress网站制作教程视频百度云域名购买
  • 软件最全网站株洲网站排名优化价格
  • 购物便宜的网站有哪些家居企业网站建设讯息