网站做商业计划书吗,天津网站推广优化,wordpress 分类 文章前,芜湖龙湖建设工程有限公司网站文章目录 前言一、SpringCloud 集成 RocketMQ1. pom 依赖2. yml 配置3. 操作实体4. 生产消息4.1. 自动发送消息4.2. 手动发送消息 5. 消费消息 二、配置解析1. spring.cloud.stream.function.definition 前言 定义 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力… 文章目录 前言一、SpringCloud 集成 RocketMQ1. pom 依赖2. yml 配置3. 操作实体4. 生产消息4.1. 自动发送消息4.2. 手动发送消息 5. 消费消息 二、配置解析1. spring.cloud.stream.function.definition 前言 定义 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现并引入了发布-订阅、消费组、分区这三个核心概念。简单的说Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration实现了一套轻量级的消息驱动的微服务框架。 抽象模型 我们都知道市面上有很多消息中间件Sping Cloud Stream 为了可以集成各种各样的中间件它抽象出了 Binder 的概念,每个消息中间件都需要有对应自己的 Binder。这样它就可以根据不同的 Binder 集成不同的中间件。下图的input和output是channelBinder则是消息中间件和通道之间的桥梁。 绑定器 通过使用 Spring Cloud Stream可以有效简化开发人员对消息中间件的使用复杂度让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。 Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互)我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。
Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念实现了应用程序和消息中间件之间的隔离同时我们也可以通过应用程序实现消息中间件之间的通信。在我们的项目的可以继承多种绑定器我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口。
一、SpringCloud 集成 RocketMQ
1. pom 依赖
!-- rocketmq --
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId
/dependency2. yml 配置
spring:cloud:stream:function:definition: producer1;consumer1 # 方法定义用于定义发送者或消费者方法# 配置消息通道通用属性适用于所有消息中间件bindings:# 配置channel消息通道consumer1-in-0:destination: consumer_topic # topic消息主题content-type: application/json # 内容格式group: consumer-group # 消费者组producer1-out-0:destination: producer_topic # topic消息主题content-type: application/json # 内容格式rocketmq:binder:name-server: 127.0.0.1:9876 # rocketmq服务地址vipChannelEnabled: true # 是否开启vip通道兼容老版本使用。多监听一个端口用于接受处理消息防止端口占用。# 配置消息通道独特属性仅适用于rocketmqbindings:# 配置channel消息通道生产者[functionName]-out-[index]消费者[functionName]-in-[index]producer1-out-0:producer:group: consumer-groupsync: true # 是否开启同步发送consumer1-in-0: consumer:subscription: myTag # 消费tagdelayLevelWhenNextConsume: -1suspendCurrentQueueTimeMillis: 99999999broadcasting: false # 是否使用广播消费默认为false使用集群消费3. 操作实体
package com.demo.model;import lombok.AllArgsConstructor;
import lombok.Data;/*** 消息model*/
Data
AllArgsConstructor
public class MsgModel {/*** 消息id*/private String msgId;/*** 消息内容*/private String message;
}4. 生产消息
4.1. 自动发送消息
通过 MessageBuilder 自动发送消息。
package com.demo;import com.demo.model.MsgModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.function.Supplier;/*** 消息生产者类*/
Configuration
Slf4j
public class MyProducer {/*** 消息生产者1*/Beanpublic SupplierMessageMsgModel producer1() {return () - {MsgModel msgModel new MsgModel(System.currentTimeMillis(), 测试消息);log.info(producer1发送消息 msgModel);return MessageBuilder.withPayload(entity).build();};}
}这种方式定义 suppelier 会 默认1000ms 发送一次记录。可以修改 spring.cloud.stream.poller.fixedDelay 设置延迟毫秒值。
4.2. 手动发送消息
通过 StreamBridge 手动发送消息。
package com.demo.controller;import com.demo.model.MsgModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** 消息controller*/
RestController
RequiredArgsConstructor
RequestMapping(/msg)
Slf4j
public class MsgController {private final StreamBridge streamBridge;/*** 发送消息*/GetMapping(/send)public void sendMsg() {MsgModel msgModel new MsgModel(System.currentTimeMillis(), 测试消息);log.info(producer1发送消息 msgModel);streamBridge.send(producer1-out-0, MessageBuilder.withPayload(entity).setHeader(MyHearder, 这是一个请求头).build());}
}5. 消费消息
package com.demo;import com.demo.model.MsgModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;import java.util.function.Consumer;/*** 消息消费者类*/
Configuration
Slf4j
public class ReceiveMQ {/*** 消息消费者1*/Beanpublic ConsumerMessageMsgModel consumer1(){return (message)-{MessageHeaders headers message.getHeaders();MsgModel msgModel message.getPayload();log.info(consumer1接收消息消息头 headers.get(MyHeader));log.info(consumer1接收消息消息内容 msgModel);};}
}二、配置解析
1. spring.cloud.stream.function.definition
进行生产者或消费者方法定义在 rocketmq 初始时会加载这些方法以创建生产者或消费者列表。
不管是创建 Consumer 还是 Supplier 或者是 Function Stream 都会将其方法名称进行一个 topic 拆封和绑定。假设创建了一个 Consumer String myTopic 的方法Stream 会将其 拆分成 In 和 out 两个通道
输入通道消费者 [functionName]-in-[index] consumer1-in-0输出通道生产者 [functionName]-out-[index] producer1-out-0
注意这里的 functionName 需要和生产者或消费者方法名称以及 spring.cloud.stream.function.definition 下的名称保持一致。