当前位置: 首页 > news >正文

医院网站建设要素做网站需要平台

医院网站建设要素,做网站需要平台,专业的企业进销存软件比较好,深圳市网站建设科技公司一、前言 接下来是开展一系列的 SpringCloud 的学习之旅#xff0c;从传统的模块之间调用#xff0c;一步步的升级为 SpringCloud 模块之间的调用#xff0c;此篇文章为第九篇#xff0c;即介绍 Stream 消息驱动。 二、消息驱动概念 2.1 消息驱动是什么 官方定义 Spring …一、前言 接下来是开展一系列的 SpringCloud 的学习之旅从传统的模块之间调用一步步的升级为 SpringCloud 模块之间的调用此篇文章为第九篇即介绍 Stream 消息驱动。 二、消息驱动概念 2.1 消息驱动是什么 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。屏蔽底层消息中间件的差异降低切换成本统一消息的编程模型。 说的通俗一点就是以前你的项目中可能既用到了 rabbitmq又用到了 kafka现在只需要使用一个 Spring Cloud Stream 就可以了。 2.2 官网 官网的地址在这也可以访问它的中文指导手册但是需要注意的是目前仅支持 RabbitMQ 和 Kafka。 2.3 设计思想 2.3.1 标准 mq 对于标准的 mq 来说架构如下图生产者/消费者之间靠消息媒介传递信息内容消息必须走特定的通道 2.3.2 Stream 比方说我们用到了 RabbitMQ 和 Kafka由于这两个消息中间件的架构上的不同像 RabbitMQ 有 exchangekafka 有 Topic 和 Partitions 分区。 这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰我们如果用了两个消息队列的其中一种后面的业务需求我想往另外一种消息队列进行迁移这时候无疑就是一个灾难性的一大堆东西都要重新推倒重新做因为它跟我们的系统耦合了这时候 springcloud Stream 给我们提供了一种解耦合的方式。 2.3.3 Stream 如何实现统一底层差异 Stream 通过定义绑定器 Binder 作为中间层实现了应用程序与消息中间件细节之间的隔离。 在没有绑定器这个概念的之前我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候由于各消息中间件构建的初衷不同它们的实现细节上会有较大的差异性通过定义绑定器作为中间层完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道使得应用程序不需要再考虑各种不同的消息中间件实现。 2.3.4 绑定器 Binder 通过定义绑定器 Binder 作为中间层实现了应用程序与消息中间件细节之间的隔离。 Binder 可以生成 BindingBinding 用来绑定消息容器的生产者和消费者它有两种类型 INPUT 和 OUTPUTINPUT 对应于消费者OUTPUT 对应于生产者。 Stream 中的消息通信方式遵循了发布-订阅模式在 RabbitMQ 就是 Exchange在 Kakfa 中就是 Topic。 2.4 Stream 标准流程套路 1、Binder很方便的连接中间件屏蔽差异 2、Channel通道是队列 Queue 的一种抽象在消息通讯系统中就是实现存储和转发的媒介通过 Channel 对队列进行配置。 3、Source 和 Sink简单的可理解为参照对象是 Spring Cloud Stream 自身从 Stream 发布消息就是输出接受消息就是输入。 2.5 编码 API 和常用注解 组成说明Middleware中间件目前只支持 Rabbitmq 和 KafkaBinderBinder 是应用与消息中间件之间的封装目前实现了 Kafka 和 Rabbitmq 的 Binder通过 Binder可以很方便的连接中间件可以动态的改变消息类型对应于 Kafka 的 topicRabbitmq 的 exchange这些都可以通过配置文件来实现Input注解标识输入通道通过该输入通道接收到的消息进入应用程序Output注解标识输出通道发布的消息将通过该通道离开应用程序StreamListener监听队列用于消费者的队列的消息的接收EnableBinding指信道 channel 和 exchange 绑定在一起 三、消息驱动之生产者 3.1 工程创建 新建一个 cloud-stream-rabbitmq-consumer8801 模块用来充当消息的生产者pom.xml 内容如下所示只有一个依赖比较特殊。 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdcom.springcloud/groupIdartifactIdSpringCloud/artifactIdversion1.0-SNAPSHOT/version/parentartifactIdcloud-stream-rabbitmq-provider8801/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-netflix-eureka-client/artifactId/dependency!--引入stream整合rabbit的依赖如果是kafka则也引入对于的依赖即可--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency!--基础配置--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-devtools/artifactIdscoperuntime/scopeoptionaltrue/optional/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies/project application.yml 内容如下所示比较特殊的是引入了 stream 的相关配置binders 标签指定继承了哪种消息中间件并配置其连接地址、用户名和密码等。bindings 标签表示要对哪些服务进行整合output 表示这是一个消息的生产者destination 表示的是 rabbitmq 里面的交换机名称binder 绑定的是上面指定的消息中间件。 server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为json文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址 主启动类的代码如下 SpringBootApplication public class StreamMQMain8801 {public static void main(String[] args){SpringApplication.run(StreamMQMain8801.class,args);} } 创建发送消息 Service 接口和其实现类代码如下 package com.springcloud.service;public interface IMessageProvider {public String send() ; } package com.springcloud.service.impl;import com.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel;import javax.annotation.Resource; import java.util.UUID;// 第一步可以理解为是一个消息的发送管道的定义消息的生产者用 source消息的消费者用 sink EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider {Resourceprivate MessageChannel output; // 第二步引入消息的发送管道Overridepublic String send(){String serial UUID.randomUUID().toString();// 第三步调用 send 方法发送消息this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息System.out.println(***serial: serial);return serial;} }创建 controller 类用于远程调用发送消息代码如下 RestController public class SendMessageController {Resourceprivate IMessageProvider messageProvider;GetMapping(value /sendMessage)public String sendMessage(){return messageProvider.send();} } 3.2 测试 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801 模块在浏览器输入 http://localhost:8801/sendMessage 进行测试多刷新几次浏览器。如下图 可以看到消息成功的被 rabbitmq 接收了。 四、消息驱动之消费者 4.1 工程创建 新建一个 cloud-stream-rabbitmq-consumer8802 模块用来充当消息的消费者pom.xml 内容如下所示只有一个依赖比较特殊。 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdcom.springcloud/groupIdartifactIdSpringCloud/artifactIdversion1.0-SNAPSHOT/version/parentartifactIdcloud-stream-rabbitmq-consumer8802/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-netflix-eureka-client/artifactId/dependency!--引入stream整合rabbit的依赖如果是kafka则也引入对于的依赖即可--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!--基础配置--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-devtools/artifactIdscoperuntime/scopeoptionaltrue/optional/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies/project application.yml 内容如下所示比较特殊的是引入了 stream 的相关配置binders 标签指定继承了哪种消息中间件并配置其连接地址、用户名和密码等。bindings 标签表示要对哪些服务进行整合input 表示这是一个消息的消费者destination 表示的是 rabbitmq 里面的交换机名称binder 绑定的是上面指定的消息中间件。 server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为对象json如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址 主启动类的代码如下 SpringBootApplication public class StreamMQMain8802 {public static void main(String[] args){SpringApplication.run(StreamMQMain8802.class,args);} } 创建 listenr 类用于监听和接收消息代码如下 Component // 第一步可以理解为是一个消息的发送管道的定义消息的生产者用 source消息的消费者用 sink EnableBinding(Sink.class) public class ReceiveMessageListener {Value(${server.port})private String serverPort;// 第二步使用 StreamListener 注解进行消息的监听和接收StreamListener(Sink.INPUT)public void input(MessageString message){System.out.println(消费者1号-------接收到的消息 message.getPayload()\t port: serverPort);} } 4.2 测试 接下来测试使用 8801 发送消息看 8802 是否可以正常的接收消息启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801 和 cloud-stream-rabbitmq-consumer8802 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以看到 8081 发送的消息可以被 8082 正常的消费如下图 五、分组消费与持久化 5.1 工程创建 参照 cloud-stream-rabbitmq-consumer8802 模块我们重新创建一个消费者模块  cloud-stream-rabbitmq-consumer8803可能有点不一样的就是 application.yml我把它的内容粘出来如下 server:port: 8803spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为对象json如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: receive-8803.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址 5.2 测试 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以看到 8081 发送的消息可以既被 8082 又被 8083 消费了如下图 5.3 重复消费问题 目前是生产者发送消息8802 和 8803 同时都收到了存在重复消费问题。 比如在如下场景中订单系统我们做集群部署都会从 RabbitMQ 中获取订单信息那如果一个订单同时被两个服务获取到那么就会造成数据错误我们得避免这种情况。这时我们就可以使用 Stream 中的消息分组来解决。 注意在 Stream 中处于同一个 group 中的多个消费者是竞争关系就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的重复消费同一组内会发生竞争关系只有其中一个可以消费。 5.4 分组 5.4.1 原理 微服务应用放置于同一个 group 中就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的同一个组内会发生竞争关系只有其中一个可以消费。 5.4.2 不同组测试 我们先把 8802 和 8803 都变成不同组进行测试首先修改 8802 的 application.yml添加一个 group 属性如下图 修改 8803 的 application.yml添加一个 group 属性如下图 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以得出结论不同组是可以全面消费的重复消费。 5.4.3 同组测试 使用 8802 和 8803 进行同组进行测试在 8802 和 8803 的 application.yml 中设置相同的 group 属性都属于 tansunA 组如下图 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以得出结论同一个组的多个微服务实例每次只会有一个拿到。 5.5 持久化 停止 8802 和 8803 并将 8802 的 group 属性去除掉然后让 8801 先发送 4 条消息到 rabbitmq。 先启动 8802无分组属性配置后台没有打出来消息如下图 再启动 8803有分组属性配置后台打出来了 MQ 上的消息 可以得出结论只要你有分组的属性你的数据就不会丢失。
http://www.zqtcl.cn/news/945313/

相关文章:

  • 用asp.net和access做的关于校园二手网站的论文网站环境搭建好后怎么做网站
  • 如何查网站的外链哈尔滨微信网站开发
  • 洛阳设计网站公司建设银行网站 购买外汇
  • 做视频网站的备案要求吗给工厂做代加工
  • 网站建设技术外包西安推荐企业网站制作平台
  • 建立一个做笔记的网站石家庄网站优化
  • 服务器创建多个网站吗中铁雄安建设有限公司网站
  • 建湖建网站的公司网站建设人工费
  • 沈阳公司网站设计公司怎么投放广告
  • 上海哪家做网站关键词排名如何做简洁网站设计
  • 网站维护的内容seo网站关键词优化哪家好
  • 东阳市网站建设西安做网站选哪家公司
  • 宁津网站开发万能应用商店下载
  • 专业制作标书网站地图优化
  • 广州建网站兴田德润团队什么是网络营销详细点
  • win7建网站教程wordpress chrome插件开发
  • 免费行情软件网站下载视频公司介绍ppt制作模板
  • wordpress快速建站wordpress短代码可视化
  • 餐饮型网站开发比较好看的网页设计
  • 网站管理包括潍坊网站建设优化
  • 南开集团网站建设网站服务器搭建
  • 网络的最基本定义泰安seo网络公司
  • 国外比较好的资源网站请人做外贸网站应注意什么问题
  • 人网站设计与制作什么是销售型网站
  • 最简单网站开发软件有哪些企业电子商务网站建设问题
  • 玉林网站制作简单的网站制作代码
  • 滨州建设厅网站长沙好的做网站品牌
  • 教务系统网站建设模板下载为网站开发
  • 成都市建设招标网站加载wordpress外部文件
  • 网站做兼容处理怎么浙江seo博客