网站推广主要怎么做,搜题公众号怎么制作,百度文库个人登录,如何自己搭建一个物联网平台什么是RabbitMq?
RabbitMQ是一个开源的消息队列中间件#xff0c;它实现了高级消息队列协议#xff08;AMQP#xff09;。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息#xff0c;使不同的应用程序能够相互之间进行…什么是RabbitMq?
RabbitMQ是一个开源的消息队列中间件它实现了高级消息队列协议AMQP。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息使不同的应用程序能够相互之间进行通信。它支持多种编程语言和平台并且具有灵活的路由和队列配置选项。
同步调用
同步调用的优点 时效性较强可以立即得到结果
同步调用的问题 耦合度高 性能和吞吐能力下降 有额外的资源消耗 有级联失败问题
异步调用 好处 吞吐量提升无需等待订阅者处理完成响应更快速 故障隔离服务没有直接调用不存在级联失败问题 调用间没有阻塞不会造成无效的资源占用 耦合度极低每个服务都可以灵活插拔可替换 流量削峰不管发布事件的流量波动多大都由Broker接收订阅者可以按照自己的速度去处理事件
缺点 架构复杂了业务没有明显的流程线不好管理 需要依赖于Broker的可靠、安全、性能
MQ的种类 RabbitMq安装和使用 云服务器安装Rabbitmq。 在docker 中拉去Ribbitmq镜像。
在docker 中运行ribbitmq。
docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq 查看rabbitmq的状态。
rabbitmqctl status 接着我们还可以将Rabbitmq的管理面板开启这样就可以在浏览器上进行实时访问和监控了。
我们需要先进入rabbitmq容器。
docker exec -it [在docker中对应的ID] [进入容器的路径] #路径一般为/bin/bash
开启rabbitmq的控制面板设置。
rabbitmq-plugins enable rabbitmq_management
打开rabbitmq的控制面板就是对应的控制面板端口为15672。 账号和密码都是:guest 消息队列模型 SpringAMQP 什么是springAMQP?
Spring AMQP 是一个基于 Spring 框架的 AMQP高级消息队列协议的开发框架。它提供了一种简化和抽象化的方式来使用 AMQP使得在应用程序中使用消息队列变得更加容易。
springAMQP的使用
导入依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
编写发送者
编写applcation.yml文件
spring:rabbitmq:host: 119.9.212.171 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码
进行测试
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.io.IOException;
import java.util.concurrent.TimeoutException;RunWith(SpringRunner.class) #如果不加此注解spring容器无法自动注入RabbitTemplate
SpringBootTest
public class PublisherTest {AutowiredRabbitTemplate rabbitTemplate;Testpublic void tess1() {String queueName queueName;String message hello, tolen;rabbitTemplate.convertAndSend(queueName, message);}
}测试结果为下: 可能会出现没有队列生成的情况这是因为Test无法自动一个 queue我们手动创建一个即可。
编写消费者
编辑application.yml文件
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: test # 用户名password: 123456 # 密码
创建消息监听者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class RabbitMqListener {RabbitListener(queues queueName)public void getMessage(String message) {System.out.println(获取的消息是: message);}
}直接配置即可在后续的项目中消费者会监听对应的消息进行操作。
WorkQueue
我们可以对一个消息标签设置多个监听者并且默认的设置是预取,也就是即使服务模块处理能力差的情况也会分配到相同个数的信息不能达到能者多劳的效果为了到达此效果我们可以在application.yml中进行设置。
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息
发布与订阅 FanoutExchange的使用
在消费者模块编写新建交换机新建队列交换机和队列绑定操作。
在配置类中完成上述操作
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MQConfiguration {//声明交换机FanoutExchangeBeanpublic FanoutExchange fanoutExchange() {
// 设置交换机的名字return new FanoutExchange(tolen.fanout);}
// 创建一个信息队列1Beanpublic Queue fanoutQueue1() {return new Queue(fanout.queue1);}
// 创建信息队列2Beanpublic Queue fanoutQueue2() {return new Queue(fanout.queue2);}//将交换机和队列1进行绑定Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {//绑定队列给对应的交换机return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//将交换机和队列2进行绑定Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
在消费者模块中创建两个队列的监听器
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class RabbitMqListener {RabbitListener(queues fanout.queue1)public void getMessage1(String message) {System.out.println(消息队列1中获取的消息是: message);}RabbitListener(queues fanout.queue2)public void getMessage2(String message) {System.out.println(消息队列2中获取的消息是: message);}}
接下来不信消息发送模块,这里需要注意的是此时我们是向对应的交换机发送消息通过交换机发送消息给两个消息队列。
发送消息的代码为下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.io.IOException;
import java.util.concurrent.TimeoutException;RunWith(SpringRunner.class)
SpringBootTest
public class PublisherTest {AutowiredRabbitTemplate rabbitTemplate;Testpublic void tess1() {String queueName queueName;String message hello, tolen;rabbitTemplate.convertAndSend(queueName, message);}Testpublic void fanoutTest() {String exchangeName tolen.fanout;String message hi, tolen!;//routingKey不进行设置rabbitTemplate.convertAndSend(exchangeName, , message);}
}如果不设置routingKey的话就会默认将消息发送到使用绑定的消息队列上。
测试结果为下:
交换机状态
监听器接收到的消息 DirectExchange
可以设置routingKey交换机可以向指定的队列发送消息。
配置监听器
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class RabbitMqListener {//使用注解进行绑定, 不再需要configuration配置RabbitListener(bindings QueueBinding(value Queue(name directQueue1),exchange Exchange(name direct), //默认使用的交换机类型就是directExchangekey {red, blue}))public void directQueue1(String message) {System.out.println(directQueue2: message);}//使用注解进行绑定, 不再需要configuration配置RabbitListener(bindings QueueBinding(value Queue(name directQueue2),exchange Exchange(name direct), //默认使用的交换机类型就是directExchangekey {red}))public void directQueue2(String message) {System.out.println(directQueue2: message);}
}编写消息发布模块
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
public class PublisherTest {AutowiredRabbitTemplate rabbitTemplate;Testpublic void fanoutTest() {String exchangeName direct;String message hi, tolen!;//设置routingKeyrabbitTemplate.convertAndSend(exchangeName, blue, message);}
}测试结果为下 此时就只有routingKeyblue的监听器才会接收到消息。
TopicExchage
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert
通配符规则
#匹配一个或多个词
*匹配不多不少恰好1个词
修改编写监听器的配置
//使用注解进行绑定, 不再需要configuration配置RabbitListener(bindings QueueBinding(value Queue(name directQueue2),exchange Exchange(name direct, type ExchangeTypes.TOPIC), //默认使用的交换机类型就是directExchangekey {#.new}))public void directQueue2(String message) {System.out.println(directQueue2: message);}
只要发送的消息中的routingKey中尾部为新闻的消息全部会被监听。routingKey使用.作间隔
消息转换器
在springboot中默认使用JDK的序列化为了提高使用性我们可以使用json转换器。
在消费者和发送者中都导入对应的依赖。
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency
在configuration中配置信息转换器。消费者和发布者都需要配置
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MQConfiguration {Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}进行测试在发送一个对象类型的消息。
对应的监听器
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Objects;Component
public class RabbitMqListener {//使用注解进行绑定, 不再需要configuration配置RabbitListener(bindings QueueBinding(value Queue(name directQueue2),exchange Exchange(name direct), //默认使用的交换机类型就是directExchangekey {blue}))public void directQueue2(MapString, String message) {System.out.println(directQueue2: message);}
}
对应的发送代码
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.LinkedHashMap;
import java.util.Map;RunWith(SpringRunner.class)
SpringBootTest
public class PublisherTest {AutowiredRabbitTemplate rabbitTemplate;Testpublic void fanoutTest() {String exchangeName direct;MapString, String message new LinkedHashMap();message.put(name, tolen);message.put(age, 19);//设置routingKeyrabbitTemplate.convertAndSend(exchangeName, blue, message);}
}测试效果为下 接收到的数据 。 消息队列中的数据。