奥派网站建设,网页图片保存,灰色行业推广平台网站,wordpress 加速会一 MQ 的基本概念
1 MQ概述
MQ全称 Message Queue#xff08;消息队列#xff09;#xff0c;是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。消息队列就是所谓的存放消息的队列。 消息队列解决的不是存放消息的队列的⽬的#xff0c;解决的是通信问…一 MQ 的基本概念
1 MQ概述
MQ全称 Message Queue消息队列是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。消息队列就是所谓的存放消息的队列。 消息队列解决的不是存放消息的队列的⽬的解决的是通信问题。 传统方式系统之间直接调用 http协议 httpclient/openFeign 中间件 2 MQ 的优势
异步、 解耦、 削峰
1 应用解耦
系统的耦合性越高容错性就越低可维护性就越低。以购物为例子 使用 MQ 使得应用间解耦提升容错性和可维护性。 2 异步提速
一个下单操作耗时20 300 300 300 920ms用户点击完下单按钮后需要等待920ms才能得到下单响应太慢 用户点击完下单按钮后只需等待25ms就能得到下单响应 (20 5 25ms)。提升用户体验和系统吞吐量单位时间内处理请求的数目。 3 削峰填谷 使用了 MQ 之后限制消费消息的速度为1000这样一来高峰期产生的数据势必会被积压在 MQ 中高峰就被“削”掉了但是因为消息积压在高峰期过后的一段时间内消费消息的速度还是会维持在1000直到消费完积压的消息这就叫做“填谷”。使用MQ后可以提高系统稳定性。 3 MQ 的劣势
1 系统可用性降低
系统引入的外部依赖越多系统稳定性越差。一旦 MQ 宕机就会对业务造成影响。如何保证MQ的高可用
2系统复杂度提高
MQ 的加入大大增加了系统的复杂度以前系统间是同步的远程调用现在是通过 MQ 进行异步调用。如何保证消息不被丢失等情况
4 常见的 MQ 产品 二 RabbitMQ安装
1 上传软件
erlang18.31.el7.centos.x86_64.rpm
socat1.7.3.25.el7.lux.x86_64.rpm
rabbitmqserver3.6.51.noarch.rpm
2 安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
3 安装RabbitMQ
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
4 开启管理界面及配置
rabbitmq-plugins enable rabbitmq_management
5 启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
6 登录
需要关闭防火墙 远程服务器开启15672和5672开启
http://192.168.56.140:15672/
如果登录报错 这是因为rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 删除loopback_users 中的 guest 云服务器记得开放15672端口
默认账号和密码都是 guest
三 界面介绍和操作
1 添加用户 # 角色说明 1、 超级管理员(administrator) 可登陆管理控制台可查看所有的信息并且可以对用户策略(policy)进行操作。 2、 监控者(monitoring) 可登陆管理控制台同时可以查看rabbitmq节点的相关信息(进程数内存使用情 况磁盘使用情况等) 3、 策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红 框标识的部分)。 4、 普通管理者(management) 仅可登陆管理控制台无法看到节点信息也无法对策略进行管理。 5、 其他 无法登陆管理控制台通常就是普通的生产者和消费者。
2 创建虚拟机
1 点击图中的Virtual Hosts
2 创建虚拟机路径记得要带 / 3 将虚拟机分配给用户 四 RabbitMQ概念
1 架构图 2 相关概念
Publisher - ⽣产者发布消息到RabbitMQ中的Exchange
Consumer - 消费者监听RabbitMQ中的Queue中的消息
Broker接收和分发消息的应用RabbitMQ Server就是 Message Broker也就是我们的RabbitMQ服务器
Virtual host出于多租户和安全因素设计的在RabbitMQ中可以创建出多个虚拟消息服务器VirtualHost。
Connectionpublisherconsumer 和 broker 之间的 TCP 连接
channel-信道 网络信道几乎所有操作都在channel中进行channel是消息读写的通道。客户端可以建立多个channel每个channel表示一个会话任务 信道有特定的功能比如创建交换机创建队列。
Exchange - 交换机和⽣产者建⽴连接并接收⽣产者的消息 并且不能保存消息。
Queue - 队列Exchange会将消息分发到指定的QueueQueue和消费者进⾏交互 队列是可以保存消息的。
Routes - 路由交换机以什么样的策略将消息发布到Queue。生产者发消息的时候可以给消息贴一个标签为了让指定的消费者接收消息。 结构解读
首先安装好的RabbitMQ就是一个Broker如果我们想将MQ给多个用户使用并且互不影响那我们就需要将MQ通过虚拟化的方式分割成多个提供MQ的服务也就是Virtual host每个Virtual host都有独立的路径并且和用户绑定。这样我们就可以在自己的世界里发消息了。 通信解读一条消息到底是怎么从生产者到了消费者的 首先生产者通过连接的方式连接到MQ的一个虚拟机需要知道MQ的ip,端口虚拟机路径用户名和密码准备好了以后就可以建立连接了TCP 连接Connection连接 但是建立和关闭TCP连接是有代价的频繁的建立关闭TCP连接对于系统的性能有很大的影响而且TCP的连接数也有限制这也限制了系统处理高并发的能力。但是在TCP连接中建立Channel是没有上述代价的所以我们使用信道changel的方式发送和接受消息。 消息进入MQ的第一站是Exchange交换机交换机的作用① 接收生产者发送的消息 ②和队列绑定。交换机是不保存信息的。生产者发消息的时候可以指定一个路由键路由键可以理解为就是给消息贴了一个标签(做标记作用消费者接收消息的时候有用) 消息进入第二站queue消费者要接收消息需要一直监听着queue那么消费者在监听queue的时候需要先指定队列要和那个交换机绑定绑定的时候也需要指定路由键如果发消息时的路由键和接收消息时候路由键一样那么这个消息就会进入到这个队列。 最后消费者就拿到消息了。需要说明的一点所有的交换机和队列创建的时候都是需要起名字的。
3 RabbitMQ的通讯
官网介绍RabbitMQ Tutorials — RabbitMQ 主题
五 案例解释
新建maven工程Spring整合MQ。因为MQ中有很多概念在boot中是体会不到的boot屏蔽了很多概念。
1 简单队列模式 1 代码
生产者和消费者都导入maven依赖
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.10.0/version
/dependency 生产者代码记得最后需要关闭资源。 package com.xinzhi.product;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MyProduct {//队列名private static final String QUEUE_NAME my_queue;public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接ConnectionFactory connectionFactory new ConnectionFactory();// 2.设置连接地址connectionFactory.setHost(192.168.32.11);// 3.设置端口号:connectionFactory.setPort(5672);// 4.设置账号和密码connectionFactory.setUsername(laohan123);connectionFactory.setPassword(laohan123);// 5.设置VirtualHostconnectionFactory.setVirtualHost(/laohan);Connection connection connectionFactory.newConnection();// 6.获取信道Channel channel connection.createChannel();// 7.创建队列,声明并创建一个队列如果队列已存在则使用这个队列// 7.1第一个参数队列名称// 7.2第二个参数是否持久化false对应不持久化数据MQ停掉数据就会丢失// 7.3第三个参数该队列是否是私有的// 7.4第四个是否自动删除,false代表连接停掉后不自动删除掉这个队列// 7.5队列的其他参数, 一般都是nullchannel.queueDeclare(my_queue, false, false, false, null);String message 欣知大数据;//四个参数//exchange 交换机如果使⽤了表示使⽤了默认交换机默认交换机会隐式绑定到队列//routingKey路由键如果使⽤了默认交换机那么路由键就可以用队列名来代替。//props header信息一般设置null//最后一个参数是要传递的消息字节数组channel.basicPublish(, //使⽤默认交换机my_queue, //因为⽤了默认交换机于是参数就是队列名称null,message.getBytes() 消息内容);channel.close();connection.close();System.out.println(发送成功);}
}消费者代码 package com.xinzhi;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MyConsumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接ConnectionFactory connectionFactory new ConnectionFactory();// 2.设置连接地址connectionFactory.setHost(192.168.32.11);// 3.设置端口号:connectionFactory.setPort(5672);// 4.设置账号和密码connectionFactory.setUsername(laohan123);connectionFactory.setPassword(laohan123);// 5.设置VirtualHostconnectionFactory.setVirtualHost(/laohan);Connection connection connectionFactory.newConnection();// 6.获取信道Channel channel connection.createChannel();// 7.声明队列channel.queueDeclare(my_queue, false, false, false, null);// 8.创建消费者Consumer consumer new DefaultConsumer(channel) {// consumerTag 消息的唯一标识一般用来确认消息是否被消费// envelope 封装了mq的基本方法// properties 封装了mq的基本属性// body 监听到的消息Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};// 9.消费者监听某个队列 autoAck自动签收channel.basicConsume(my_queue, false, consumer);}
}
2 代码解读
envelope单词 信封的意思在这里是封装了MQ的一些基本方法
- getDeliveryTag() 获取此参数信封中包含的交货标签
- isRedeliver() 如果这是在 ack 失败后是否重新投递
- getExchange()
- getRoutingKey()
3 流程解读
这是RabbitMQ最简单的工作方式 生产者声明好队列然后把信息给了MQ默认的交换机交换机将信息发给队列 消费者也声明好队列然后监听队列获取信息
4 抽出工具类
因为生产者和消费者都是相同的获取信道的方式
public static Connection getConnection(){// 1.创建连接ConnectionFactory connectionFactory new ConnectionFactory();// 2.设置连接地址connectionFactory.setHost(192.168.56.140);// 3.设置端口号:connectionFactory.setPort(5672);// 4.设置账号和密码connectionFactory.setUsername(laohan123);connectionFactory.setPassword(laohan123);// 5.设置VirtualHostconnectionFactory.setVirtualHost(/laohan);Connection connection null;try {connection connectionFactory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return connection;}
2 work queue 队列模式 能者多劳模式
1 代码 生产者 package com.xinzhi.work.product;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MyProduct {public static void main(String[] args) throws IOException, TimeoutException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(work_queue, true, false, false, null);//3 发消息(消息先到了默认交换机交换机和队列绑定了所以信息也会直接到了queue)for (int i 1; i 101 ; i) {String message xinzhii;channel.basicPublish(,work_queue,null,message.getBytes());}//4 提示和释放资源System.out.println(发送成功);channel.close();connection.close();}
}消费者 将下面的代码再复制两份MyConsumer1MyConsumer2等待时间设置成100,500 package com.xinzhi.work.consumer;import com.rabbitmq.client.*;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;public class MyConsumer1 {public static void main(String[] args) throws IOException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(work_queue, true, false, false, null);//3 声明消费者一次只接受一条消息channel.basicQos(1);// 4 声明消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费标签是 consumerTag 消息体是 new String(body));// 消息消费成功以后的唯一标识System.out.println(envelope.getDeliveryTag());// 确认签收当前的一条消息如果是true是签收队列里面所有的消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(work_queue, consumer);}
}2 代码解读
在简单模式的基础上添加了多个消费者每个消费者添加了等待时间。
生产者一次往队列里投放多条消息消费者根据能力来消费这里面的所有消息性能强的消费的消息多所以是能者多劳 3 订阅发布
平分秋色 交换机类型 fanout
发布订阅这次使用了交换机之前的两种方式都是没有显式的声明使用交换机之前其实用的系统默认的交换机。
这次使用了交换机但是 没有使用路由键。只要和交换机绑定了的对了都可以接受到消息也就是上图两个队列中可以收到相同的消息。
1 代码 生产者 package com.xinzhi.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Product {public static void main(String[] args) throws IOException, TimeoutException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明交换机和类型channel.exchangeDeclare(fanout_exchange, BuiltinExchangeType.FANOUT);//3 将信息发给交换机for (int i 1; i 101 ; i) {String message laohani;channel.basicPublish(fanout_exchange,,null,message.getBytes());}System.out.println(success);channel.close();connection.createChannel();}
}消费者1 package com.xinzhi.fanout;import com.rabbitmq.client.*;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;public class MyConsumer1 {public static void main(String[] args) throws IOException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(fanout_queue1, true, false, false, null);//3 声明交换机channel.exchangeDeclare(fanout_exchange, BuiltinExchangeType.FANOUT);//4 交换机和队列绑定channel.queueBind(fanout_queue1, fanout_exchange, , null);// 5 声明消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(tag: consumerTag ,message: new String(body));// 消息消费成功以后的唯一标识System.out.println(envelope.getDeliveryTag());// 确认签收当前的一条消息如果是true是签收队列里面所有的消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(fanout_queue1, consumer);}
}消费者2 package com.xinzhi.fanout;import com.rabbitmq.client.*;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;public class MyConsumer2 {public static void main(String[] args) throws IOException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(fanout_queue2, true, false, false, null);//3 声明交换机channel.exchangeDeclare(fanout_exchange, BuiltinExchangeType.FANOUT);//4 交换机和队列绑定channel.queueBind(fanout_queue2, fanout_exchange, , null);// 5 声明消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(tag: consumerTag ,message: new String(body));// 消息消费成功以后的唯一标识System.out.println(envelope.getDeliveryTag());// 确认签收当前的一条消息如果是true是签收队列里面所有的消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(fanout_queue2, consumer);}
}4 .路由 routing
暗送秋波
1 概念
交换机direct 在⽣产者发送消息时指明routing-key 在消费者声明队列和交换机的绑定关系时指明routing-key 解决的问题是 因为交换机和两个队列都绑定了但是为了给队列里发送的消息不一样也就是区分给那个队列发什么样 的消息就有了routing key的概念。发消息的时候指定一下路由键接收消息的时候队列要和交换机绑定这时候也需要指定路由键如果这两次的路由键一样那么这个消息就放着这个队列里面
2 代码 生产者 package com.xinzhi.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Product {public static void main(String[] args) throws IOException, TimeoutException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明交换机和类型,并且持久化channel.exchangeDeclare(direct_exchange, BuiltinExchangeType.DIRECT,true);//3 将信息发给交换机并且指定路由键String message1 laohan1;String message2 laohan2;channel.basicPublish(direct_exchange,han,null,message1.getBytes());channel.basicPublish(direct_exchange,man,null,message2.getBytes());System.out.println(success);channel.close();connection.close();}
}消费者1 package com.xinzhi.direct;import com.rabbitmq.client.*;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;public class MyConsumer1 {public static void main(String[] args) throws IOException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(direct_queue1, true, false, false, null);//3 声明交换机channel.exchangeDeclare(direct_exchange, BuiltinExchangeType.DIRECT,true);//4 交换机和队列绑定channel.queueBind(direct_queue1, direct_exchange, han);// 5 声明消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(tag: consumerTag ,message: new String(body));// 消息消费成功以后的唯一标识System.out.println(envelope.getDeliveryTag());// 确认签收当前的一条消息如果是true是签收队列里面所有的消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(direct_queue1, consumer);}
}消费者2 package com.xinzhi.direct;import com.rabbitmq.client.*;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;public class MyConsumer2 {public static void main(String[] args) throws IOException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(direct_queue2, true, false, false, null);//3 声明交换机channel.exchangeDeclare(direct_exchange, BuiltinExchangeType.DIRECT,true);//4 交换机和队列绑定channel.queueBind(direct_queue2, direct_exchange, man);// 5 声明消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(tag: consumerTag ,message: new String(body));// 消息消费成功以后的唯一标识System.out.println(envelope.getDeliveryTag());// 确认签收当前的一条消息如果是true是签收队列里面所有的消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(direct_queue2, consumer);}
}
5 通配符模式
你的心思我要
1 概念
交换机是 topic 因为路由模式里是精确匹配比较局限使用通配符方式通配符提⾼了匹配的范围扩展业务。 Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert 通配符规则# 匹配一个或多个词* 匹配不多不少恰好1个词例如item.# 能够匹配 item.insert.abc 或者 item.insertitem.* 只能匹配 item.insert。
2 代码 生产者 package com.xinzhi.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Product {public static void main(String[] args) throws IOException, TimeoutException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明交换机和类型,并且持久化channel.exchangeDeclare(topic_exchange, BuiltinExchangeType.TOPIC,true);//3 将信息发给交换机并且指定路由键String message1 laohanxueit;channel.basicPublish(topic_exchange,xinzhi.15,null,message1.getBytes());System.out.println(success);channel.close();connection.close();}
}消费者 package com.xinzhi.topic;import com.rabbitmq.client.*;
import com.xinzhi.utils.RabbitUtil;import java.io.IOException;public class MyConsumer {public static void main(String[] args) throws IOException {//1 获取连接和信道Connection connection RabbitUtil.getConnection();Channel channel connection.createChannel();//2 声明队列channel.queueDeclare(topic_queue, true, false, false, null);//3 声明交换机channel.exchangeDeclare(topic_exchange, BuiltinExchangeType.TOPIC,true);//4 交换机和队列绑定channel.queueBind(topic_queue, topic_exchange, xinzhi.#);// 5 声明消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(tag: consumerTag ,message: new String(body));// 消息消费成功以后的唯一标识System.out.println(envelope.getDeliveryTag());// 确认签收当前的一条消息如果是true是签收队列里面所有的消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(topic_queue, consumer);}
}
六 SpringBoot整合
1 发布订阅
1 新建boot项目
2 导入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
3 配置文件
server:port: 8099
spring:rabbitmq:host: 192.168.56.140port: 5672username: laohan123password: laohan123virtual-host: /laohan
4 配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME fanout_exchage;public static final String QUEUE_NAME fanout_queue;Bean(queue)public Queue queue(){
// return new Queue(QUEUE_NAME, true, false, false);return QueueBuilder.durable(QUEUE_NAME).build();}Bean(exchange)public Exchange exchange(){
// return new FanoutExchange(EXCHANGE_NAME, true, false);return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();}Beanpublic Binding binding(Qualifier(queue) Queue queue, Qualifier(exchange) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with().noargs();}}
5监听类
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;Component
public class RabbitListen {RabbitListener(queues {RabbitConfig.QUEUE_NAME})
public void listener(String body,Message message, Channel channel) throws IOException {long msgTag message.getMessageProperties().getDeliveryTag();System.out.println(msgTagmsgTag);System.out.println(messagemessage);System.out.println(bodybody);}
}
6 测试类发送消息
Autowired
private RabbitTemplate rabbitTemplate;Test
void contextLoads() {rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,,老韩学it);
}
2 topic 在发布订阅的基础上修改交换机名称和路由绑定就可以了 package com.xinzhi.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {
// public static final String EXCHANGE_NAME fanout_exchage;public static final String EXCHANGE_NAME topic_exchange;
// public static final String QUEUE_NAME fanout_queue;public static final String QUEUE_NAME topic_queue;Bean(queue)public Queue queue(){
// return new Queue(QUEUE_NAME, true, false, false);return QueueBuilder.durable(QUEUE_NAME).build();}Bean(exchange)public Exchange exchange(){
// return new FanoutExchange(EXCHANGE_NAME, true, false);
// return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}Beanpublic Binding binding(Qualifier(queue) Queue queue, Qualifier(exchange) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(xinzhi.#).noargs();}}发送消息验证 Autowiredprivate RabbitTemplate rabbitTemplate;Testvoid contextLoads() {rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,xinzhi.15,老韩学it);}
七 消息的可靠性投递
1 什么是消息的可靠性投递 保证消息一定能发到消息队列中 细节 保证mq节点成功接受消息 消息发送端需要接受到mq服务端接收到消息的确认应答 完善的消息补偿机制发送失败的消息可以再感知并二次处理 RabbitMQ消息投递路径 生产者--交换机--队列--消费者 通过两个点的控制保证消息的可靠性投递 生产者到交换机 confirmCallback 交换机到队列 returnCallbakc 建议 开启消息确认机制以后保证了消息的准确送达但由于频繁的确认交互RabbitMQ的整体效率变低吞吐量下降严重不是非常重要的消息不建议用消息确认机制
2 confirmCallback 机制 生产者投递消息以后如果Broker收到消息以后会给生产者一个ACK生产者通过ACK可以确认这条消息是否成功发送到Broker。 开启confirmCallback spring.rabbitmq.publisher-confirm-type: correlated 发送代码
Test
void confirm(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 消息到交换机的确认* param correlationData 配置信息* param ack 交换机确认 true消息接受成功 false消息接受失败* param cause 消息发送失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(ConfirmCallback);System.out.println(correlationDatacorrelationData);System.out.println(ackack);System.out.println(causecause);if(ack){System.out.println(发送成功);// 更新数据库 成功}else {System.out.println(发送失败,日志或数据库纪录);// 更新数据库 失败}}});rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,xinzhi.15,老韩学it);
} 模拟失败场景,修改发送时候交换机名称 2 returnCallback return机制保证消息在rabbitmq中能够成功的投递到队列⾥ 两种模式 交换机到队列不成功则丢弃消息默认 交换机到队列不成功返回生产者触发returnCallback 开启returnCallback交换机到队列的可靠性投递 spring.rabbitmq.publisher-returnstrue 修改投递到队列失败的策略 spring.rabbitmq.template.mandatorytrue 发送消息验证. Test
void returnCallback(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returned) {int code returned.getReplyCode();System.out.println(codecode);System.out.println(returnedreturned);}});rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,xinzhi.15,老韩学it);
} 发送消息以后没有任何提示我们修改路由键 八 消息确认
1 背景
保证消息从队列到消费者的过程。
2 ACK介绍 消费者从RabbitMQ中获取消息并且处理完成以后反馈给RabbitMQRabbitMQ收到确认消息以后才能把消息从队列中删除 消费者在处理消息的时候出现了网络不稳定、服务器异常等情况那么就不会有ACK反馈RabbitMQ认为这个消息没有正常消费就将这个消息放回队列里面 只有当消费者正确发送ack以后RabbitMQ才会把消息从队列中删除 消息的ack确认机制默认是打开的消息如果未被进行ack的消息确认机制这条消息将被锁定
3 确认方式 自动 手动manual spring.rabbitmq.listener.simple.acknowledge-modemanual 发送消息并且开启监听模式虽然消息被消费了但是因为开启了手动确认模式配置但是代码里没有手动确认所以队列里的消息不会删除 代码中开启确认机制 channel.basicAck(msgTag,false); 消息拒绝
// false 一次拒绝一条 true 重新回到队列
channel.basicNack(msgTag,false,true);
结果就会看到控制台一直接受消息因为对列有消息就会被监听到监听以后拒绝了又放到队列里面然后 又监听... DeliveryTag 表示消息投递的序号每次消费消息或者消息重新投递以后DeliveryTag都会1 basicReject 也是消息拒绝的一次只能拒绝一条消息也可以设置是否重新回如队列