建立网站的服务器,沈阳网页设计,龙华网站建设洛阳 网站建设,如何注册网站主办者文章目录 核心概念消息代理事件和消息了解事件异步消息通信响应式系统 事件驱动的利弊消息传递模式发布—订阅工作队列过滤器数据持久性 消息传递代理协议、标准和工具AMQP和RabbitMQ基本概念交换类型和路由消息确认和拒绝 设置RabbitMQ安装RabbitMQRabbitMQ管理界面 Spring AM… 文章目录 核心概念消息代理事件和消息了解事件异步消息通信响应式系统 事件驱动的利弊消息传递模式发布—订阅工作队列过滤器数据持久性 消息传递代理协议、标准和工具AMQP和RabbitMQ基本概念交换类型和路由消息确认和拒绝 设置RabbitMQ安装RabbitMQRabbitMQ管理界面 Spring AMQP和Spring Boot解决方案设计计划添加AMQP支持在Multiplication微服务中发布事件在Gamification微服务中订阅事件 场景分析事件流Gamification微服务不可用消息代理不可用事务性扩展微服务 小结 相关代码可以从这里下载
示例 前面的文章 1、1 一个测试驱动的Spring Boot应用程序开发 2、2 使用React构造前端应用 3、3 试驱动的Spring Boot应用程序开发数据层示例 4、4 向微服务架构转变
后面的文章 6、6 使用网关模式进行负载均衡
经过前面的示例了解了微服务之间的接口的紧耦合关系微服务Multiplication调用微服务Gamification变成了流程的协调器。如果还有其他服务也需要查询每次尝试的数据需要额外增加Multiplication对这些服务的调用从而创建一个具有中央处理的分布式整体。这很显然会带来更多的问题。 现在以Publish-Subscribe模式重新设计这些接口这种方式称为事件驱动的架构Event-driven Architecture。发布者Publisher不将数据传递到特定的目的地而是不知道订阅者Subscriber即系统中接收数据的一方的情况下对事件进行分类和发送。这些事件使用者同样不需要知道发布者的逻辑。这种范式的变化使系统的耦合变得松散且可扩展同时带来新的挑战。
核心概念
消息代理
事件驱动架构的要素是消息代理系统组件与消息代理通信而不是彼此直接连接通过这种方式来保持彼此间的松散耦合。 消息代理通常包含理由功能可以创建多个“通道Channel”能根据需求划分消息。一个或多个发布者可以在每个通道中生成消息这些消息可以被一个或多个订阅者甚至没有订阅者使用。下面是消息代理使用场景的概念视图。 #mermaid-svg-AJT2kgdHChopkdiC {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-AJT2kgdHChopkdiC .error-icon{fill:#552222;}#mermaid-svg-AJT2kgdHChopkdiC .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-AJT2kgdHChopkdiC .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-AJT2kgdHChopkdiC .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-AJT2kgdHChopkdiC .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-AJT2kgdHChopkdiC .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-AJT2kgdHChopkdiC .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-AJT2kgdHChopkdiC .marker{fill:#333333;stroke:#333333;}#mermaid-svg-AJT2kgdHChopkdiC .marker.cross{stroke:#333333;}#mermaid-svg-AJT2kgdHChopkdiC svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-AJT2kgdHChopkdiC .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-AJT2kgdHChopkdiC .cluster-label text{fill:#333;}#mermaid-svg-AJT2kgdHChopkdiC .cluster-label span{color:#333;}#mermaid-svg-AJT2kgdHChopkdiC .label text,#mermaid-svg-AJT2kgdHChopkdiC span{fill:#333;color:#333;}#mermaid-svg-AJT2kgdHChopkdiC .node rect,#mermaid-svg-AJT2kgdHChopkdiC .node circle,#mermaid-svg-AJT2kgdHChopkdiC .node ellipse,#mermaid-svg-AJT2kgdHChopkdiC .node polygon,#mermaid-svg-AJT2kgdHChopkdiC .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-AJT2kgdHChopkdiC .node .label{text-align:center;}#mermaid-svg-AJT2kgdHChopkdiC .node.clickable{cursor:pointer;}#mermaid-svg-AJT2kgdHChopkdiC .arrowheadPath{fill:#333333;}#mermaid-svg-AJT2kgdHChopkdiC .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-AJT2kgdHChopkdiC .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-AJT2kgdHChopkdiC .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-AJT2kgdHChopkdiC .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-AJT2kgdHChopkdiC .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-AJT2kgdHChopkdiC .cluster text{fill:#333;}#mermaid-svg-AJT2kgdHChopkdiC .cluster span{color:#333;}#mermaid-svg-AJT2kgdHChopkdiC div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-AJT2kgdHChopkdiC :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 消息代理 通道A 通道B 通道C 发布者1 发布者2 发布者3 订阅者1 订阅者2 订阅者3 这些概念不是新概念经验丰富的开发人员肯定会在企业服务总线ESB架构中发现类似的模式。总线模式促进了系统不同部分之间的通信提供了数据转换、映射、消息队列、排序、理由等功能。 企业服务总线Enterprise Service BusESB的概念是从面向服务架构(Service Oriented Architecture SOA)发展而来。ESB——企业服务总线像一根管道用来连接各个节点。为了集成不同系统不同协议的服务ESB做了消息的转换、解释与路由等工作让不同的服务互联互通。 ESB是一种在松散耦合的服务和应用之间标准的集成方式。它可以作用于
面向服务的架构 - 分布式的应用由可重用的服务组成面向消息的架构 - 应用之间通过ESB发送和接受消息事件驱动的架构 - 应用之间异步地产生和接收消息
ESB就是在SOA架构中实现服务间智能化集成与管理的中介。
ESB架构和基于消息代理的架构之间的区别还有一些争议。ESB中通道本身在系统中具有更重要的意义。服务总线为通信设置协议标准、转换数据并将数据理由到特定目标。有些实现可以处理分布式事务。某些情况下甚至有一个复杂的UI对业务流程进行建模并将这些规则转换为配置和代码。ESB架构倾向于将绝大部分系统业务逻辑集中在总线内构成系统的业务流程层。如图所示 #mermaid-svg-z6WkWOhxs2AdsmzQ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .error-icon{fill:#552222;}#mermaid-svg-z6WkWOhxs2AdsmzQ .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-z6WkWOhxs2AdsmzQ .marker{fill:#333333;stroke:#333333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .marker.cross{stroke:#333333;}#mermaid-svg-z6WkWOhxs2AdsmzQ svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-z6WkWOhxs2AdsmzQ .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .cluster-label text{fill:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .cluster-label span{color:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .label text,#mermaid-svg-z6WkWOhxs2AdsmzQ span{fill:#333;color:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .node rect,#mermaid-svg-z6WkWOhxs2AdsmzQ .node circle,#mermaid-svg-z6WkWOhxs2AdsmzQ .node ellipse,#mermaid-svg-z6WkWOhxs2AdsmzQ .node polygon,#mermaid-svg-z6WkWOhxs2AdsmzQ .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-z6WkWOhxs2AdsmzQ .node .label{text-align:center;}#mermaid-svg-z6WkWOhxs2AdsmzQ .node.clickable{cursor:pointer;}#mermaid-svg-z6WkWOhxs2AdsmzQ .arrowheadPath{fill:#333333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-z6WkWOhxs2AdsmzQ .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-z6WkWOhxs2AdsmzQ .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-z6WkWOhxs2AdsmzQ .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-z6WkWOhxs2AdsmzQ .cluster text{fill:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ .cluster span{color:#333;}#mermaid-svg-z6WkWOhxs2AdsmzQ div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-z6WkWOhxs2AdsmzQ :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 企业服务总线 智能路由 消息转换 中介 编排 拦截器 安全 服务1 服务2 服务3 服务4 将所有业务逻辑放到同一组件中且系统中有中央协调器的软件架构往往是容易失败的软件架构模式。采用这种路线的系统只有一个失败点因为整个系统都依赖核心部分总线随着时间的推移会变得很难维护和扩展。嵌入总线的逻辑往往会变得一团糟。这也是ESB架构遭人诟病的原因之一。 基于这样的原因逐渐放弃了这种集中协调的、过于智能的消息传送通道更倾向于使用消息代理实现一种更简单的方式只用它与其他不同组件进行通信。 可以将ESB视为复杂通道而将消息代理视为简单通道想要划清界限并不容易。一方面可以使用ESB平台但要适当隔离业务逻辑另一方面某些新的消息传递平台如Kafka提供了一些工具可以在通道中嵌入一些逻辑。如果必要也可以使用包含业务逻辑的函数来转换消息。也可以像使用数据库一样在通道中查询数据并根据需要处理输出。因此可在不同架构ESB/消息代理的相关工具之间进行切换用类似的方式使用。也就是要了解模式然后选择符合需求的工具。 建议尽量避免在通信通道中包含业务逻辑把业务流程保留在每个微服务中。 事件和消息
在事件驱动架构中一个事件表明在系统中发生了某些事情。业务逻辑拥有发生这些事件的领域可以将事件发布到消息通道如消息代理。构架中的其他组件如果到给定事件类型感兴趣则订阅该通道以消费所有后续事件实例。事件与发布-订阅模式有关也与消息代理或总线关联可以使用消息代理实现事件驱动架构下面就来看看。 消息是一个更通用的术语。有人将消息视为直接指向系统组件的元素将事件视为反映了给定领域中发生的事实的信息片段而且没有专门指向某个系统组件以此将两者区分开来。从技术角度看通过消息代理发送事件时事件实际上是一条消息。因此在设计事件驱动架构时就用“事件”来指代消息而“消息”则指进入消息代理的通用消息。 这样可对事件进行建模使用REST API发送事件类似于前面示例中的操作。但是生产者需要发现消费者才能将事件发送给它们这对降低耦合毫无帮助。 将事件与消息代理一起使用可以隔离软件架构中的所有组件。发布者和订阅者不需要知道彼此存在这非常适合微服务架构。使用这种策略可引入新的微服务它们来消费通道中的事件不需要修改发布这些事件的微服务也不需要修改其他订阅者。
了解事件
引入消息代理和一些Event类并不能直接变成“事件驱动架构”。必须在设计软件时考虑到事件需要付出努力。
第一个场景
假设创建了一个Gamification API给用户分配分数和徽章然后微服务Multiplication调用接口updateScore不仅发现了这个微服务还成为这部分业务逻辑通过为成功的尝试分配分数的所有者。这是刚使用微服务架构时常犯的错误源自命令式编程风格倾向于在微服务之间用API调用替换方法调用实现远程过程调用RPC模式有时甚至不会注意到这一点。如图所示 #mermaid-svg-m5zX6iJTGrgd7anm {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-m5zX6iJTGrgd7anm .error-icon{fill:#552222;}#mermaid-svg-m5zX6iJTGrgd7anm .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-m5zX6iJTGrgd7anm .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-m5zX6iJTGrgd7anm .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-m5zX6iJTGrgd7anm .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-m5zX6iJTGrgd7anm .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-m5zX6iJTGrgd7anm .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-m5zX6iJTGrgd7anm .marker{fill:#333333;stroke:#333333;}#mermaid-svg-m5zX6iJTGrgd7anm .marker.cross{stroke:#333333;}#mermaid-svg-m5zX6iJTGrgd7anm svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-m5zX6iJTGrgd7anm .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-m5zX6iJTGrgd7anm .cluster-label text{fill:#333;}#mermaid-svg-m5zX6iJTGrgd7anm .cluster-label span{color:#333;}#mermaid-svg-m5zX6iJTGrgd7anm .label text,#mermaid-svg-m5zX6iJTGrgd7anm span{fill:#333;color:#333;}#mermaid-svg-m5zX6iJTGrgd7anm .node rect,#mermaid-svg-m5zX6iJTGrgd7anm .node circle,#mermaid-svg-m5zX6iJTGrgd7anm .node ellipse,#mermaid-svg-m5zX6iJTGrgd7anm .node polygon,#mermaid-svg-m5zX6iJTGrgd7anm .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-m5zX6iJTGrgd7anm .node .label{text-align:center;}#mermaid-svg-m5zX6iJTGrgd7anm .node.clickable{cursor:pointer;}#mermaid-svg-m5zX6iJTGrgd7anm .arrowheadPath{fill:#333333;}#mermaid-svg-m5zX6iJTGrgd7anm .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-m5zX6iJTGrgd7anm .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-m5zX6iJTGrgd7anm .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-m5zX6iJTGrgd7anm .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-m5zX6iJTGrgd7anm .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-m5zX6iJTGrgd7anm .cluster text{fill:#333;}#mermaid-svg-m5zX6iJTGrgd7anm .cluster span{color:#333;}#mermaid-svg-m5zX6iJTGrgd7anm div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-m5zX6iJTGrgd7anm :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 发送尝试 更新分数 Multiplication微服务 如果挑战正确则更新分数 Gamification微服务 接口API更新分数 浏览器 #mermaid-svg-IgCErLLFJsyrSi6q {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-IgCErLLFJsyrSi6q .error-icon{fill:#552222;}#mermaid-svg-IgCErLLFJsyrSi6q .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-IgCErLLFJsyrSi6q .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-IgCErLLFJsyrSi6q .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-IgCErLLFJsyrSi6q .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-IgCErLLFJsyrSi6q .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-IgCErLLFJsyrSi6q .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-IgCErLLFJsyrSi6q .marker{fill:#333333;stroke:#333333;}#mermaid-svg-IgCErLLFJsyrSi6q .marker.cross{stroke:#333333;}#mermaid-svg-IgCErLLFJsyrSi6q svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-IgCErLLFJsyrSi6q .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-IgCErLLFJsyrSi6q .cluster-label text{fill:#333;}#mermaid-svg-IgCErLLFJsyrSi6q .cluster-label span{color:#333;}#mermaid-svg-IgCErLLFJsyrSi6q .label text,#mermaid-svg-IgCErLLFJsyrSi6q span{fill:#333;color:#333;}#mermaid-svg-IgCErLLFJsyrSi6q .node rect,#mermaid-svg-IgCErLLFJsyrSi6q .node circle,#mermaid-svg-IgCErLLFJsyrSi6q .node ellipse,#mermaid-svg-IgCErLLFJsyrSi6q .node polygon,#mermaid-svg-IgCErLLFJsyrSi6q .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-IgCErLLFJsyrSi6q .node .label{text-align:center;}#mermaid-svg-IgCErLLFJsyrSi6q .node.clickable{cursor:pointer;}#mermaid-svg-IgCErLLFJsyrSi6q .arrowheadPath{fill:#333333;}#mermaid-svg-IgCErLLFJsyrSi6q .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-IgCErLLFJsyrSi6q .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-IgCErLLFJsyrSi6q .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-IgCErLLFJsyrSi6q .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-IgCErLLFJsyrSi6q .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-IgCErLLFJsyrSi6q .cluster text{fill:#333;}#mermaid-svg-IgCErLLFJsyrSi6q .cluster span{color:#333;}#mermaid-svg-IgCErLLFJsyrSi6q div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-IgCErLLFJsyrSi6q :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 更新分数toGamification 消息代理 Multiplication微服务 如果挑战正确则更新分数 Gamification微服务 接口API更新分数 浏览器 为降低微服务之间的耦合可以引入消息代理将REST API调用替换成一条指向微服务Gamification的消息即UpdateScore消息。但能否做到不改变系统呢不能。因为消息仍然有一个目的地它不能被任何新的微服务调用。此外系统的两个部分仍然是紧耦合的并且产生了一个副作用用异步接口替换了同步接口增加了额外的复杂性。
第二个场景
基于当前的实现如图所示 #mermaid-svg-ZcaQQXwWYQMfBYlo {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .error-icon{fill:#552222;}#mermaid-svg-ZcaQQXwWYQMfBYlo .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-ZcaQQXwWYQMfBYlo .marker{fill:#333333;stroke:#333333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .marker.cross{stroke:#333333;}#mermaid-svg-ZcaQQXwWYQMfBYlo svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-ZcaQQXwWYQMfBYlo .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .cluster-label text{fill:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .cluster-label span{color:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .label text,#mermaid-svg-ZcaQQXwWYQMfBYlo span{fill:#333;color:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .node rect,#mermaid-svg-ZcaQQXwWYQMfBYlo .node circle,#mermaid-svg-ZcaQQXwWYQMfBYlo .node ellipse,#mermaid-svg-ZcaQQXwWYQMfBYlo .node polygon,#mermaid-svg-ZcaQQXwWYQMfBYlo .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-ZcaQQXwWYQMfBYlo .node .label{text-align:center;}#mermaid-svg-ZcaQQXwWYQMfBYlo .node.clickable{cursor:pointer;}#mermaid-svg-ZcaQQXwWYQMfBYlo .arrowheadPath{fill:#333333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-ZcaQQXwWYQMfBYlo .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-ZcaQQXwWYQMfBYlo .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-ZcaQQXwWYQMfBYlo .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-ZcaQQXwWYQMfBYlo .cluster text{fill:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo .cluster span{color:#333;}#mermaid-svg-ZcaQQXwWYQMfBYlo div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-ZcaQQXwWYQMfBYlo :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 发送尝试 发送成功的挑战 Multiplication微服务 挑战解决后将尝试发送到Gamification Gamification微服务 接口API接收尝试并计算分数 浏览器 #mermaid-svg-fznHnVpEbzLwUodE {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-fznHnVpEbzLwUodE .error-icon{fill:#552222;}#mermaid-svg-fznHnVpEbzLwUodE .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-fznHnVpEbzLwUodE .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-fznHnVpEbzLwUodE .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-fznHnVpEbzLwUodE .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-fznHnVpEbzLwUodE .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-fznHnVpEbzLwUodE .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-fznHnVpEbzLwUodE .marker{fill:#333333;stroke:#333333;}#mermaid-svg-fznHnVpEbzLwUodE .marker.cross{stroke:#333333;}#mermaid-svg-fznHnVpEbzLwUodE svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-fznHnVpEbzLwUodE .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-fznHnVpEbzLwUodE .cluster-label text{fill:#333;}#mermaid-svg-fznHnVpEbzLwUodE .cluster-label span{color:#333;}#mermaid-svg-fznHnVpEbzLwUodE .label text,#mermaid-svg-fznHnVpEbzLwUodE span{fill:#333;color:#333;}#mermaid-svg-fznHnVpEbzLwUodE .node rect,#mermaid-svg-fznHnVpEbzLwUodE .node circle,#mermaid-svg-fznHnVpEbzLwUodE .node ellipse,#mermaid-svg-fznHnVpEbzLwUodE .node polygon,#mermaid-svg-fznHnVpEbzLwUodE .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-fznHnVpEbzLwUodE .node .label{text-align:center;}#mermaid-svg-fznHnVpEbzLwUodE .node.clickable{cursor:pointer;}#mermaid-svg-fznHnVpEbzLwUodE .arrowheadPath{fill:#333333;}#mermaid-svg-fznHnVpEbzLwUodE .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-fznHnVpEbzLwUodE .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-fznHnVpEbzLwUodE .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-fznHnVpEbzLwUodE .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-fznHnVpEbzLwUodE .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-fznHnVpEbzLwUodE .cluster text{fill:#333;}#mermaid-svg-fznHnVpEbzLwUodE .cluster span{color:#333;}#mermaid-svg-fznHnVpEbzLwUodE div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-fznHnVpEbzLwUodE :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} ChallengeSolvedEvent给任意感兴趣的订阅者 消息代理 Multiplication微服务 将尝试发送到通用通道 Gamification微服务 接口API接收尝试并计算分数 浏览器 将一个ChallengeSolvedDTO对象从Multiplication传递到Gamification维护了域边界。不再在Multiplication服务中包含Gamification的逻辑但是仍然需要明确配置Gamification的地址连接紧耦合依然存在。 通过引入消息代理可以解决这个问题。微服务Multiplication可将ChallengeSolvedDTO发布到通用的通道然后继续执行接下来的业务逻辑。第二个微服务可以订阅这个通道并处理消息从概念上讲这是一个事件以计算相应的分数和徽章。如果新加入系统中的微服务也对ChallengeSolvedDTO消息感兴趣例如生成报告或向用户发送通知也可以透明地订阅该通道。
第一个场景实现了一种命令模式其中微服务Multiplication对微服务Gamification执行的操作进行了指引也称为编制。第二个场景通过发送关于已发生的事件的通知以及上下文数据实现了事件模式消费者将处理此数据这可能触发业务逻辑也可能触发其他事件这种方法称为编排与编制相反。将软件架构建立在事件驱动设计的基础上时称其为事件驱动架构。
要实现真正事件驱动架构必须重新思考哪些可能以命令式表达的业务流程重新定义动作和事件不仅仅使用DDD来定义域还应该将它们之间的交互作为事件。 不需要为了遵循事件驱动架构而更改系统中的每个通信接口。在某些事件驱动模式不适用的情况下可能相应命令和请求/响应模式。不要试图将只适合命令模式的业务需求包装成事件。 异步消息通信
引入消息代理作为构建事件驱动架构的工具也就接纳了异步消息传递。发布者发送事件不需要等待任何事件消费者的回复将使架构保持松耦合并具有可扩展性。如图所示 #mermaid-svg-VvDN767Rz5MJjJ3H {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H .error-icon{fill:#552222;}#mermaid-svg-VvDN767Rz5MJjJ3H .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-VvDN767Rz5MJjJ3H .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-VvDN767Rz5MJjJ3H .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-VvDN767Rz5MJjJ3H .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-VvDN767Rz5MJjJ3H .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-VvDN767Rz5MJjJ3H .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-VvDN767Rz5MJjJ3H .marker{fill:#333333;stroke:#333333;}#mermaid-svg-VvDN767Rz5MJjJ3H .marker.cross{stroke:#333333;}#mermaid-svg-VvDN767Rz5MJjJ3H svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-VvDN767Rz5MJjJ3H .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H .cluster-label text{fill:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H .cluster-label span{color:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H .label text,#mermaid-svg-VvDN767Rz5MJjJ3H span{fill:#333;color:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H .node rect,#mermaid-svg-VvDN767Rz5MJjJ3H .node circle,#mermaid-svg-VvDN767Rz5MJjJ3H .node ellipse,#mermaid-svg-VvDN767Rz5MJjJ3H .node polygon,#mermaid-svg-VvDN767Rz5MJjJ3H .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-VvDN767Rz5MJjJ3H .node .label{text-align:center;}#mermaid-svg-VvDN767Rz5MJjJ3H .node.clickable{cursor:pointer;}#mermaid-svg-VvDN767Rz5MJjJ3H .arrowheadPath{fill:#333333;}#mermaid-svg-VvDN767Rz5MJjJ3H .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-VvDN767Rz5MJjJ3H .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-VvDN767Rz5MJjJ3H .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-VvDN767Rz5MJjJ3H .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-VvDN767Rz5MJjJ3H .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-VvDN767Rz5MJjJ3H .cluster text{fill:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H .cluster span{color:#333;}#mermaid-svg-VvDN767Rz5MJjJ3H div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-VvDN767Rz5MJjJ3H :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} ChallengeSolvedEvent给任意感兴趣的订阅者 1、发送尝试 2 4更新分数 3、响应 尝试 浏览器 Multiplication微服务 Gamification微服务 当然也可以使用消息代理来保持进程的同步。前面的例子中计划用消息代理替换REST API接口创建两个通道来传递事件而不是只创建一个通道使用第二个通道接收来自微服务Gamification的响应。可以阻塞请求的线程并在继续处理之前等待确认。如图所示 #mermaid-svg-n7rUaGSjBJKUOepP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-n7rUaGSjBJKUOepP .error-icon{fill:#552222;}#mermaid-svg-n7rUaGSjBJKUOepP .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-n7rUaGSjBJKUOepP .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-n7rUaGSjBJKUOepP .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-n7rUaGSjBJKUOepP .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-n7rUaGSjBJKUOepP .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-n7rUaGSjBJKUOepP .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-n7rUaGSjBJKUOepP .marker{fill:#333333;stroke:#333333;}#mermaid-svg-n7rUaGSjBJKUOepP .marker.cross{stroke:#333333;}#mermaid-svg-n7rUaGSjBJKUOepP svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-n7rUaGSjBJKUOepP .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-n7rUaGSjBJKUOepP .cluster-label text{fill:#333;}#mermaid-svg-n7rUaGSjBJKUOepP .cluster-label span{color:#333;}#mermaid-svg-n7rUaGSjBJKUOepP .label text,#mermaid-svg-n7rUaGSjBJKUOepP span{fill:#333;color:#333;}#mermaid-svg-n7rUaGSjBJKUOepP .node rect,#mermaid-svg-n7rUaGSjBJKUOepP .node circle,#mermaid-svg-n7rUaGSjBJKUOepP .node ellipse,#mermaid-svg-n7rUaGSjBJKUOepP .node polygon,#mermaid-svg-n7rUaGSjBJKUOepP .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-n7rUaGSjBJKUOepP .node .label{text-align:center;}#mermaid-svg-n7rUaGSjBJKUOepP .node.clickable{cursor:pointer;}#mermaid-svg-n7rUaGSjBJKUOepP .arrowheadPath{fill:#333333;}#mermaid-svg-n7rUaGSjBJKUOepP .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-n7rUaGSjBJKUOepP .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-n7rUaGSjBJKUOepP .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-n7rUaGSjBJKUOepP .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-n7rUaGSjBJKUOepP .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-n7rUaGSjBJKUOepP .cluster text{fill:#333;}#mermaid-svg-n7rUaGSjBJKUOepP .cluster span{color:#333;}#mermaid-svg-n7rUaGSjBJKUOepP div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-n7rUaGSjBJKUOepP :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 3ChallengeSolvedEventProcessedByGamification给任意感兴趣的订阅者 2ChallengeSolvedEvent给任意感兴趣的订阅者 1、发送尝试 4更新分数 4、响应 尝试—响应 尝试 浏览器 Multiplication微服务 Gamification微服务 这实际上是基于消息代理的请求/响应模式这种组合在某些情况下可能很有用但不建议在事件驱动方法中使用。主要原因是微服务Multiplication需要知道订阅者及订阅者的数量以确保收到所有响应系统组件会因此而紧耦合。但可从中得到好处例如可伸缩性也可以运用其他模式来提高可伸缩性如负载均衡器。在进程要求必须同步的情况下可以考虑使用一种更简单的同步接口例如REST API。下表给出了一种建议请注意具体如何使用要具体分析。
模式类型实现方式请求/响应同步REST API需要阻塞的命令同步REST API不需要阻塞的命令异步消息代理事件异步消息代理
需要注意的是尽管端到端采用了异步通信但应用程序与消息代理之间用的是同步接口。当发布消息时确保代理在继续其他操作之前已收到消息。这同样适用于订阅者在订阅者消费了消息后代理需要一个确保已将该消息标记为已处理然后移到下一个消息。这两个步骤对保证数据的安全和系统的可靠性至关重要。
响应式系统
响应式系统是将其描述为一套应用于软件架构的设计原则从而使系统响应敏捷及时响应、具有弹性在出现故障时保持响应、可灵活伸缩适应在不同工作负载下响应和基于消息驱动确保松散耦合和边界隔离。如果遵循遵循范式构建系统可以称为响应式系统。 另外响应式编程指的是一组在编程语言中使用的技术围绕如下范式如futures或promise、reactive streams、backpressure等。Java中有一些流行的库如Reactor或RxJava可以实现这些范式。使用响应式编程可将逻辑切分成一组较小的块这些逻辑块可以异步运行然后合成或转换结果。这也带来了并发性的改进在并行执行任务时可以更快完成任务。 使用响应式编程并不会让架构编程响应式架构。它们在不同的层面工作响应式编程有利于在组件内和并发性方面做出改进响应式系统是这些组件之间在更高层面上的变化有助于构建松耦合、富有弹性和可伸缩性的系统。
事件驱动的利弊
前面的微服务架构获得了灵活性和可伸缩性但也面临挑战如最终一致性、容错性和部分更新等。使用消息代理模式进行事件驱动有助于应对这些挑战。
微服务之间的松耦合已经找到了让微服务Multiplication不需要知道Gamification服务的方法。Multiplication给代理发送一个事件Gamification向代理订阅事件对其做出反应为用户更新分数和徽章。可伸缩性为系统进行水平扩展会很方便。此外在架构中引入新的微服务也很容易可以订阅事件并独立工作例如可以根据现有服务触发的事件生成报告或发送电子邮件等。容错性和最终一致性如果消息代理足够可靠那么即使系统组件出现了故障也可以保证最终一致性。因为代理可以持久化消息所以假设微服务Gamification宕机了一段时间在其重新上线后可以通过事件进行数据补偿。
这里使用异步的过程来简单地通知其他系统组件避免了创建阻塞的、命令式的流程这需要一种不同的思维方式要接受一种观念即数据的状态可能不会在所有微服务中保持一致。 此外随着消息代理的引入需要添加一个新组件因为不能断定消息代理一定不会出错就必须让系统准备好以应对潜在的错误。
丢弃的消息可能是ChallengeSolvedEvent永远无法传递到Gamification服务等情况。如果构建一个不错过任何一个事件的系统就应该配置消息代理以实现至少一次at-least-once的保证策略以确保消息代理至少传递消息一次尽管它们是可以重复的。重复消息某些情况下消息代理可能会传递多次仅被发布过一次的消息。在示例中如果重复收到该事件将会错误地增加分数。因此必须考虑将事件的消费幂等化如果多次调用且不会产生不同的结果就是幂等的。示例中可选的解决方案是标记已被Gamification端处理过的事件并忽略任何重复的事件。一些消息代理还可以提供最多一次的良好保证如RabbitMQ和Kafka有助于防止重复。无序消息消息代理会尽量避免无序消息但如果出现故障或存在漏洞这种情况仍会发生。必须做好处理的准备如果可能应该尽量避免按照与发布时间相同的顺序来消费事件。消息代理的宕机时间最坏的情况下代理可能不可用。发布者和订阅者都应该尝试处理这种情况例如重试策略或缓存。
事件驱动系统的另一个缺点是追溯变得更加困难。调用一个REST API可能触发事件然后可能会有一些组件对这些事件做出响应随后发布其他事件继续延长这个链条。当只有几个分布式进程时了解不同事件在不同的微服务中引起的动作可能问题不大但随着系统的发展要全面了解这些事件和操作链是一项巨大的挑战。需要这个视图对错误操作进行调试并找出触发指定进程的原因。有一些工具可以实现分布式跟踪将事件和动作链接起来并可视化为一系列动作/响应如Spring Cloud Sleuth可在日志中自动注入标识符的工具在发出/接收HTTP调用时通过RabbitMQ发布/消费消息以及其他时候将这些标识符一直传播下去然后如果使用集中式日志记录可使用这些标识符链接所有的进程。
消息传递模式
要根据需要运用消息传递平台的几种模式如图所示 #mermaid-svg-C1b7nlhuHLCGS3NT {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT .error-icon{fill:#552222;}#mermaid-svg-C1b7nlhuHLCGS3NT .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-C1b7nlhuHLCGS3NT .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-C1b7nlhuHLCGS3NT .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-C1b7nlhuHLCGS3NT .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-C1b7nlhuHLCGS3NT .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-C1b7nlhuHLCGS3NT .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-C1b7nlhuHLCGS3NT .marker{fill:#333333;stroke:#333333;}#mermaid-svg-C1b7nlhuHLCGS3NT .marker.cross{stroke:#333333;}#mermaid-svg-C1b7nlhuHLCGS3NT svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-C1b7nlhuHLCGS3NT .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT .cluster-label text{fill:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT .cluster-label span{color:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT .label text,#mermaid-svg-C1b7nlhuHLCGS3NT span{fill:#333;color:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT .node rect,#mermaid-svg-C1b7nlhuHLCGS3NT .node circle,#mermaid-svg-C1b7nlhuHLCGS3NT .node ellipse,#mermaid-svg-C1b7nlhuHLCGS3NT .node polygon,#mermaid-svg-C1b7nlhuHLCGS3NT .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-C1b7nlhuHLCGS3NT .node .label{text-align:center;}#mermaid-svg-C1b7nlhuHLCGS3NT .node.clickable{cursor:pointer;}#mermaid-svg-C1b7nlhuHLCGS3NT .arrowheadPath{fill:#333333;}#mermaid-svg-C1b7nlhuHLCGS3NT .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-C1b7nlhuHLCGS3NT .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-C1b7nlhuHLCGS3NT .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-C1b7nlhuHLCGS3NT .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-C1b7nlhuHLCGS3NT .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-C1b7nlhuHLCGS3NT .cluster text{fill:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT .cluster span{color:#333;}#mermaid-svg-C1b7nlhuHLCGS3NT div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-C1b7nlhuHLCGS3NT :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 订阅者1 订阅者2 通道 消费 9 8 7 6 5 4 3 2 1 消费 9 7 5 3 1 消费 8 6 4 2 副本1 副本2 数据库1 工作队列订阅者1需要两个实例来拆分负载。 副本1 数据库1 过滤订阅者2对消息5和6不感兴趣。 生成 9 8 7 6 5 4 3 2 1 发布—订阅不同的订阅者可以收到相同的消息。 发布者 发布—订阅
这种模式下不同的订阅者将接收相同消息的副本。例如系统中可能有多个对ChallengeSolvedEvent感兴趣的组件如微服务Gamification和微服务Reporting。这种情况下重要的是配置订阅者使其接收相同的消息。每个订阅者将以不同的目的处理事件以免引起重复操作。 这种模式更适合事件不适用于发送到特定服务的消息。 工作队列
这种模式也称为竞争消费者模式这种情况下可以将工作拆分到同一个应用程序的多个实例之间。 如上图所示同一个微服务有多个副本目的是平衡它们之间的工作负载每个实例将消费不同的消息、处理它们将结果存储在数据库中。 注意同一组件的多个副本应共享同一数据层便于安全地分割工作。 过滤器
一种常见的情况是一些订阅者对发布到一个通道的所有消息都感兴趣而其他一些订阅者则只对其中一部分感兴趣。上图中的订阅者2就是这种情况。最简单的选择就是基于应用程序的过滤逻辑在消息被消费后立即丢弃掉。 一些消息代理也提供现成的过滤功能系统组件可以使用给定的过滤器将自己注册为订阅者。 数据持久性
如果代理可以持久化消息订阅者就不需要一直保持运行来消费所有数据。每个订阅者在消息代理中都有一个相关联的标记以便知道最后消费的消息是什么。如果不能在指定的时间点获取消息稍后数据流会从离开的节点重新传递。 即使所有订阅者检索到特定消息后也可能想将其存储在消息代理中一段时间。如果新的订阅者想要获得其上线之前的消息这就很有用了。如果要对某个订阅者进行重置使所有消息重新得到处理则持久化指定时间段的所有消息会有所帮助。 在一个将所有操作建模为事件的系统中可以从中获益。想象一下误删除了现有数据库中的所有数据。理论上可以从头开始重播所有事件并重新建立相同的状态。因此根本不需要在数据库中保留指定实体的最后状态可将其视为多个事件的聚合这就是事件追溯的核心概念。 当订阅者的运算不幂等时可能很危险。 消息传递代理协议、标准和工具
多年来出现了一些与消息代理有关的消息传递协议和标准
高级消息队列协议AMQP一种有线协议将消息的数据格式定义为字节流。消息队列遥测传输MQTT一种协议已成为物联网设备的流行协议可用很少的代码实现可在有限的带宽下工作。某些文本流的消息传递协议STOMP一种类似HTTP的、基于文本的协议面向消息传递中间件。Java消息服务JMS一种API标准关注消息传递系统中应该实现的行为。可以找到不同的JMS客户端实现使用不同的底层协议连接到消息代理。
下面是实现了一些协议和标准或有自己的协议和标准的流行软件工具
RabbitMQ一个开源的消息代理实现支持AMQP、MQTT和STOMP等协议还提供了强大路由配置的JMS API客户端。Mosquito一个实现了MQTT协议的Eclipse消息代理是物联网系统的一个流行选项。Kafka最初由LinkedIn设计在TCP上使用自己的二进制协议。尽管Kafka的核心功能不提供与传统消息代理例如路由相同的功能当对消息中间件的要求很简单时会是一个强大的消息传递平台。常用于需要处理流中大量数据的应用程序中。
任何情况下如果需要在不同的工具中进行抉择就应该熟悉它们分析其功能是否能满足需求。在Java和Spring Boot中RabbitMQ和Kafka是构建事件驱动架构的常用工具。 这里使用RabbitMQ和AMQP协议原因是它们提供了多种配置可了解大多数选项也可在其他消息传递平台重用这些知识。
AMQP和RabbitMQ
基本概念
前面说过发布者是系统中向消息代理发布消息的组件或应用程序消费者或订阅者接收并处理这些消息。AMQP还定义了交换exchange、队列queue和绑定binding如图所示 #mermaid-svg-6o1H7LF6WQrUBfOo {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo .error-icon{fill:#552222;}#mermaid-svg-6o1H7LF6WQrUBfOo .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-6o1H7LF6WQrUBfOo .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-6o1H7LF6WQrUBfOo .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-6o1H7LF6WQrUBfOo .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-6o1H7LF6WQrUBfOo .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-6o1H7LF6WQrUBfOo .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-6o1H7LF6WQrUBfOo .marker{fill:#333333;stroke:#333333;}#mermaid-svg-6o1H7LF6WQrUBfOo .marker.cross{stroke:#333333;}#mermaid-svg-6o1H7LF6WQrUBfOo svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-6o1H7LF6WQrUBfOo .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo .cluster-label text{fill:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo .cluster-label span{color:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo .label text,#mermaid-svg-6o1H7LF6WQrUBfOo span{fill:#333;color:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo .node rect,#mermaid-svg-6o1H7LF6WQrUBfOo .node circle,#mermaid-svg-6o1H7LF6WQrUBfOo .node ellipse,#mermaid-svg-6o1H7LF6WQrUBfOo .node polygon,#mermaid-svg-6o1H7LF6WQrUBfOo .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-6o1H7LF6WQrUBfOo .node .label{text-align:center;}#mermaid-svg-6o1H7LF6WQrUBfOo .node.clickable{cursor:pointer;}#mermaid-svg-6o1H7LF6WQrUBfOo .arrowheadPath{fill:#333333;}#mermaid-svg-6o1H7LF6WQrUBfOo .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-6o1H7LF6WQrUBfOo .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-6o1H7LF6WQrUBfOo .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-6o1H7LF6WQrUBfOo .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-6o1H7LF6WQrUBfOo .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-6o1H7LF6WQrUBfOo .cluster text{fill:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo .cluster span{color:#333;}#mermaid-svg-6o1H7LF6WQrUBfOo div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-6o1H7LF6WQrUBfOo :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 消息代理 绑定键*.correct 绑定键*.wrong 交换 3 2 1 队列1 3 1 队列2 2 发布者 订阅者1 订阅者2 路由键1“attempts.correct”2“attempts:wrong” 交换是消息被发送到的实体根据交换类型和规则定义的逻辑称为绑定路由到队列。交换可以是持久化的即使消息代理重新启动仍然存在如果不存在也是暂时的。 队列是AMQP中存储将被消费的消息的对象。一个队列可能有0个、1个或多个消费者可以是持久或临时的要注意持久化的队列不意味着它的所有消息都是持久的。要使消息在消息代理重启后仍然存在必须将其作为持久消息发布。 绑定是将发布到交换的消息路由到特定队列的规则就是将队列绑定到找到的交换。一些交换支持通过一个可选的绑定键binding key来决定哪些发布到交换的消息最终应该到达指定的队列。从这个意义上可以将绑定看作过滤器。 发布者可以在发送消息时指定路由键routing key如果使用这些配置可以基于绑定键对其进行正确筛选。路由键由点分隔的单词组成例如attempts.correct绑定键也有类似的格式还可能包含模式匹配器具体取决于交换的类型。
交换类型和路由
RabbitMQ有几种交换类型可选如图所示通过绑定键定义的不同路由策略和每条消息对应的路由键进行组合。
默认交换default exchange由消息代理预先声明。所有已创建的队列都使用与队列同名的绑定键与该交换进行绑定。从概念角度看这意味着可考虑使用目标队列作为绑定键来发布消息如果将相应的名称用作路由键。从技术角度看这些消息仍然要通过交换。这种设置不常用因为它违背了整个路由的目的。直连交换direct exchange常用于单播路由。与默认交换的区别在于可以使用自己的绑定键也可以创建多个使用相同绑定键的队列然后这些队列都将获得其路由键与绑定键匹配的消息。从概念来看在发布知道目的地单播的消息时使用不需要知道有多少队列会获得消息。扇状交换fanout exchange不使用路由键。将所有消息路由到绑定到交换的所有队列非常适合广播场景。主题交换topic exchange是最灵活的。可以使用一个模式而不是使用具体的值将队列绑定到交换上允许订阅者注册队列来使用经过筛选的消息集。在模式中可以使用#表示匹配任何一组单词或使用*表示只匹配一个单词。头部交换headers exchange使用消息头部作为路由键以提高灵活性。因为可以设置一个或多个消息头部的匹配条件进行全匹配或任意匹配可忽略标准路由键。 #mermaid-svg-MuLM8daCOJpSC4NP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-MuLM8daCOJpSC4NP .error-icon{fill:#552222;}#mermaid-svg-MuLM8daCOJpSC4NP .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-MuLM8daCOJpSC4NP .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-MuLM8daCOJpSC4NP .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-MuLM8daCOJpSC4NP .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-MuLM8daCOJpSC4NP .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-MuLM8daCOJpSC4NP .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-MuLM8daCOJpSC4NP .marker{fill:#333333;stroke:#333333;}#mermaid-svg-MuLM8daCOJpSC4NP .marker.cross{stroke:#333333;}#mermaid-svg-MuLM8daCOJpSC4NP svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-MuLM8daCOJpSC4NP .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-MuLM8daCOJpSC4NP .cluster-label text{fill:#333;}#mermaid-svg-MuLM8daCOJpSC4NP .cluster-label span{color:#333;}#mermaid-svg-MuLM8daCOJpSC4NP .label text,#mermaid-svg-MuLM8daCOJpSC4NP span{fill:#333;color:#333;}#mermaid-svg-MuLM8daCOJpSC4NP .node rect,#mermaid-svg-MuLM8daCOJpSC4NP .node circle,#mermaid-svg-MuLM8daCOJpSC4NP .node ellipse,#mermaid-svg-MuLM8daCOJpSC4NP .node polygon,#mermaid-svg-MuLM8daCOJpSC4NP .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-MuLM8daCOJpSC4NP .node .label{text-align:center;}#mermaid-svg-MuLM8daCOJpSC4NP .node.clickable{cursor:pointer;}#mermaid-svg-MuLM8daCOJpSC4NP .arrowheadPath{fill:#333333;}#mermaid-svg-MuLM8daCOJpSC4NP .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-MuLM8daCOJpSC4NP .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-MuLM8daCOJpSC4NP .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-MuLM8daCOJpSC4NP .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-MuLM8daCOJpSC4NP .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-MuLM8daCOJpSC4NP .cluster text{fill:#333;}#mermaid-svg-MuLM8daCOJpSC4NP .cluster span{color:#333;}#mermaid-svg-MuLM8daCOJpSC4NP div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-MuLM8daCOJpSC4NP :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 消息代理 q-gamification q-reports game game reports ignored ignored #.correct #.wrong “correcttrue,version1,x-matchall” “correcttrue,version1,x-matchany” 默认交换 3 2 1 q-gamification 3 1 q-reports 2 直连交换 3 2 1 q-gamification-1 3 1 q-gamification-2 3 1 q-reports 2 扇状交换 3 2 1 q-gamification 3 2 1 q-reports 3 2 1 主题交换 3 2 1 q-gamification 1 q-reports 3 2 1 头部交换 3 2 1 q-gamification 1 q-reports 3 1 发布者 路由键1“q-gamification”2“q-reports”3“q-gamification” 发布者 路由键1“game”2“reports”3“game” 发布者 没有考虑路由键 发布者 路由键1“challengesolved.correct”2“challengesolved.wrong”3“challengesolved.wrong” 发布者 头部1“correcttrue”,”version1“2“correcttrue”,”version2“3“correctfalse”,”version1“ Gamification订阅者 Reports订阅者 Gamification订阅者 Reports订阅者 Gamification订阅者 Reports订阅者 Gamification订阅者 Reports订阅者 Gamification订阅者 Reports订阅者 前面介绍的发布/订阅和过滤器模式都适用于这些场景。图中的直连交换可能看起来像是工作队列模式实际上并非如此。在AMQP协议中负载均衡发生在同一队列的消费者之间而不是队列之间。 要实现工作队列模式通常会多次订阅同一队列如图所示。 #mermaid-svg-fUA0SDlkM9y79mTv {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-fUA0SDlkM9y79mTv .error-icon{fill:#552222;}#mermaid-svg-fUA0SDlkM9y79mTv .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-fUA0SDlkM9y79mTv .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-fUA0SDlkM9y79mTv .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-fUA0SDlkM9y79mTv .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-fUA0SDlkM9y79mTv .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-fUA0SDlkM9y79mTv .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-fUA0SDlkM9y79mTv .marker{fill:#333333;stroke:#333333;}#mermaid-svg-fUA0SDlkM9y79mTv .marker.cross{stroke:#333333;}#mermaid-svg-fUA0SDlkM9y79mTv svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-fUA0SDlkM9y79mTv .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-fUA0SDlkM9y79mTv .cluster-label text{fill:#333;}#mermaid-svg-fUA0SDlkM9y79mTv .cluster-label span{color:#333;}#mermaid-svg-fUA0SDlkM9y79mTv .label text,#mermaid-svg-fUA0SDlkM9y79mTv span{fill:#333;color:#333;}#mermaid-svg-fUA0SDlkM9y79mTv .node rect,#mermaid-svg-fUA0SDlkM9y79mTv .node circle,#mermaid-svg-fUA0SDlkM9y79mTv .node ellipse,#mermaid-svg-fUA0SDlkM9y79mTv .node polygon,#mermaid-svg-fUA0SDlkM9y79mTv .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-fUA0SDlkM9y79mTv .node .label{text-align:center;}#mermaid-svg-fUA0SDlkM9y79mTv .node.clickable{cursor:pointer;}#mermaid-svg-fUA0SDlkM9y79mTv .arrowheadPath{fill:#333333;}#mermaid-svg-fUA0SDlkM9y79mTv .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-fUA0SDlkM9y79mTv .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-fUA0SDlkM9y79mTv .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-fUA0SDlkM9y79mTv .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-fUA0SDlkM9y79mTv .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-fUA0SDlkM9y79mTv .cluster text{fill:#333;}#mermaid-svg-fUA0SDlkM9y79mTv .cluster span{color:#333;}#mermaid-svg-fUA0SDlkM9y79mTv div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-fUA0SDlkM9y79mTv :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 消息代理 game reports 直连交换 5 4 3 2 1 q-gamification-1 5 4 3 2 1 q-reports 2 发布者 路由键1“game”2“reports”3“game”4“game”5“game” Gamification订阅者13 1 Gamification订阅者25 2 Reports订阅者2 消息确认和拒绝
AMQP协议为消费者应用程序定义了两种不同的确认模式。由于消费者发送确认后消息将从队列中删除因此理解这两种确认模式很重要。 第一个可选项是自动确认这种策略消息在发送到应用程序时将被视为已交付。第二个选项称为显式确认会等待应用程序发送确认信号。相比之下第二个选项更好可以确保所有消息都得到处理。 消费者可以读取信息、运行一些业务逻辑、将相关数据持久化甚至可以在向消息代理发送确认信号之前触发后续的事件。这种场景下只有当消息已被完全处理时才会将其从队列中删除。如果消费者在发送信号之前宕机或出现错误那么消息代理将尝试把消息传递给另一个消费者。如果没有错误将保持等待直到有一个消费者开用。 消费者也可以拒绝消息。假设某个消费者实例由于网络错误而无法访问数据库消费者可以拒绝该消息并指定该消息重新回到队列或将其丢弃。请注意如果导致拒绝消息的错误持续存在了一段时间且没有其他消费者能够成功地处理它最终可能陷入无限的“重入队列/拒绝消息”的循环之中。
设置RabbitMQ
安装RabbitMQ
可以到RabbitMQ的官网去下载RabbitMQ。安装之后启动代理即可这里不再一一介绍。 下面介绍一下Docker下安装RabbitMQ的方法。
拉取镜像
使用docker pull rabbitmq:management命令推荐使用带有management版本的。
创建数据卷
使用docker volume create rabbitmq-home命令创建一个数据卷专门用于持久化RabbitMQ的所有数据方便管理。
创建并运行容器
使用命令
docker run -id --namerabbitmq
-v rabbitmq-home:/var/lib/rabbitmq
-p 15672:15672 -p 5672:5672
-e RABBITMQ_DEFAULT_USERadmin
-e RABBITMQ_DEFAULT_PASS123456
rabbitmq:management这里除了挂载数据卷rabbitmq-home之外还暴露了两个端口以及设定了两个环境变量
15672端口RabbitMQ的管理页面端口。5672端口RabbitMQ的消息接收端口。RABBITMQ_DEFAULT_USER环境变量指定RabbitMQ的用户名这里指定为admin大家部署时替换成自己定义的。RABBITMQ_DEFAULT_PASS环境变量指定RabbitMQ的密码这里指定为123456大家部署时替换成自己定义的。
这样容器就部署完成了启动容器后在浏览器中访问服务器地址:15672即可访问到RabbitMQ的管理界面用户名和密码即为刚刚指定的环境变量的配置值。 RabbitMQ容器是通过指定环境变量的方式进行配置的这比修改配置文件便捷得多还有更多的配置用的环境变量大家可以参考官方文档。
RabbitMQ管理界面
在docker中安装带有management的版本即可管理RabbitMQ在浏览器中输入http://localhost:15672/可看到登录界面使用自己设置的用户名和密码登录即可进入管理界面如下所示 通过这个界面可监控队列中的消息、处理速率、不同注册节点的统计信息等还有其他特性如对队列和交换的监管甚至可以创建或删除实体。
Spring AMQP和Spring Boot
前面使用Spring Boot构建微服务也会使用Spring模块连接到RabbitMQ消息代理。这需要Spring AMQP它包含两个工件spring-rabbit是一组与RabbitMQ代理一起使用的工具spring-amqp包括所有AMQP抽象可使实现独立于供应商。当前Spring提供AMQP协议的RabbitMQ实现。 Spring Boot为AMQP提供了一个启动程序带有自动配置spring-boot-starter-amqp包含了上面的两个工件。
解决方案设计
下图对要构建的功能进行了说明从微服务Multiplication到客户端的响应可能在Gamification微服务处理消息之前发生这是一个异步的、最终一致的流程。这里将创建一个Topic类型的Attempts交换在类似的事件驱动架构中能灵活地使用特定的路由键发送事件允许消费者订阅所有事件或在队列中设置自己的过滤器。 #mermaid-svg-SyGpM50vdXqK6pcF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SyGpM50vdXqK6pcF .error-icon{fill:#552222;}#mermaid-svg-SyGpM50vdXqK6pcF .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-SyGpM50vdXqK6pcF .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-SyGpM50vdXqK6pcF .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-SyGpM50vdXqK6pcF .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-SyGpM50vdXqK6pcF .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-SyGpM50vdXqK6pcF .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-SyGpM50vdXqK6pcF .marker{fill:#333333;stroke:#333333;}#mermaid-svg-SyGpM50vdXqK6pcF .marker.cross{stroke:#333333;}#mermaid-svg-SyGpM50vdXqK6pcF svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-SyGpM50vdXqK6pcF .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-SyGpM50vdXqK6pcF .cluster-label text{fill:#333;}#mermaid-svg-SyGpM50vdXqK6pcF .cluster-label span{color:#333;}#mermaid-svg-SyGpM50vdXqK6pcF .label text,#mermaid-svg-SyGpM50vdXqK6pcF span{fill:#333;color:#333;}#mermaid-svg-SyGpM50vdXqK6pcF .node rect,#mermaid-svg-SyGpM50vdXqK6pcF .node circle,#mermaid-svg-SyGpM50vdXqK6pcF .node ellipse,#mermaid-svg-SyGpM50vdXqK6pcF .node polygon,#mermaid-svg-SyGpM50vdXqK6pcF .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-SyGpM50vdXqK6pcF .node .label{text-align:center;}#mermaid-svg-SyGpM50vdXqK6pcF .node.clickable{cursor:pointer;}#mermaid-svg-SyGpM50vdXqK6pcF .arrowheadPath{fill:#333333;}#mermaid-svg-SyGpM50vdXqK6pcF .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-SyGpM50vdXqK6pcF .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-SyGpM50vdXqK6pcF .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-SyGpM50vdXqK6pcF .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-SyGpM50vdXqK6pcF .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-SyGpM50vdXqK6pcF .cluster text{fill:#333;}#mermaid-svg-SyGpM50vdXqK6pcF .cluster span{color:#333;}#mermaid-svg-SyGpM50vdXqK6pcF div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-SyGpM50vdXqK6pcF :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 2ChallengeSolvedEvent给任意感兴趣的订阅者 1、发送尝试 路由键- attempt.correct- attempt.wrong 绑定attempt.correct 4更新分数 3、响应 Attempts(主题交换) Gamification尝试 浏览器 Multiplication微服务 Gamification微服务 从概念上讲Multiplication微服务拥有Attempts交换将使用它来发布与用户尝试有关的事件原则上将发布正确和错误的事件因为对消费者的逻辑一无所知。另一方面Gamification微服务使用满足其要求的绑定键来声明队列路由键用作过滤器仅接收正确的尝试。如上图所示可能有多个Gamification微服务实例从同一队列中消费那么代理将平衡所有实例之间的负载。 假设另一种微服务也对ChallengeSolvedEvent感兴趣该微服务需要声明自己的队列来消费相同的消息。例如引入Reports微服务该微服务将创建reports队列并使用绑定键attempt.*或#来消费正确和错误的尝试。 可以结合发布/订阅和工作队列模式以便多个微服务可以处理相同的消息且同一微服务的多个实例可以共享负载。此外让发布者负责交换和订阅者负责队列可构建事件驱动的微服务架构通过引入消息代理实现了二者之间的松耦合。
计划
要实现事件驱动架构改变需要完成以下工作
添加新的依赖到pom.xml中以扩充Spring Boot应用程序支持。删除将挑战显式发送给Gamification和相应控制器的REST API客户端。将ChallengeSolvedDTO重命名为ChallengeSolvedEvent。作为发布者声明Multiplication微服务上的交换。修改Multiplication微服务的逻辑以发布事件而不是调用REST API。作为订阅者在Gamification微服务上声明队列。添加消费者逻辑以获取队列中的事件并将其连接到现有的服务层以处理正确尝试的分数和徽章。同时重构测试。
添加AMQP支持
在Spring Boot应用程序中添加AMQP和RabbitMQ支持需要添加相应的启动项在pom.xml中添加相应的依赖如下所示 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency该启动项包括了spring-rabbit和spring-amqp通过spring-boot-autoconfigureRabbitAutoConfiguration类提供连接RabbitMQ服务的默认配置方便使用。 RabbitAutoConfiguration类使用在RabbitProperties类中定义的一组属性可以在application.properties文件中覆盖这些属性。这里可以找到预定义的主机名localhost、端口5672、用户名guest和密码guest等。自动配置类为RabbitTemplate对象构建连接工厂和配置重新可用它们接收消息或发送消息到RabbitMQ。也可以使用抽象接口AmqpTemplate。自动配置还包括一些使用其他机制接收消息的默认配置如RabbitListener注解等。
在Multiplication微服务中发布事件
在微服务Multiplication在添加了消息代理支持就可以发布事件进行配置了。
交换的名称保存在配置中会很方便就不必在以后根据运行应用程序的环境对其进行修改也不必在应用程序之间共享。日志记录设置可以在应用程序与RabbitMQ进行交互时查看日志。可以将RabbitAdmin类的日志级别改为DEBUG该类与RatherMQ代理进行交互以声明交换、队列和绑定。
另外可删除指向Gamification服务的属性现在不再需要了对application.properties修改如下
# 配置交换
amqp.exchange.attemptsattempts.topic
# 配置日志记录显式交换、队列、绑定等声明信息
logging.level.org.springframework.amqp.rabbit.core.RabbitAdminDEBUG接着将Exchange声明添加到AMQP的独立配置文件中可使用Spring的交换生成器ExchangeBuilder在配置中添加代理中声明的主题类型的Bean配置使用JSON进行序列化的消息转换器。AMQPConfiguration类代码如下
package cn.zhangjuli.multiplication.configuration;import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置RabbitMQ通过AMQP抽象在应用程序中使用事件* author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Configuration
public class AMQPConfiguration {/*** 配置主题交换* param exchangeName 交换名称从配置中获取* return TopicExchange使用配置的交换名称构建主题交换*/Beanpublic TopicExchange challengesTopicExchange(Value(${amqp.exchange.attempts}) final String exchangeName) {return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();}/*** 配置消息转换器用JSON对象序列化程序覆盖默认的Java对象序列化程序以避免Java对象序列化的缺陷。* return 消息转换器*/Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {return new Jackson2JsonMessageConverter();}
}用JSON对象序列化程序覆盖默认的Java对象序列化程序可以避免Java对象序列化的缺陷。
在不同编程语言之间使用可不好。如果要引入不是用Java编写的消费者则必须找一个特定的库来执行跨语言反序列化。在消息的标题中使用硬编码完全限定类型名称。反序列化程序希望Java Bean位于相同的程序包中并具有相同的名称和字段这根本不灵活因为可能只想反序列化某些属性并遵循良好的领域驱动设计实践来保留自己的事件数据版本。
发布者端Jackson2JsonMessageConverter 使用预先配置的ObjectMapper然后RabbitTemple实现将使用这个Bean该类将序列化对象并将其作为AMQP消息发送给代理。订阅者端可从JSON格式中受益从而可使用任何编程语言对内容进行反序列化还可以使用自己的对象表示形式忽略消费者端不需要的属性从而减少微服务之间的耦合。如果发布者在有效载荷中包括新字段订阅者也不必更改任何内容。 JSON不是Spring AMQP消息转换器支持的唯一格式还可以使用XML或Google的协议缓冲区即protobuf。在性能至关重要的实际系统中应考虑二进制格式如protobuf。
下面删除GamificationServiceClient类同时需要删除ChallengeServiceImpl类中的相关引用只有有限的几行。然后将ChallengeSolvedDTO重命名为ChallengeSolvedEvent不需要修改任何字段仅此而已。ChallengeSolvedEvent类如下
package cn.zhangjuli.multiplication.challenge;import lombok.Value;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Value
public class ChallengeSolvedEvent {long attemptId;boolean correct;int factorA;int factorB;long userId;String userAlias;
}注意使用显式的命名约定是一种良好的实践。 下面在服务层创建一个新组件以发布事件等效于已经删除的REST API客户端但它与消息代理进行通信。ChallengeEventPublisher 类的代码如下
package cn.zhangjuli.multiplication.challenge;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Service
public class ChallengeEventPublisher {private final AmqpTemplate amqpTemplate;private final String challengesTopicExchange;public ChallengeEventPublisher(AmqpTemplate amqpTemplate,Value(${amqp.exchange.attempts})String challengesTopicExchange) {this.amqpTemplate amqpTemplate;this.challengesTopicExchange challengesTopicExchange;}/*** 发布事件设置路由键然后发送到代理。* param challengeAttempt 消息源需要转换为发布的事件。*/public void challengesSolved(final ChallengeAttempt challengeAttempt) {ChallengeSolvedEvent event buildEvent(challengeAttempt);// 路由键是 attempt.correct 或 attempt.wrongString routingKey attempt. (event.isCorrect() ? correct : wrong);amqpTemplate.convertAndSend(challengesTopicExchange, routingKey, event);}/*** 将ChallengeAttempt转换为ChallengeSolvedEvent 对象* param challengeAttempt 消息源* return 事件要发布的数据。*/private ChallengeSolvedEvent buildEvent(final ChallengeAttempt challengeAttempt) {return new ChallengeSolvedEvent(challengeAttempt.getId(),challengeAttempt.isCorrect(), challengeAttempt.getFactorA(),challengeAttempt.getFactorB(), challengeAttempt.getUser().getId(),challengeAttempt.getUser().getAlias());}
}还记得在删除GamificationServiceClient类时要在ChallengeServiceImpl类中删除的引用吗这是关联点现在要使用ChallengeEventPublisher替换GamificationServiceClient功能即可将消息发布到RabbitMQ代理上这样感兴趣的任何组件就可以使用这些消息了。修改ChallengeServiceImpl类以发送新事件代码如下
package cn.zhangjuli.multiplication.challenge;import cn.zhangjuli.multiplication.user.User;
import cn.zhangjuli.multiplication.user.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.util.List;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Service
RequiredArgsConstructor
Slf4j
public class ChallengeServiceImpl implements ChallengeService {private final UserRepository userRepository;private final ChallengeAttemptRepository attemptRepository;private final ChallengeEventPublisher challengeEventPublisher;Overridepublic ChallengeAttempt verifyAttempt(ChallengeAttemptDTO attemptDTO) {// Check if the attempt is correctboolean isCorrect attemptDTO.getGuess() attemptDTO.getFactorA() * attemptDTO.getFactorB();// 检查alias用户是否存在不存在就创建User user userRepository.findByAlias(attemptDTO.getUserAlias()).orElseGet(() - {log.info(Creating new user with alias {}, attemptDTO.getUserAlias());return userRepository.save(new User(attemptDTO.getUserAlias()));});// Builds the domain object. Null id for now.ChallengeAttempt checkedAttempt new ChallengeAttempt(null,user,attemptDTO.getFactorA(),attemptDTO.getFactorB(),attemptDTO.getGuess(),isCorrect);// Stores the attemptChallengeAttempt storedAttempt attemptRepository.save(checkedAttempt);log.info(attempt: {}, storedAttempt);// 发布事件来通知潜在的感兴趣的订阅者challengeEventPublisher.challengesSolved(storedAttempt);return storedAttempt;}Overridepublic ListChallengeAttempt getStatisticsForUser(final String userAlias) {return attemptRepository.findTop10ByUserAliasOrderByIdDesc(userAlias);}
}同样地修改ChallengeServiceTest 类以测试发送新事件代码如下
package cn.zhangjuli.multiplication.challenge;import cn.zhangjuli.multiplication.user.User;
import cn.zhangjuli.multiplication.user.UserRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;import java.util.List;
import java.util.Optional;import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/ExtendWith(MockitoExtension.class)
public class ChallengeServiceTest {private ChallengeService challengeService;// 使用Mockito进行模拟Mockprivate UserRepository userRepository;Mockprivate ChallengeAttemptRepository attemptRepository;Mockprivate ChallengeEventPublisher challengeEventPublisher;BeforeEachpublic void setUp() {challengeService new ChallengeServiceImpl(userRepository,attemptRepository,challengeEventPublisher);}Testpublic void checkCorrectAttemptTest() {// given// 这里希望save方法什么都不做只返回第一个也是唯一一个传递的参数这样不必调用真实的存储库即可测试该层。given(attemptRepository.save(any())).will(returnsFirstArg());ChallengeAttemptDTO attemptDTO new ChallengeAttemptDTO(50, 60, noise, 3000);// whenChallengeAttempt resultAttempt challengeService.verifyAttempt(attemptDTO);// thenthen(resultAttempt.isCorrect()).isTrue();verify(attemptRepository).save(resultAttempt);verify(challengeEventPublisher).challengesSolved(resultAttempt);}Testpublic void checkWrongAttemptTest() {// givengiven(attemptRepository.save(any())).will(returnsFirstArg());ChallengeAttemptDTO attemptDTO new ChallengeAttemptDTO(50, 60, noise, 5000);// whenChallengeAttempt resultAttempt challengeService.verifyAttempt(attemptDTO);// thenthen(resultAttempt.isCorrect()).isFalse();verify(userRepository).save(new User(noise));verify(attemptRepository).save(resultAttempt);verify(challengeEventPublisher).challengesSolved(resultAttempt);}Testpublic void checkExistingUserTest() {// givengiven(attemptRepository.save(any())).will(returnsFirstArg());User existingUser new User(1L, john_doe);given(userRepository.findByAlias(john_doe)).willReturn(Optional.of(existingUser));ChallengeAttemptDTO attemptDTO new ChallengeAttemptDTO(50, 60, john_doe, 5000);// whenChallengeAttempt resultAttempt challengeService.verifyAttempt(attemptDTO);// thenthen(resultAttempt.isCorrect()).isFalse();then(resultAttempt.getUser()).isEqualTo(existingUser);verify(userRepository, never()).save(any());verify(attemptRepository).save(resultAttempt);verify(challengeEventPublisher).challengesSolved(resultAttempt);}Testpublic void retrieveStatisticsTest() {// givenUser user new User(john_doe);ChallengeAttempt attempt1 new ChallengeAttempt(1L, user, 50, 60, 3010, false);ChallengeAttempt attempt2 new ChallengeAttempt(2L, user, 50, 60, 3051, false);ListChallengeAttempt lastAttempts List.of(attempt1, attempt2);given(attemptRepository.findTop10ByUserAliasOrderByIdDesc(john_doe)).willReturn(lastAttempts);// whenListChallengeAttempt latestAttemptsResult challengeService.getStatisticsForUser(john_doe);// thenthen(latestAttemptsResult).isEqualTo(lastAttempts);}
}在ChallengeServiceTest 类中并没有模拟AmqpTemplate的行为。需要一个新的ChallengeEventPublisherTest类使用Mockit的ArgumentCaptor类捕获传递给Mock的参数来模拟AmqpTemplate的路由键和事件对象调用。代码如下
package cn.zhangjuli.multiplication.challenge;import cn.zhangjuli.multiplication.user.User;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.amqp.core.AmqpTemplate;import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.Mockito.verify;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
ExtendWith(MockitoExtension.class)
public class ChallengeEventPublisherTest {private ChallengeEventPublisher challengeEventPublisher;Mockprivate AmqpTemplate amqpTemplate;BeforeEachpublic void setUp() {challengeEventPublisher new ChallengeEventPublisher(amqpTemplate, test.topic);}ParameterizedTestValueSource(booleans {true, false})public void sendsAttempt(boolean correct) {// givenChallengeAttempt challengeAttempt createTestChallengeAttempt(correct);// whenchallengeEventPublisher.challengesSolved(challengeAttempt);// thenvar exchangeCaptor ArgumentCaptor.forClass(String.class);var routingKeyCaptor ArgumentCaptor.forClass(String.class);var eventCaptor ArgumentCaptor.forClass(ChallengeSolvedEvent.class);verify(amqpTemplate).convertAndSend(exchangeCaptor.capture(),routingKeyCaptor.capture(), eventCaptor.capture());then(exchangeCaptor.getValue()).isEqualTo(test.topic);then(routingKeyCaptor.getValue()).isEqualTo(attempt. (correct ? correct : wrong));then(eventCaptor.getValue()).isEqualTo(solvedEvent(correct));}private ChallengeSolvedEvent solvedEvent(boolean correct) {return new ChallengeSolvedEvent(1L, correct, 30, 40, 10L, john_doe);}private ChallengeAttempt createTestChallengeAttempt(boolean correct) {return new ChallengeAttempt(1L,new User(10L, john_doe),30,40,correct ? 1200 : 1300,correct);}
}在Gamification微服务中订阅事件
已经在Multiplication微服务中发布了事件那么Gamification微服务中怎么订阅事件呢和Multiplication类似需要改变事件的使用方式要替换现有的接受事件订阅者的控制器。 首先在微服务Gamification的pom.xml中添加消息代理支持就可以订阅事件进行配置了。
交换的名称发布者将事件发送到交换。队列的名称交换将消息路由到一个或者多个队列中订阅者再去消费消息。日志记录设置可以在应用程序与RabbitMQ进行交互时查看日志。可以将RabbitAdmin类的日志级别改为DEBUG该类与RatherMQ代理进行交互以声明交换、队列和绑定。
在application.properties配置这些信息修改如下
# 交换名称
amqp.exchange.attemptsattempts.topic
# 队列名称
amqp.queue.gamificationgamification.queue
# 配置日志记录显式交换、队列、绑定等声明信息
logging.level.org.springframework.amqp.rabbit.core.RabbitAdminDEBUG实际上生产者将消息发送到 Exchange (交换器“X”)由交换器将消息路由到一个或者多个队列中消费者再去消费消息。如果路由不到或许会返回给生产者或许直接丢弃。要明白两个概念
BindingKey绑定键交换器都是需要与队列进行绑定这里交换键可以简单的理解为交换器与队列之间的路径的名称可以重复即可以把多条队列以同一绑定键与路由器绑定。RoutingKey路由键生产者发送消息的时候可以带上路由键发送给交换器交换器就会根据路由键去匹配队列路由键与交换器的匹配。 注意消费端也应该声明交换当声明队列时交换必须存在。交换使用持久化这仅在第一次声明时适用。但在微服务中声明所需的交换和队列是一种良好的编程习惯。另外RabbitMQ实体的声明是幂等操作如果实体存在则该操作无效。 同样需要在AMQPConfiguration类中进行配置以使用JSON进行序列化的消息转换器而不是默认消息转换器提供的格式。AMQPConfiguration类代码如下
package cn.zhangjuli.gamification.configuration;import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Configuration
public class AMQPConfiguration {/*** 声明交换* param exchangeName 交换名* return 主题交换*/Beanpublic TopicExchange challengesTopicExchange(Value(${amqp.exchange.attempts}) final String exchangeName) {return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();}/*** 声明队列* param queueName 队列名* return 队列*/Beanpublic Queue gamificationQueue(Value(${amqp.queue.gamification}) final String queueName) {return QueueBuilder.durable(queueName).build();}/*** 声明绑定绑定队列到交换器上* param gamificationQueue 队列* param attemptsExchange 交换器* return 绑定*/Beanpublic Binding correctAttemptsBuilding(final Queue gamificationQueue,final TopicExchange attemptsExchange) {return BindingBuilder.bind(gamificationQueue).to(attemptsExchange).with(attempt.correct);}/*** 设置MessageHandlerMethodFactory替换默认Bean使用默认工厂为基准将其消息转换器替换为* MappingJackson2MessageConverter实例处理从JSON到Java类的消息反序列化。并对其包含的* ObjectMapper进行微调添加ParameterNamesModule以避免必须为事件类使用空的构造方法。* return 设置后的MessageHandlerMethodFactory工厂*/Beanpublic MessageHandlerMethodFactory messageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory factory new DefaultMessageHandlerMethodFactory();final MappingJackson2MessageConverter jsonConverter new MappingJackson2MessageConverter();jsonConverter.getObjectMapper().registerModule(new ParameterNamesModule(JsonCreator.Mode.PROPERTIES));factory.setMessageConverter(jsonConverter);return factory;}/*** 设置RabbitListenerConfigurer以使监听器使用JSON反序列化使用MessageHandlerMethodFactory的* 实现来覆盖RabbitListenerConfigurer。* param messageHandlerMethodFactory 工厂* return RabbitListenerConfigurer*/Beanpublic RabbitListenerConfigurer rabbitListenerConfigurer(final MessageHandlerMethodFactory messageHandlerMethodFactory) {return (c) - c.setMessageHandlerMethodFactory(messageHandlerMethodFactory);}
}这里不使用AmqpTemplate来接收消息因为这是基于轮询的会消耗网络资源而想使用代理在有消息时通知订阅者会使用异步通信AMQP抽象不支持这些功能spring-rabbit则提供了异步消费消息的机制。 下面将ChallengeSolvedDTO重构为ChallengeSolvedEvent从技术上讲不需要改名因为JSON格式仅指定字段名和值但应养成良好的编程习惯使用合适的名称来命名更容易找到相关的类。ChallengeSolvedEvent代码如下
package cn.zhangjuli.gamification.challenge;import lombok.Value;/*** 定义了微服务Multiplication和微服务Gamification之间的契约为保持独立性在两个项目中都需要创建。* author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
// 表明该类是不可变类
Value
public class ChallengeSolvedEvent {long attemptId;boolean correct;int factorA;int factorB;long userId;String userAlias;
}遵循领域驱动的设计实践可以调整此事件的反序列化字段。例如不需要userAlias作为Gamification的业务逻辑可以将其从消费事件中删除。因为Spring Boot默认情况下将ObjectMapper配置为忽略未知属性所以无需其他配置即可工作。 最好不要在微服务之间共享代码因为要支持松耦合、向后兼容和独立部署。想象一下微服务Multiplication将进一步发展并存储额外的数据假设这是更艰巨挑战的额外因素。然后这些额外因素将被添加到已发布事件的代码中好的一面是通过在每个领域中使用事件的不同表示形式并将映射器配置为忽略未知属性Gamification微服务仍然有效而不必更新其事件表示形式。 现在就来消费事件使用RabbitListener注解使其在消息到达时充当消息的处理逻辑。这里只需要指定要订阅的队列名称因为已经在单独的配置类中声明了RabbitMQ实体。GameEventHandler 类代码如下
package cn.zhangjuli.gamification.game;import cn.zhangjuli.gamification.challenge.ChallengeSolvedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Service
RequiredArgsConstructor
Slf4j
public class GameEventHandler {private final GameService gameService;/*** 实现RabbitMQ订阅者代码很少可将队列传递给RabbitListener注解。* 以ChallengeSolvedEvent对象为期望的输入Spring自动配置一个反序列化器将消息从代理转换为该对象类型。* 因为AMQPConfiguration中的配置将使用JSON。* 默认情况下当方法最终正常完成时基于RabbitListener注解构建的逻辑会将确认发送给代理* 在Spring Rabbit中称为AUTO确认模式。* 如果想在处理之前就发送ACK信号可将其更改为NONE如果想完全控制则可将其修改为MANUAL。* 可在工厂级别全局配置或监听器级别通过将额外的参数传递给RabbitListener注解设置此参数和其他配置值。* 这里使用默认的错误策略AUTO捕获任何可能的异常记录错误然后重新抛出AmqpRejectAndDontRequeueException。* 这是Spring AMQP的快捷方式用于拒绝该消息并告诉代理不要重新排队* 这意味着如果消费者逻辑中出现意外错误将丢失消息。这里是可以接受的。* 如果要避免这种情况还可以设置代码以多次重试方法是重新抛出InstantRequeueAmqpException* 或使用Spring AMQP中提供的一些工具如错误处理程序或邮件恢复程序来处理这些无效的消息。* param event 期望的输入事件*/RabbitListener(queues ${amqp.queue.gamification})void handleMultiplicationSolved(final ChallengeSolvedEvent event) {log.info(已接收到成功挑战事件{}, event.getAttemptId());try {gameService.newAttemptForUser(event);} catch (final Exception e) {log.error(在处理ChallengeSolvedEvent时发生错误, e);// 避免事件重新排队和处理throw new AmqpRejectAndDontRequeueException(e);}}
}可使用RabbitListener注解做很多事
声明交换、队列和绑定。使用相同的方式从多个队列接收消息。通过使用Header对单个值或Header对映射注解额外的参数来处理消息头。注入Channel参数例如确认控制。通过从监听器返回值来实现请求/响应模式。将注解移到类级别并使用RabbitHandler注解方法可以配置多种方法来处理同一队列中出现的不同消息类型。
启动应用程序在RabbitMQ中发布消息就可以在控制台接收到消息如图所示 修改了订阅者的逻辑就可以删除GameController类了。需要重构GameService的接口及其实现GameServiceImpl来处理ChallengeSolvedEvent基本逻辑不变。代码如下
package cn.zhangjuli.gamification.game;import cn.zhangjuli.gamification.challenge.ChallengeSolvedEvent;
import cn.zhangjuli.gamification.game.badgeprocessors.BadgeProcessor;
import cn.zhangjuli.gamification.game.domain.BadgeCard;
import cn.zhangjuli.gamification.game.domain.BadgeType;
import cn.zhangjuli.gamification.game.domain.ScoreCard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;/*** author Juli Zhang, a hrefmailto:zhjllut.edu.cnContact me/a br*/
Service
Slf4j
RequiredArgsConstructor
public class GameServiceImpl implements GameService {private final ScoreRepository scoreRepository;private final BadgeRepository badgeRepository;private final ListBadgeProcessor badgeProcessorList;Overridepublic GameResult newAttemptForUser(ChallengeSolvedEvent challenge) {if (challenge.isCorrect()) {ScoreCard scoreCard new ScoreCard(challenge.getUserId(), challenge.getAttemptId());scoreRepository.save(scoreCard);log.info(用户 {} 的尝试 {} 得分 {},challenge.getUserAlias(), challenge.getAttemptId(), scoreCard.getScore());ListBadgeCard badgeCards processForBadges(challenge);return new GameResult(scoreCard.getScore(),badgeCards.stream().map(BadgeCard::getBadgeType).collect(Collectors.toList()));} else {log.info(尝试 {} 不正确。用户 {} 没有得分。,challenge.getAttemptId(), challenge.getUserAlias());return new GameResult(0, List.of());}}/*** 检查总分和不同的得分来得到徽章* param challengeSolved 新的尝试* return 徽章的列表*/private ListBadgeCard processForBadges(final ChallengeSolvedEvent challengeSolved) {OptionalInteger optionalTotalScore scoreRepository.getTotalScoreForUser(challengeSolved.getUserId());if (optionalTotalScore.isEmpty()) {return Collections.emptyList();}int totalScore optionalTotalScore.get();// 得到用户的总分和徽章ListScoreCard scoreCardList scoreRepository.findByUserIdOrderByScoreTimestampDesc(challengeSolved.getUserId());SetBadgeType alreadyGetBadges badgeRepository.findByUserIdOrderByBadgeTimestampDesc(challengeSolved.getUserId()).stream().map(BadgeCard::getBadgeType).collect(Collectors.toSet());// 调用徽章处理器来处理还没有得到的徽章ListBadgeCard newBadgeCards badgeProcessorList.stream().filter(badgeProcessor - !alreadyGetBadges.contains(badgeProcessor.badgeType())).map(badgeProcessor - badgeProcessor.processForOptionalBadge(totalScore,scoreCardList, challengeSolved)).flatMap(Optional::stream).map(badgeType - new BadgeCard(challengeSolved.getUserId(), badgeType)).collect(Collectors.toList());badgeRepository.saveAll(newBadgeCards);return newBadgeCards;}
}注意将ChallengeSolvedDTO修改为ChallengeSolvedEvent会影响多个类使用IDEA环境将自动进行重构如果不能需要人工干预。 至此完成了如下工作
在Spring Boot应用程序中添加AMQP依赖启动项以使用AMQP和RabbitMQ。删除REST API客户端Multiplication和控制器Gamification将切换到事件驱动的架构。将ChallengeSolvedDTO重命名为ChallengeSolvedEvent并修改相关的类。在微服务中声明主题交换。更改Multiplication微服务的逻辑发布事件而不是调用REST API。在Gamification微服务定义队列。在Gamification微服务中实现RabbitMQ消费者逻辑。重构测试使其适应新界面。
因为没有改变开放的前端交互的API所以前端界面不需要改变。
场景分析
现在系统已经实现了事件驱动的架构就来看看使用消息代理带来的优势。系统基本逻辑如图所示。 #mermaid-svg-uQNAMGpHDCqdbwfk {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk .error-icon{fill:#552222;}#mermaid-svg-uQNAMGpHDCqdbwfk .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-uQNAMGpHDCqdbwfk .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-uQNAMGpHDCqdbwfk .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-uQNAMGpHDCqdbwfk .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-uQNAMGpHDCqdbwfk .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-uQNAMGpHDCqdbwfk .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-uQNAMGpHDCqdbwfk .marker{fill:#333333;stroke:#333333;}#mermaid-svg-uQNAMGpHDCqdbwfk .marker.cross{stroke:#333333;}#mermaid-svg-uQNAMGpHDCqdbwfk svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-uQNAMGpHDCqdbwfk .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk .cluster-label text{fill:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk .cluster-label span{color:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk .label text,#mermaid-svg-uQNAMGpHDCqdbwfk span{fill:#333;color:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk .node rect,#mermaid-svg-uQNAMGpHDCqdbwfk .node circle,#mermaid-svg-uQNAMGpHDCqdbwfk .node ellipse,#mermaid-svg-uQNAMGpHDCqdbwfk .node polygon,#mermaid-svg-uQNAMGpHDCqdbwfk .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-uQNAMGpHDCqdbwfk .node .label{text-align:center;}#mermaid-svg-uQNAMGpHDCqdbwfk .node.clickable{cursor:pointer;}#mermaid-svg-uQNAMGpHDCqdbwfk .arrowheadPath{fill:#333333;}#mermaid-svg-uQNAMGpHDCqdbwfk .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-uQNAMGpHDCqdbwfk .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-uQNAMGpHDCqdbwfk .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-uQNAMGpHDCqdbwfk .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-uQNAMGpHDCqdbwfk .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-uQNAMGpHDCqdbwfk .cluster text{fill:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk .cluster span{color:#333;}#mermaid-svg-uQNAMGpHDCqdbwfk div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-uQNAMGpHDCqdbwfk :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 2ChallengeSolvedEvent给任意感兴趣的订阅者 绑定attempt.correct 1、发送尝试 Attempts(主题交换 4更新分数 3、响应 Gamification队列 尝试 浏览器 Multiplication微服务 Gamification微服务 下面就启动系统来验证整个系统逻辑
确保RabbitMQ服务正常。运行两个微服务Multiplication和Gamification。运行React界面。在浏览器中进入http://localhost:15672/的RabbitMQ管理界面使用相应的密码登录Spring Boot默认guest/guest如果不存在就使用管理员创建之。
事件流
先来看看Gamification微服务的启动日志会看到下面一些与RabbitMQ有关的信息如下所示
2023-12-10T09:10:56.98008:00 INFO 32272 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-10T09:10:57.01008:00 INFO 32272 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#4603845b:0/SimpleConnection383c94ed [delegateamqp://admin127.0.0.1:5672/, localPort53842]
2023-12-10T09:10:57.01308:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2023-12-10T09:10:57.02308:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange attempts.topic
2023-12-10T09:10:57.02508:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue gamification.queue
2023-12-10T09:10:57.02708:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [gamification.queue (QUEUE)] to exchange [attempts.topic] with routing key [attempt.correct]
2023-12-10T09:10:57.02908:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished使用Spring AMQP时会记录前面两行表明已成功连接到代理。另外的日志信息是因为开启了RabbitAdmin类的日志记录级别为DEBUG。 Multiplication微服务端还没有RabbitMQ日志原因是连接和交换的声明仅在发布第一条消息时发生。当前端界面有重试到来时就启动连接控制台日志就会显示相关日志。如下所示
2023-12-09T15:38:48.05008:00 INFO 59044 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-09T15:38:48.08108:00 INFO 59044 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#15f229e8:0/SimpleConnection3da194a4 [delegateamqp://guest127.0.0.1:5672/, localPort65533]
2023-12-09T15:38:48.08308:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2023-12-09T15:38:48.09508:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange attempts.topic
2023-12-09T15:38:48.09908:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished可以通过RabbitMQ管理界面了解当前状态。在Connections选项卡中可看到微服务创建的连接如图所示 切换到Exchanges选项卡会看到topic类型的attempts.topic交换声明为durableD如图所示 单击交换名称进入详细信息页面可以看到相关信息可以绑定队列和设置路由键还可以发布消息如图所示 Queues选项卡显示了队列该队列也配置为durableD。如图所示 Channels选项卡显示了通道信息如图所示 了解了基本情况后就来看看发送重试信息后的响应使用HTTPie产生正确的尝试挑战如下所示 http POST :8080/attempts factorA50 factorB60 userAliasnoise guess3000在Multiplication日志中可以看到如何连接代理并声明交换的第一次可见如前面的日志所示这里已经存在就无效了可以在控制台看到发布消息的输出如下所示
2023-12-10T09:49:49.27008:00 INFO 59044 --- [nio-8080-exec-5] c.z.m.challenge.ChallengeServiceImpl : attempt: ChallengeAttempt(id1756, userUser(id1, aliasnoise), factorA50, factorB60, resultAttempt3000, correcttrue)Gamification微服务将反映事件的消费情况并更新分数日志如下
2023-12-10T09:49:49.30408:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1756
2023-12-10T09:49:49.35108:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise 的尝试 1756 得分 10
2023-12-10T10:02:25.74008:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1757
2023-12-10T10:02:25.74108:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise 的尝试 1757 得分 10多发布几次消息可以从RabbitMQ管理界面中从Queues选项卡单击想要查看的队列名称即可看到队列的整体情况如图所示 当然也可以使用前端界面来进行尝试如图所示
Gamification微服务不可用
前面的微服务实现具有弹性即使Gamification微服务不可用也不会失效。但是会错过这段时间发送的成功尝试现在引入了消息代理之后呢 停止Gamification微服务使用HTTPie发送尝试看看如何 http POST :8080/attempts factorA50 factorB60 userAliasnoise1 guess3000RabbitMQ的“队列详细信息”页面将显示排队消息如图所示 这些消息仍然存在但没有消费者消费。检查Multiplication微服务日志也没有错误。它将消息发布到代理并向API客户端返回OK响应实现了松耦合Multiplication不需要知道消费者是否可用。整个过程是异步的基于事件驱动的。 再次回到Gamification微服务启动后将在日志中看到它会从代理接收所有事件消息。然后会触发其业务逻辑更新分数。现在就没有丢失任何数据。如下所示
2023-12-10T11:00:59.80108:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1781
2023-12-10T11:00:59.86108:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1781 得分 10
2023-12-10T11:00:59.89608:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1782
2023-12-10T11:00:59.89808:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1782 得分 10
2023-12-10T11:00:59.90508:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1783
2023-12-10T11:00:59.90908:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1783 得分 10
2023-12-10T11:00:59.91608:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1784
2023-12-10T11:00:59.91808:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1784 得分 10
2023-12-10T11:00:59.92408:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1785
2023-12-10T11:00:59.92608:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1785 得分 10
2023-12-10T11:00:59.93108:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1786
2023-12-10T11:00:59.93108:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1786 得分 10
2023-12-10T11:00:59.93808:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件1787
2023-12-10T11:00:59.93908:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1787 得分 10还可用用更新的分数来验证排行榜。系统不仅有弹性还可用在故障后恢复。RabbitMQ界面的队列详细信息也显示了排队消息的计数为0因为已经消费完了。 想象一下如果RabbitMQ可以在丢弃消息之前配置将消息保留在队列中的时间生存时间TTL还可以配置队列的最大长度就可以根据用户需要进行调整了。 例如可以将前面Queue配置进行设置来提供相关的配置可以配置队列使其具有6小时TTL和最大长度为25000条消息代码如下 Beanpublic Queue gamificationQueue(Value(${amqp.queue.gamification}) final String queueName) {return QueueBuilder.durable(queueName).ttl((int) Duration.ofHours(6).toMillis()).maxLength(25000L).build();}消息代理不可用
在队列有待传递的消息时关闭代理呢要测试这种情况按照如下步骤操作
停止Gamification服务。使用一个用户别名发送一些正确的测试并在RabbitMQ管理界面验证队列是否保存了这些消息。停止RabbitMQ代理。再次发送正确的测试。启动Gamification微服务。大约10秒后再次启动RabbitMQ代理。
和前面一样手动测试后代理关闭前发送的尝试被RabbitMQ放入队列但没有得到处理。停止RabbitMQ后再次执行测试将获得HTTP错误响应日志如下
2023-12-10T15:07:21.14808:00 INFO 59044 --- [nio-8080-exec-6] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-10T15:07:21.15308:00 ERROR 59044 --- [nio-8080-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information] with root causejava.net.ConnectException: Connection refused: no further information
//...其余内容省略可添加一个try/catch子句来抑制错误更好的方法是实现自定义HTTP错误处理程序以返回特定的错误响应如503服务不可达指示与代理断开时系统无法运行可有多种选择。 再次启动Gamification微服务后仍然可以运行但会不断重试连接日志如下
2023-12-10T15:14:05.52808:00 WARN 21044 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information
2023-12-10T15:14:05.52808:00 INFO 21044 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer52903e9b: tags[[]], channelnull, acknowledgeModeAUTO local queue size0
2023-12-10T15:14:05.52908:00 INFO 21044 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-10T15:14:05.53008:00 ERROR 21044 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2023-12-10T15:14:05.53008:00 INFO 21044 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]而且新的尝试请求到来时Multiplication微服务也会执行相同的操作。再次启动RabbitMQ代理时两个微服务恢复连接。当Gamification重新连接到RabbitMQ后就会收到排队的事件进行后续的处理。这是因为声明了持久化的交换和队列而且Spring实现在发布消息时使用了持久化传递模式。如果使用RabbitTemplate而不是AmqpTemplate来发布消息可以自己设置消息的属性将消息传递模式修改为非持久化示例如下
MessageProperties properties MessagePropertiesBuilder.newInstance().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
rabbitTemplate.getMessageConverter().toMessage(challengeAttempt, properties);
rabbitTemplate.convertAndSend(challengesTopicExchange, routingKey, event);这个例子告诉大家应该了解所使用的工具的配置项。以持久性方式发送所有消息带来了不错的优势但会带来额外的性能开销。如果配置正确分布的RabbitMQ集群宕机的可能性会很小也可能愿意接受潜在的消息丢失以提高性能这取决于需求。
事务性
前面的测试暴露了一个意外情况代理关闭后发送尝试将收到服务器错误错误代码为500给客户端造成了一种印象未正常处理该消息实际上已经处理了一部分了。 停止代理运行Multiplication再次发送尝试将得到错误响应例如 http POST :8080/attempts factorA50 factorB60 userAliasnoise3 guess3000
HTTP/1.1 500
Connection: close
Content-Type: application/json
Date: Sun, 10 Dec 2023 07:44:59 GMT
Transfer-Encoding: chunked
Vary: Origin, Access-Control-Request-Method, Access-Control-Request-Headers{error: Internal Server Error,path: /attempts,status: 500,timestamp: 2023-12-10T07:44:59.98000:00
}但是查看数据库进入Multiplication数据库的H2控制台查看ChallengeAttempt数据表可以看到最后添加了1行尝试这意味着即使收到了错误响应该信息也已经存储了。如果代码在尝试将消息发送给代理之前持久化对象这就无法避免。 可将服务方法verifyAttempt中包含的整个逻辑视为一个事务可回滚数据库事务不执行。即使在调用存储库中的save方法之后仍然出现错误仍然可以回滚。使用Spring框架来实现该操作很容易只需要将Transactional注解添加到方法上即可代码如下 TransactionalOverridepublic ChallengeAttempt verifyAttempt(ChallengeAttemptDTO attemptDTO) {// ...}如果在这段代码中存在异常则事务将回滚。如果要求服务中的所有方法都具有事务性可以在类级别添加该注解。 导致 Transactional 失效的常见场景有以下 5 个
非 public 修饰的方法timeout 超时时间设置过小代码中使用 try/catch 处理异常调用类内部的 Transactional 方法数据库不支持事务。
修改后重试相同的步骤重启Multiplication微服务关闭代理发送尝试查看数据库存储发现没有存储。由于抛出异常Spring回滚数据库操作就像未执行过。 Spring还支持发布者和订阅者双方的RabbitMQ事务在Transactional注解的方法范围内使用AmqpTemplate或RabbitTemplate实现发送消息当在通道RabbitMQ中启动事务性时即使在发送这些消息的方法调用之后发生异常这些消息也不会到达代理。在消费端也可以使用事务拒绝已处理的消息。这种情况下需要设置队列以重新排列被拒绝的消息这是默认情况。 类似情况下可简化事务处理策略将其限制在数据库中
发布时如果只有一个代理操作则可在流程结束时发布消息在发送消息之前或期间发生的任何错误都将导致数据库操作回滚。在订阅者端如果有异常默认情况下将拒绝该消息。如果是想要的消息则可以对其重新排列。然后还可以在方法中使用Transactional注解如果发生故障数据库将回滚。 微服务内的本地事务性对保持数据一致性并避免部分流程完成至关重要当业务逻辑涉及多个步骤并可能与数据库或消息代理之类的外部组件交互时应该考虑到业务逻辑可能出错。 扩展微服务
微服务架构的优势之一是可以独立扩展系统的各个部分在消息代理中引入事件驱动方法的好处可以透明地添加更多的发布者和订阅者目前为止一直在运行微服务的单个实例不能确定架构还支持为每个微服务添加更多的副本。 不能使用多个微服务实例的第一个原因是数据库。当水平扩展微服务时所有副本都应共享数据层并将数据存储在一个公共位置而不是每个实例都隔离。微服务必须是无状态的原因是不同的请求或消息可能最终出现在不同的微服务实例中。例如不应该假定同一Multiplication实例将处理来自同一用户发送的两次尝试因此不能在尝试中保持任何内存状态如图所示 #mermaid-svg-4GZHTD2dglaMy0Kl {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl .error-icon{fill:#552222;}#mermaid-svg-4GZHTD2dglaMy0Kl .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-4GZHTD2dglaMy0Kl .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-4GZHTD2dglaMy0Kl .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-4GZHTD2dglaMy0Kl .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-4GZHTD2dglaMy0Kl .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-4GZHTD2dglaMy0Kl .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-4GZHTD2dglaMy0Kl .marker{fill:#333333;stroke:#333333;}#mermaid-svg-4GZHTD2dglaMy0Kl .marker.cross{stroke:#333333;}#mermaid-svg-4GZHTD2dglaMy0Kl svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-4GZHTD2dglaMy0Kl .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl .cluster-label text{fill:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl .cluster-label span{color:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl .label text,#mermaid-svg-4GZHTD2dglaMy0Kl span{fill:#333;color:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl .node rect,#mermaid-svg-4GZHTD2dglaMy0Kl .node circle,#mermaid-svg-4GZHTD2dglaMy0Kl .node ellipse,#mermaid-svg-4GZHTD2dglaMy0Kl .node polygon,#mermaid-svg-4GZHTD2dglaMy0Kl .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-4GZHTD2dglaMy0Kl .node .label{text-align:center;}#mermaid-svg-4GZHTD2dglaMy0Kl .node.clickable{cursor:pointer;}#mermaid-svg-4GZHTD2dglaMy0Kl .arrowheadPath{fill:#333333;}#mermaid-svg-4GZHTD2dglaMy0Kl .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-4GZHTD2dglaMy0Kl .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-4GZHTD2dglaMy0Kl .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-4GZHTD2dglaMy0Kl .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-4GZHTD2dglaMy0Kl .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-4GZHTD2dglaMy0Kl .cluster text{fill:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl .cluster span{color:#333;}#mermaid-svg-4GZHTD2dglaMy0Kl div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-4GZHTD2dglaMy0Kl :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 8080 9080 发送尝试/获取用户别名 Multiplication实例1 Multiplication实例2 Multiplication 好的一面是微服务是无状态的可以独立处理每个请求或消息结果最终存储在数据库中。但有一个技术问题如果在端口9080上启动第二个实例将无法启动因为它试图创建新的数据库实例。 下面就来看看这个问题先正常运行应该8080端口的Multiplication实例。然后启动第二个实例覆盖server.port参数为9080以避免端口冲突。可执行如下命令
./mvnw spring-boot:run -Dspring-boot.run.arguments--server.port9080当启动第二个实例时日志会提示错误
org.h2.jdbc.JdbcSQLNonTransientConnectionException: Database may be already in use: ~/multiplication.mv.db. Possible solutions: close all other connection(s); use the server mode [90020-214]发生错误的原因是H2数据库引擎该引擎默认情况下被设计成嵌入式进程而不是服务器。需要改变的是使用H2数据库服务器的模式以便可以使用多个实例进行连接在application.properties中添加配置如下
spring.datasource.urljdbc:h2:file:~/multiplication1;DB_CLOSE_ON_EXITFALSE;AUTO_SERVERtrue;注意AUTO_SAVEtrue可能不起作用可以换成MySQL来试试。 第二个挑战是负载均衡。如果启动多个实例如何从用户界面连接到它们下面看看消息代理是如何实现RabbitMQ消息订阅者之间的负载均衡的。如图所示 #mermaid-svg-5KdIr9ZPnhgO4OWK {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .error-icon{fill:#552222;}#mermaid-svg-5KdIr9ZPnhgO4OWK .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-5KdIr9ZPnhgO4OWK .marker{fill:#333333;stroke:#333333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .marker.cross{stroke:#333333;}#mermaid-svg-5KdIr9ZPnhgO4OWK svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-5KdIr9ZPnhgO4OWK .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .cluster-label text{fill:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .cluster-label span{color:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .label text,#mermaid-svg-5KdIr9ZPnhgO4OWK span{fill:#333;color:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .node rect,#mermaid-svg-5KdIr9ZPnhgO4OWK .node circle,#mermaid-svg-5KdIr9ZPnhgO4OWK .node ellipse,#mermaid-svg-5KdIr9ZPnhgO4OWK .node polygon,#mermaid-svg-5KdIr9ZPnhgO4OWK .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-5KdIr9ZPnhgO4OWK .node .label{text-align:center;}#mermaid-svg-5KdIr9ZPnhgO4OWK .node.clickable{cursor:pointer;}#mermaid-svg-5KdIr9ZPnhgO4OWK .arrowheadPath{fill:#333333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-5KdIr9ZPnhgO4OWK .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-5KdIr9ZPnhgO4OWK .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-5KdIr9ZPnhgO4OWK .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-5KdIr9ZPnhgO4OWK .cluster text{fill:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK .cluster span{color:#333;}#mermaid-svg-5KdIr9ZPnhgO4OWK div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-5KdIr9ZPnhgO4OWK :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 2 负载均衡 1 负载均衡 发送尝试/获取用户别名8080 发送尝试/获取用户别名9080 获取排行榜8081 获取排行榜9081 3 3 4 4 Gamification实例1 Gamification实例2 Multiplication实例1 Multiplication实例2 浏览器 Multiplication Gamification 尝试主题交换 Gamification队列 RabbitMQ支持多种来源的消息发布这意味着可以有多个Multiplication微服务副本将事件发布到同一主题交换这是透明的这些实例打开不同的连接声明交换仅在第一次创建然后发布数据而不必知道还有其他发布者。在订阅者端由多个消费者共享当启动多个Gamification微服务实例时所有实例都声明相同的队列和绑定代理足够智能可在它们之间进行负载均衡。这就在消息级别解决了负载均衡的问题。 启动每个微服务的实例、前端和RabbitMQ服务。在两个单独的终端运行另外的两个微服务实例让Multiplication和Gamification都有两个副本注意端口不同 ./mvnw spring-boot:run -Dspring-boot.run.arguments--server.port9080./mvnw spring-boot:run -Dspring-boot.run.arguments--server.port9081一旦启动并运行了所有实例在前端重试尝试这些尝试会使用Multiplication实例1的服务但事件消费在两个Gamification副本之间平衡。检查日志可以验证Gamification实例如何处理事件的。另外由于数据库在各个实例之间共享因此前端可以从运行的端口8081的实例请求排行榜此实例将聚合所有副本存储的得分和徽章。如图所示 #mermaid-svg-nO7O3anNlyFizivD {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-nO7O3anNlyFizivD .error-icon{fill:#552222;}#mermaid-svg-nO7O3anNlyFizivD .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-nO7O3anNlyFizivD .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-nO7O3anNlyFizivD .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-nO7O3anNlyFizivD .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-nO7O3anNlyFizivD .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-nO7O3anNlyFizivD .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-nO7O3anNlyFizivD .marker{fill:#333333;stroke:#333333;}#mermaid-svg-nO7O3anNlyFizivD .marker.cross{stroke:#333333;}#mermaid-svg-nO7O3anNlyFizivD svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-nO7O3anNlyFizivD .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-nO7O3anNlyFizivD .cluster-label text{fill:#333;}#mermaid-svg-nO7O3anNlyFizivD .cluster-label span{color:#333;}#mermaid-svg-nO7O3anNlyFizivD .label text,#mermaid-svg-nO7O3anNlyFizivD span{fill:#333;color:#333;}#mermaid-svg-nO7O3anNlyFizivD .node rect,#mermaid-svg-nO7O3anNlyFizivD .node circle,#mermaid-svg-nO7O3anNlyFizivD .node ellipse,#mermaid-svg-nO7O3anNlyFizivD .node polygon,#mermaid-svg-nO7O3anNlyFizivD .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-nO7O3anNlyFizivD .node .label{text-align:center;}#mermaid-svg-nO7O3anNlyFizivD .node.clickable{cursor:pointer;}#mermaid-svg-nO7O3anNlyFizivD .arrowheadPath{fill:#333333;}#mermaid-svg-nO7O3anNlyFizivD .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-nO7O3anNlyFizivD .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-nO7O3anNlyFizivD .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-nO7O3anNlyFizivD .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-nO7O3anNlyFizivD .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-nO7O3anNlyFizivD .cluster text{fill:#333;}#mermaid-svg-nO7O3anNlyFizivD .cluster span{color:#333;}#mermaid-svg-nO7O3anNlyFizivD div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-nO7O3anNlyFizivD :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 发送尝试/获取用户别名8080 发送尝试/获取用户别名9080 获取排行榜8081 获取排行榜9081 发送尝试 浏览器 Multiplication实例1 Multiplication实例2 Multiplication Gamification实例1 Gamification实例2 Gamification 尝试主题交换 Gamification队列 HTTP命令行 还可以验证是否有多个发布者一起使用命令行将正确的尝试发送到Multiplication微服务的第二个实例使用命令行向位于端口9080的实例发送调用并检查可以看到消息在各个订阅者之间是平衡的命令行如下 http POST :8080/attempts factorA50 factorB60 userAliasnoise6 guess3000现在通过消息代理可以实现良好的系统可伸缩性在多个订阅者之间实现负载均衡也提高了系统的弹性。停止一个Gamification实例代理会自动将消息定向到另一个实例。经过测试可以发现整个系统仍然能够正常工作。这里的问题是前端无法进行负载均衡或检测到一个副本已关闭。
小结
文章介绍了事件驱动架构的实现过程了解了如何通过消息代理实现微服务之间的松耦合事件模式不针对特定目标仅表示在特定领域中发生的事实通过对不同的消息类型进行建模从而实现松耦合结合RabbitMQ和AMQP提供的方案实现消息发布和交换、订阅消息的队列和路由绑定配置相关参数适应功能性和非功能性需求。使用Spring Boot实现AMQP和RabbitMQ的集成很容易构建事件驱动架构的应用程序。通过示例了解事件驱动架构的使用场景实现了良好的系统可伸缩性很容易在多个订阅者之间实现负载均衡也提高了系统的弹性。不同的工具可能有不同的利弊应根据需要来选择。
前面的文章 1、1 一个测试驱动的Spring Boot应用程序开发 2、2 使用React构造前端应用 3、3 试驱动的Spring Boot应用程序开发数据层示例 4、4 向微服务架构转变
后面的文章 6、6 使用网关模式进行负载均衡