苏州住房建设局网站,网站专业性免费评价工具,茂名专业网站制作公司,自己做网站制作需要多少钱目录
基本介绍
适用场景
springboot代码演示
演示架构
工程概述
RabbitConfig配置类#xff1a;创建队列及交换机并进行绑定
MessageService业务类#xff1a;发送消息及接收消息
主启动类RabbitMq01Application#xff1a;实现ApplicationRunner接口 基本介绍
Fa…目录
基本介绍
适用场景
springboot代码演示
演示架构
工程概述
RabbitConfig配置类创建队列及交换机并进行绑定
MessageService业务类发送消息及接收消息
主启动类RabbitMq01Application实现ApplicationRunner接口 基本介绍
Fanout Exchange交换机当一个Msg发送到扇形交换机X上时则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列也就是说路由键在扇形交换机里没有作用故消息队列绑定扇形交换机时路由键可为空。
扇形交换机将消息路由给绑定到他身上的所有队列给不理会绑定的路由键。某个扇形交换机上当有消息发送到该扇形交换机上时交换机会将消息的拷贝分别发送给这所有与之绑定的队列中。 Fanout交换机转发消息是最快的Fanout Exchange交换机可以简单的理解为广播站。 适用场景
适用于广播消息的场景群聊功能广播消息给当前群聊中的所有人大型多人在线游戏的游戏积分排行榜更新体育新闻客户端实时更新分数分布式系统可以广播各种状态和配置更新
springboot代码演示
演示架构 生产者发送消息道fanout交换机上面队列A和队列B绑定一个fanout交换机消费则对队列A和队列B进行消费 工程概述
工程采用springboot架构主要用到的依赖为
!-- rabbit的依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
application.yml配置文件如下
server:port: 8080
spring:rabbitmq:host: 123.249.70.148port: 5673username: adminpassword: 123456virtual-host: /RabbitConfig配置类创建队列及交换机并进行绑定
创建 RabbitConfig类这是一个配置类
Configuration
public class RabbitConfig {}
定义交换机 Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(exchange.fanout);}
定义队列 Beanpublic Queue queueA(){return new Queue(queue.fanout.a);}Beanpublic Queue queueB(){return new Queue(queue.fanout.b);}
绑定交换机和队列 Beanpublic Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){return BindingBuilder.bind(queueA).to(fanoutExchange);}Beanpublic Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){return BindingBuilder.bind(queueB).to(fanoutExchange);}
MessageService业务类发送消息及接收消息
Component
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;} 发送消息方法 Resourceprivate RabbitTemplate rabbitTemplate;public void sendMsg(){String msghello world;Message messagenew Message(msg.getBytes(StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(exchange.fanout,,message);log.info(消息发送完毕....);} MessageConvert 涉及网络传输的应用序列化不可避免发送端以某种规则将消息转成 byte 数组进行发送接收端则以约定的规则进行 byte[] 数组的解析RabbitMQ 的序列化是指 Message 的 body 属性即我们真正需要传输的内容RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化其实现有 SimpleMessageConverter默认、Jackson2JsonMessageConverter 等 接受消息 RabbitListener(queues {queue.fanout.a,queue.fanout.b})public void receiveMsg(Message message){byte[] body message.getBody();String msgnew String(body);log.info(接收到消息:msg);} Message 在消息传递的过程中实际上传递的对象为 org.springframework.amqp.core.Message 它主要由两部分组成 MessageProperties // 消息属性 byte[] body // 消息内容 RabbitListener 使用 RabbitListener 注解标记方法当监听到队列 debug 中有消息时则会进行接收并处理 消息处理方法参数是由 MessageConverter 转化若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory 消息的 content_type 属性表示消息 body 数据以什么数据格式存储接收消息除了使用 Message 对象接收消息包含消息属性等信息之外还可直接使用对应类型接收消息 body 内容但若方法参数类型不正确会抛异常 application/octet-stream二进制字节数组存储使用 byte[]application/x-java-serialized-objectjava 对象序列化格式存储使用 Object、相应类型反序列化时类型应该同包同名否者会抛出找不到类异常text/plain文本数据类型存储使用 Stringapplication/jsonJSON 格式使用 Object、相应类型 主启动类RabbitMq01Application实现ApplicationRunner接口
/*** author 风轻云淡*/
SpringBootApplication
public class RabbitMq01Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(RabbitMq01Application.class, args);}Resourceprivate MessageService messageService;/*** 程序一启动就会调用该方法* param args* throws Exception*/Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
} 在SpringBoot中提供了一个接口ApplicationRunner。 该接口中只有一个run方法他执行的时机是spring容器启动完成之后就会紧接着执行这个接口实现类的run方法。 由于该方法是在容器启动完成之后才执行的所以这里可以从spring容器中拿到其他已经注入的bean。 启动主启动类后查看控制台
2023-09-26 10:46:35.975 INFO 24900 --- [ main]
c.e.rabbitmq01.service.MessageService : 消息发送完毕....
2023-09-26 10:46:36.020 INFO 24900 --- [ntContainer#0-1]
c.e.rabbitmq01.service.MessageService : 接收到消息:hello world
2023-09-26 10:46:36.020 INFO 24900 --- [ntContainer#0-1]
c.e.rabbitmq01.service.MessageService : 接收到消息:hello world