当前位置: 首页 > news >正文

网站建设收费标准如何南阳网站

网站建设收费标准如何,南阳网站,网站色调选择,做境外盈利网站违法吗文章目录 前言一、MQ是什么#xff1f; 1.1 AMQP 二、在Linux安装RabbitMQ 2.1 安装2.2 RabbitMQ启动命令2.3 开启RabbitMQ 后台管理界面 2.3.1 登录rabbitMQ UI界面 2.3 Docker启动RabbitMQ2.4 常见消息模型2.5 生产者(Producer) / 消费者(Consumer)2.6 工作队列模式(Work Q…文章目录 前言一、MQ是什么 1.1 AMQP 二、在Linux安装RabbitMQ 2.1 安装2.2 RabbitMQ启动命令2.3 开启RabbitMQ 后台管理界面 2.3.1 登录rabbitMQ UI界面 2.3 Docker启动RabbitMQ2.4 常见消息模型2.5 生产者(Producer) / 消费者(Consumer)2.6 工作队列模式(Work Queues)2.7 参数细节2.8 实现能者多劳 2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送2.8.2 预取值 2.9 Publish/Subscribe 发布/订阅2.10 Routing路由) - Direct2.11 Routing路由)- Topic 三、进阶篇 高级特性 3.1 死信队列 3.1.1 死信队列实战消息TTL过期3.1.2 死信队列实战队列达到最大长度 设置正常队列最大长度3.1.3 死信队列实战消息被拒 3.2 基于SpringBoot实现延迟队列3.3 发布确认 高级特性 3.3.1 可靠性投递confirm模式3.3.2 可靠性投递return模式 3.4 优先级队列3.5 消费端限流 前言 提示RaabitMQ消息队列的学习。 一、MQ是什么 MQ全称 Message Queue消息队列是在消息的传输过程中保存消息的容器。多用于分布式系统 之间进行通信。RabbitMQ 是一个消息中间件它接受并转发消息。你可以把它当做一个快递站点当你要发送一个包 裹时你把你的包裹放到快递站快递员最终会把你的快递送到收件人那里按照这种逻辑 RabbitMQ 是 一个快递站一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于它不处理快件而是接收 存储和转发消息数据。 工作原理 1.1 AMQP AMQP即 Advanced Message Queuing Protocol高级消息队列协议是一个网络协议是应用 层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息遵 循此协议不收客户端和中间件产品和开发语言限制。2006年AMQP 规范发布。类比HTTP。 二、在Linux安装RabbitMQ 2.1 安装 1. 我们把erlang环境与rabbitMQ 安装包解压到Linux2. rpm -ivh erlang安装包3. yum install socat -y 安装依赖 / rpm -ivh socat依赖包 --force --nodeps4. rpm -ivh rabbitmq安装包2.2 RabbitMQ启动命令 1. 开启服务 /sbin/service rabbitmq-server start / service rabbitmq-server start 2. 停止服务 service rabbitmq-server stop 3. 重启服务 service rabbitmq-server restart 2.3 开启RabbitMQ 后台管理界面 1. rabbitmq-plugins enable rabbitmq_management添加一个新的用户 1. 创建rabbitMQ账号rabbitmqctl add_user 用户名 密码2. 设置用户角色rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员3. 设置用户权限rabbitmqctl set_permissions -p / admin .* .* .*4. 查看rabbitmq的用户和角色rabbitmqctl list_users5. 登录rabbitMQ 界面 Linux虚拟机ip:15672 即可2.3.1 登录rabbitMQ UI界面 记得开放15672端口访问 Linux虚拟机ip:15672 即可输入账户密码后看到这个界面代表成功 2.3 Docker启动RabbitMQ Docker安装 1. docker pull rabbitmq:3-management2. 开启rabbitMQdocker run \-e RABBITMQ_DEFAULT_USERroot \-e RABBITMQ_DEFAULT_PASS123456 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management2.4 常见消息模型 channel操作MQ的工具exchange路由消息到队列中queue缓存消息virtual host虚拟主机是对queue、exchange等资源的逻辑分组 2.5 生产者(Producer) / 消费者(Consumer) 所需依赖 dependencies!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.7.3/version/dependency!-- https://mvnrepository.com/artifact/commons-io/commons-io --dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.4/version/dependency/dependencies 1234567891011121314生产者代码 /*** 生产者发消息*/ public class Producer {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost(ip地址);//设置用户名密码factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来发消息Channel channel connection.createChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化 默认false 信息存储在内存中* 3.该列队是否只供一个消费者进行消费是否进行消息共享* true:可以多个消费者消费* false只能一个消费者消费* 4.是否自动删除最后一个消费者断开连接后该队列是否自动删除* true自动删除* false不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发消息String messagehello rabbitMQ;/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的KEY值是哪个 指的是本次队列的名称* 3.其他参数信息* 4.发送的消息体*/channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕);channel.close();connection.close();} }消费者 /*** 消费者接收消息*/ public class Consumer {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost(ip地址);//设置用户名密码factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来收消息Channel channel connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {//message包含消息头和消息体我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }2.6 工作队列模式(Work Queues) 模式说明 Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消费,采用的是 轮询机制应用场景对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度工作模式生产者 public class ProducerWorkQueue {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost(ip地址);//设置用户名密码factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来发消息Channel channel connection.createChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化 默认false 信息存储在内存中* 3.该列队是否只供一个消费者进行消费是否进行消息共享* true:可以多个消费者消费* false只能一个消费者消费* 4.是否自动删除最后一个消费者断开连接后该队列是否自动删除* true自动删除* false不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i 1; i 10; i) {//发消息String messageihello rabbitMQ;/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的KEY值是哪个 指的是本次队列的名称* 3.其他参数信息* 4.发送的消息体*/channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕);}channel.close();connection.close();} }工作模式两个消费者 /*** 消费者接收消息*/ public class ConsumerWorkQueues1 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost(ip地址);//设置用户名密码factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来收消息Channel channel connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {//message包含消息头和消息体我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }/*** 消费者接收消息*/ public class ConsumerWorkQueues2 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost(ip地址);//设置用户名密码factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来收消息Channel channel connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {//message包含消息头和消息体我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }结果各执行五次,也验证了 我们上面所说的 轮询机制 小结 一个消息只能有一个接收者但是可以有多个接收者 2.7 参数细节 durable是否进行持久化,当前队列如果进行持久化,我们重启rabbitMQ后当前队列依旧存在 //消费者生成的队列channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);props :队列中的信息是否持久化,若消息持久化,我们重启rabbitMQ后当前队列依旧存在 //MessageProperties.PERSISTENT_TEXT_PLAIN:将消息进行持久化channel.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); autoDelete是否自动删除,最后一个消费者断开连接后该队列是否自动删除 channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的参数位置)false,null);autoAck自动应答 若开启了自动应答,rabbitMQ消息队列分配给消费者10个数据,只要消费者拿到消息队列的数据时,就会告诉消息队列,数据处理完毕。若当我们处理到第5个数据时,消费者出现了宕机,死掉了,则会出现数据丢失channel.basicConsume(QUEUE_NAME,(autoAck是否自动应答)false,deliverCallback,cancelCallback);2.8 实现能者多劳 业务场景 当我们的两个消费者执行业务时,a消费者执行速度快,b消费者执行速度慢,我们想让执行速度快的多执行,应当如何实现呢 开启不公平分发,能者多劳 channel.basicQos(1); 0轮询机制 1能者多劳开启手动确认 消费者a /*** 消费者接收消息*/ public class ConsumerWorkQueues1 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//开启不公平分发,能者多劳channel.basicQos(1);DeliverCallback deliverCallback(consumerTag, message)- {String data new String(message.getBody());System.out.println(new String(message.getBody()));//参数1确认队列中那个具体的消息// 可以获取消息的id // 消息routingkey// 交换机 exchange// 消息和重传标志//参数2是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }消费者b消费消息时然消费者b休眠100毫秒 public class ConsumerWorkQueues2 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//开启不公平分发,能者多劳channel.basicQos(1);//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手动确认消息//参数1确认队列中那个具体的消息 参数2是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}执行结果 消费者a执行 消费者b执行 2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送 应用场景两个消费者每次都从队列中来获取消息,若消费者a正常执行,消费者b在执行过程中出现了宕机,挂掉了那么我们未被消费的消息会被重新放回到队列中防止消息丢失。 生产者 public class ProducerWorkQueue {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner new Scanner(System.in);while (true){String msg scanner.nextLine();channel.basicPublish(,QUEUE_NAME, null,msg.getBytes());System.out.println(消息发送完毕);}} }消费者a public class ConsumerWorkQueues1 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来收消息Channel channel connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {//message包含消息头和消息体我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data new String(message.getBody());System.out.println(消费者1new String(message.getBody()));try {int i3/0;//模拟业务发生异常channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch (Exception e){System.out.println(拒收消息发生了异常);//拒收消息//参数一表示投递的消息标签//参数二是否开启多个消息同时确认//参数三是否重新给队列发送channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);}};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }消费者b public class ConsumerWorkQueues2 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来收消息Channel channel connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {//message包含消息头和消息体我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址System.out.println(睡10秒);try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手动确认消息//参数1确认队列中那个具体的消息 参数2是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }当消费者b在消费消息时我们让消费者b睡眠10秒模拟业务流程,在这10秒内我们手动关掉消费者b 发送 aa 消费者a接收 发送bb消费者b接收,在消费者b睡眠过程中我们停止消费者b,来看看手动应答的结果 此时我们查看消费者a,出现了本应该是消费者b消费的消息bb 2.8.2 预取值 channel.basicQos(1); 0轮询机制 1能者多劳 若值1代表当前队列的预取值,代表当前队列大概会拿到多少值2.9 Publish/Subscribe 发布/订阅 也可以叫 广播模式,当我们的P消费者发送了消息交给了X(交换机)所有绑定了这个X交换机的队列都可以接收到P消费者发送的消息代码实现生产者 public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//将通道声明指定交换机, 参数一交换机名称 参数二交换机类型 fanout广播类型 //参数2交换机类型也可使用 BuiltinExchangeType. 的方式来查看选择channel.exchangeDeclare(order, fanout);channel.basicPublish(order,,null,fanout type message.getBytes());channel.close();connection.close();} }代码实现消费者 public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//通道绑定交换机channel.exchangeDeclare(order,fanout);//获取临时队列名称String queueName channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queueName,order,);channel.basicConsume(queueName,true,(consumerTag,message)-{System.out.println(消费者1new String(message.getBody()));},consumerTag - System.out.println(取消消费消息));} }2.10 Routing路由) - Direct routing值订阅模型-Direct(直连) 在上面广播模式中,一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange 在Direct模型下 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey(路由key)消息的发送方在Exchange发送消息时,也必须指定消息的RoutingKeyExchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的Routing Key完全一致才会接受到消息 生产者 public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//通过信道声明交换机 参数一交换机名称 参数二direct 路由模式channel.exchangeDeclare(logsExchange,direct);//发送消息 参数一发送信息到的交换机名称// 参数二绑定路由 发送给队列的那个路由key,//只有当队列的路由key与交换机的路由key相对应时队列才会接受到消息channel.basicPublish(logsExchange,msgRouting,null,routing logs direct info 发送了消息.getBytes());channel.close();connection.close();} }消费者 public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(logs,direct);//获取临时队列名String queueName channel.queueDeclare().getQueue();//绑定队列参数一临时队列名称 参数二绑定的交换机名称 参数三路由key若消费者的路由key与生产者的路由key相同则可以收到消息channel.queueBind(queueName,logsExchange,infoRouting);channel.queueBind(queueName,logsExchange,msgRouting);channel.basicConsume(queueName,true,(consumerTag, message) - System.out.println(new String(message.getBody())),consumerTag - System.out.println(1));} }消费者2 public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(logs,direct);//获取临时队列名String queueName channel.queueDeclare().getQueue();channel.queueBind(queueName,logs,error);channel.queueBind(queueName,logs,msg);channel.basicConsume(queueName,true,(consumerTag, message) - System.out.println(new String(message.getBody())),consumerTag - System.out.println(1));} }2.11 Routing路由)- Topic Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符 #通配符* (star) can substitute for exactly one word 匹配一个词# (hash) can substitute for zero or more words 匹配一个或多个词生产者 public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//通过信道声明交换机 参数一交换机名称 参数二topic 动态路由channel.exchangeDeclare(order,topic);String routingKeyuser.order;//发送消息 参数一发送信息到的交换机名称 参数二绑定路由 发送给队列的那个路由keychannel.basicPublish(order,routingKey,null,(routing logs topic发送了消息routingKey).getBytes());channel.close();connection.close();} }消费者1 public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(order,topic);//获取临时队列名String queueName channel.queueDeclare().getQueue();//绑定队列参数一临时队列名称 参数二绑定的交换机名称 参数三动态通配符路由keychannel.queueBind(queueName,order,user.*);channel.basicConsume(queueName,true,(consumerTag, message) - System.out.println(new String(message.getBody())),consumerTag - System.out.println(1));} }消费者2 public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(order,topic);//获取临时队列名String queueName channel.queueDeclare().getQueue();//绑定队列参数一临时队列名称 参数二绑定的交换机名称 参数三动态通配符路由keychannel.queueBind(queueName,order,user.#);channel.basicConsume(queueName,true,(consumerTag, message) - System.out.println(new String(message.getBody())),consumerTag - System.out.println(1));} }三、进阶篇 高级特性 3.1 死信队列 死信,顾名思义就是无法被消费的信息字面意思可以这样理解一般来说producer将消息投递到queue里consumer从queue取出消息进行消费但某些时候由于特定的原因导致queue中的某些消息无法被消费这样的消息如果没有后续的处理就变成了死信自然就有了死信队列应用场景 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制当消息消费发生异常时将消息投入死信队列中。比如说用户在商城下单成功并点击去支付后在指定时间未支付时自动失效生产者给正产的消息队列发送消息并且设置消息过期时间为10S,超过10S消息未被消费,则消息进入死信队列 public class TTLProvider {//普通交换机名称public static final String NORMAL_EXCHANGEnormal_exchange;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setUsername(账户);factory.setPassword(密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//发送死信 设置TTL过期时间AMQP.BasicProperties propertiesnew AMQP.BasicProperties().builder().expiration(10000).build();for (int i 1; i 10; i) {String msgi;channel.basicPublish(NORMAL_EXCHANGE,normal,properties,msg.getBytes());}System.out.println(结束发送);} }正常队列消费者 public class TTLConsumer1 {//普通交换机名称public static final String NORMAL_EXCHANGEnormal_exchange;//死信交换机名称public static final String DEAD_EXCHANGEdead_exchange;//普通队列名称public static final String NORMAL_QUEUEnormal_queue;//死信队列名称public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setUsername(账户);factory.setPassword(密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,direct);channel.exchangeDeclare(DEAD_EXCHANGE,direct);//声明普通队列HashMapString, Object map new HashMap();//当消息被拒绝接受/未被消费 会将消息转发到死信队列//正常队列设置死信交换机map.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信队列的routingKeymap.put(x-dead-letter-routing-key,dead);channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,normal);//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,dead);DeliverCallback deliverCallback( consumerTag, message)-{System.out.println(Consumer1接收消息new String(message.getBody(),UTF-8));};CancelCallback cancelCallback(consumerTag)- System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);} }死信队列消费者 public class TTLConsumer2 {//死信队列名称public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setUsername(账户);factory.setPassword(密码);Connection connection factory.newConnection();Channel channel connection.createChannel();DeliverCallback deliverCallback( consumerTag, message)-{System.out.println(Consumer1接收消息new String(message.getBody(),UTF-8));};CancelCallback cancelCallback(consumerTag)- System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }结果当设置了死信队列,和TTL过期时间,若超过了过期时间消息未被消费,则消息会转发到死信队列 死信队列产生三大原因消息被拒接消息TTL过期队列达到最大长度 3.1.1 死信队列实战消息TTL过期 配置类 Configuration public class RabbitMQConfiguration {//普通交换机public static final String X_EXCHANGEX;//死信交换机public static final String Y_DEAD_LETTER_EXCHANGEY;//普通队列public static final String QUEUE_AQA;public static final String QUEUE_BQB;//死信队列public static final String DEAD_QUEUE_DQD;//声明普通x交换机Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换机Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A TTL:10SBeanpublic Queue queueA(){MapString,Object argnew HashMap();//设置死信交换机arg.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarg.put(x-dead-letter-routing-key,YD);//设置TTL过期时间arg.put(x-message-ttl,10000);return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();}//声明普通队列B TTL:40SBeanpublic Queue queueB(){MapString,Object argnew HashMap();//设置死信交换机arg.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarg.put(x-dead-letter-routing-key,YD);//设置TTL过期时间arg.put(x-message-ttl,40000);return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();}//死信队列Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}Beanpublic Binding queueABindingX(Qualifier(queueA) Queue queueA,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with(XA);}Beanpublic Binding queueBBindingX(Qualifier(queueB) Queue queueB,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with(XB);}Beanpublic Binding queueDBindingY(Qualifier(queueD) Queue queueD,Qualifier(yExchange) DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with(YD);} }TTL生产者 RestController RequestMapping(/ttl) Slf4j public class TTLProvider {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/{msg})public void sendMsg(PathVariable(msg) String msg){log.info(当前发送时间{}发送了一条消息,new Date().toString());rabbitTemplate.convertAndSend(X,XA,TTL消息延迟为10S,消息为msg);rabbitTemplate.convertAndSend(X,XB,TTL消息延迟为40S,消息为msg);} }死信队列消费者 Component Slf4j public class DeadLetterConsumer {RabbitListener(queues QD)public void t1(Message message, Channel channel)throws Exception{log.info(收到死信队列的消息{},时间为{},new String(message.getBody(),UTF-8),new Date().toString());} }死信队列-TTL过期时间测试结果 3.1.2 死信队列实战队列达到最大长度 设置正常队列最大长度 生产者 public class Producer {//普通交换机名称public static final String NORMAL_EXCHANGEnormal_exchange;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();for (int i 1; i 10; i) {String msgi;channel.basicPublish(NORMAL_EXCHANGE,normal,null,msg.getBytes());}} }消费者a //设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列 map.put(“x-max-length”,6); public class Consumer01 {//普通交换机名称public static final String NORMAL_EXCHANGEnormal_exchange;//死信交换机名称public static final String DEAD_EXCHANGEdead_exchange;//普通队列名称public static final String NORMAL_QUEUEnormal_queue;//死信队列名称public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,direct);channel.exchangeDeclare(DEAD_EXCHANGE,direct);//声明普通队列HashMapString, Object map new HashMap();//正常队列设置死信交换机map.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信队列的routingKeymap.put(x-dead-letter-routing-key,dead);//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列map.put(x-max-length,6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,normal);//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,dead);DeliverCallback deliverCallback( consumerTag, message)-{System.out.println(Consumer1接收消息new String(message.getBody(),UTF-8));};CancelCallback cancelCallback(consumerTag)- System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);} }消费者b public class Consumer02 {//死信队列名称public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();DeliverCallback deliverCallback( consumerTag, message)-{System.out.println(Consumer1接收消息new String(message.getBody(),UTF-8));};CancelCallback cancelCallback(consumerTag)- System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }3.1.3 死信队列实战消息被拒 生产者 public class Producer {//普通交换机名称public static final String NORMAL_EXCHANGEnormal_exchange;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();for (int i 1; i 10; i) {String msginfoi;channel.basicPublish(NORMAL_EXCHANGE,normal,null,msg.getBytes());}} }消费者a 此消息被拒接,是否重新放回正常队列, false不放回 则会放到死信队列1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);2.并且开启手动应答 public class Consumer01 {//普通交换机名称public static final String NORMAL_EXCHANGEnormal_exchange;//死信交换机名称public static final String DEAD_EXCHANGEdead_exchange;//普通队列名称public static final String NORMAL_QUEUEnormal_queue;//死信队列名称public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setUsername(登录账户);factory.setPassword(登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,direct);channel.exchangeDeclare(DEAD_EXCHANGE,direct);//声明普通队列HashMapString, Object map new HashMap();//正常队列设置死信交换机map.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信队列的routingKeymap.put(x-dead-letter-routing-key,dead);channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,normal);//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,dead);DeliverCallback deliverCallback( consumerTag, message)-{String msgnew String(message.getBody());if(info5.equals(msg)){System.out.println(Consumer1接收消息msg此消息被拒绝);//此消息被拒接,是否重新放回正常队列, false不放回 则会放到死信队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println(Consumer1接收消息new String(message.getBody(),UTF-8));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback(consumerTag)- System.out.println(consumerTag);//开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);} }消费者b public class Consumer02 {//死信队列名称public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();DeliverCallback deliverCallback( consumerTag, message)-{System.out.println(Consumer1接收消息new String(message.getBody(),UTF-8));};CancelCallback cancelCallback(consumerTag)- System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }3.2 基于SpringBoot实现延迟队列 配置队列交换机 Configuration public class QueueConfig {Bean(exchange)public DirectExchange exchange(){return new DirectExchange(msg);}Bean(simpleQue)public Queue simpleQue(){HashMapString, Object map new HashMap();//设置死信交换机map.put(x-dead-letter-exchange,dead);//设置死信路由map.put(x-dead-letter-routing-key,deadKey);//消息失效时间map.put(x-message-ttl,10000);return new Queue(simple,false,false,false,map);}Beanpublic Binding simpleQueueBandingExchange(Qualifier(simpleQue) Queue simple,Qualifier(exchange) DirectExchange msg)throws Exception{return BindingBuilder.bind(simple).to(msg).with(info);}Bean(deadExchange)public DirectExchange exchange1(){return new DirectExchange(dead);}Bean(deadQueue)public Queue deadQ(){return new Queue(deadQue,false,false,false,null);}Beanpublic Binding deadKeyBindingDeadExchange(Qualifier(deadQueue)Queue queue,Qualifier(deadExchange)DirectExchange dead){//绑定死信队列到死信交换机通过路由return BindingBuilder.bind(queue).to(dead).with(deadKey);} }生产者 RestController public class Provider {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/ttl/{message})public void t1(PathVariable String message){String queueNamesimple;Date date new Date();System.out.println(date);rabbitTemplate.convertAndSend(msg,info,message);} }消费者 Component public class Consumer {RabbitListener(queues deadQue)public void hello(Message msg, Channel channel)throws Exception{System.out.println(接收到消息new String(msg.getBody()));Date date1 new Date();System.out.println(date1);} }我们看到消息每隔十秒更新一次 3.3 发布确认 高级特性 3.3.1 可靠性投递confirm模式 场景在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间的生产者消息投递失败,导致消息丢失,需要手动处理和恢复。-可靠性投递confirm模式需要在application核心配置文件中设置发布确认类型spring-rabbitmq-publisher-confirm-type: correlated类型1none禁用发布确认模式,是默认值类型2correlated发布消息成功到交换机后出发回调方法类型3simple和correlated效果一样,但是如果回调返回的是false会关闭信道,接下来无法发送消息 配置类 Component public class confirmConfig {public static final String CONFIRM_EXCHANGE_NAMEconfirm.exchange;public static final String CONFIRM_QUEUEconfirm.queue;public static final String CONFIRM_ROUTING_KEYconfirm;Bean(confirmExchange)public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}Bean(confirmQueue)public Queue confirmQueue(){return new Queue(CONFIRM_QUEUE);}Beanpublic Binding confirmQueueBindingConfirmExchange(Qualifier(confirmExchange)DirectExchange confirmExchange,Qualifier(confirmQueue)Queue confirmQueue){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);} }当生产者发送给交换机消息时,交换机的名字错了,或者交换机挂掉了,会导致消息的丢失,那么我们需要实现回调接口,当交换机收到消息后会给生产者发送回调消息 实现回调接口实现 RabbitTemplate.ConfirmCallback接口的confirm方法并且将其注入到rabbit模板的内部类中 Component Slf4j public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {Autowiredprivate RabbitTemplate rabbitTemplate;//注入PostConstruct //当所有注解执行完后,再执行这个注解public void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法* 发消息交换机接收到了回调* 参数* 1. correlationData保存消息的ID及相关信息,这个消息是我们生产者手动传入的* 2. 交换机收到消息 true* 3. null*//*** 交换机确认回调方法* 发消息交换机接收失败回调* 参数* 1. correlationData保存消息的ID及相关信息* 2. 交换机收到消息 false* 3. cause失败的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String idcorrelationData!null?correlationData.getId():;if(b){log.info(交换机已经收到了ID为{}的消息,id);}else {log.info(交换机为收到了ID为{}的消息原因是{},id,s);}} }生产者 RestController public class ConfirmProducer {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendMsg/{msg})public void t1(PathVariable String msg){CorrelationData correlationData new CorrelationData();correlationData.setId(1);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,嘿嘿嘿.getBytes(),correlationData);} }消费者 Component public class ConfirmConsumer {RabbitListener(queues ConfirmConfig.CONFIRM_QUEUE)public void consumer(Message message){System.out.println(高级特性确认发布消费者收到了消息new String(message.getBody()));} }测试当我们正常发送消息 测试当我们把交换机名字换掉 3.3.2 可靠性投递return模式 场景若交换机收到消息,队列没有收到消息,应该如何解决需要在application核心配置文件中设置是否回退消息当消息路由不到消费者spring-rabbitmq-publisher-returnstrue 开启回退消息 Component Slf4j public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{Autowiredprivate RabbitTemplate rabbitTemplate;//注入PostConstruct //当所有注解执行完后,再执行这个注解public void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* 发消息交换机接收到了回调* 参数* 1. correlationData保存消息的ID及相关信息* 2. 交换机收到消息 true* 3. null*//*** 交换机确认回调方法* 发消息交换机接收失败回调* 参数* 1. correlationData保存消息的ID及相关信息* 2. 交换机收到消息 false* 3. cause失败的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String idcorrelationData!null?correlationData.getId():;if(b){log.info(交换机已经收到了ID为{}的消息,id);}else {log.info(交换机未收到了ID为{}的消息原因是{},id,s);}}/*** 消息传递过程中 不可达 消费者的队列时将消息返回给生产者* 只有当消息 不可达 目的地的时候 才进行回调* 参数1消息体* 参数2回复代码* 参数3回复原因* 参数4交换机* 参数5路由key*/Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info(消息{},被交换机{}退回,原因是{},路由是{},new String(message.getBody()),s1,s,s2);}}3.4 优先级队列 优先级越高,消息先被消费者消费官方设置最大优先级 0-255 超出优先级则报错 自己使用时数字不必设置很大,会浪费CPU效率 生产者 public class PriorityProducer {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();//设置优先级参数AMQP.BasicProperties build new AMQP.BasicProperties.Builder().priority(10).build();for (int i 1; i 10; i) {String msginfoi;if(i5){channel.basicPublish(,hi,build,msg.getBytes());}else {channel.basicPublish(,hi,null,msg.getBytes());}}} }消费者 public class PriorityConsumer {public static void main(String[] args) throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(ip地址);factory.setUsername(RabbitMQ登录用户名);factory.setPassword(RabbitMQ登录密码);Connection connection factory.newConnection();Channel channel connection.createChannel();HashMapString, Object map new HashMap();//设置当前队列为优先级队列map.put(x-max-priority,10);channel.queueDeclare(hi,false,false,false,map);channel.basicConsume(hi,true,(consumerTag,message)-{System.out.println(优先级队列接收消息顺序new String(message.getBody()));},(consumerTag) - System.out.println(取消回调));} }测试结果我们定义的是消息5优先级最高,其他消息为默认优先级 3.5 消费端限流 参数一prefetchSize预先载入的大小 0表示不限制大小参数二prefetchCount预先载入的消息条数参数三globalfalse注意autoAck手动应答一定要为false //设置每次确定一个消息channel.basicQos(0,1,false); 12生产者 public class AckProvider {//队列名称public static final String QUEUE_NAMEhello_Ack;//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setUsername(用户);factory.setPassword(密码);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner new Scanner(System.in);while (true){String msg scanner.nextLine();channel.basicPublish(,QUEUE_NAME, null,msg.getBytes());System.out.println(消息发送完毕);}} }消费者 public class AckConsumer2 {//队列名称接收此队列的消息public static final String QUEUE_NAMEhello_Ack;public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setUsername(用户);factory.setPassword(密码);//创建连接Connection connection factory.newConnection();//通过连接来获取 信道来收消息Channel channel connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback(consumerTag, message)- {//message包含消息头和消息体我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址System.out.println(new String(message.getBody()));try {Thread.sleep(1000*5);} catch (InterruptedException e) {e.printStackTrace();}//手动确认消息//参数1确认队列中那个具体的消息 参数2是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallbackconsumerTag-{System.out.println(消费消息被中断);};//每次只消费一个channel.basicQos(0,1,false);/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应* true代表自动应答* false手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }
http://www.zqtcl.cn/news/577999/

相关文章:

  • 怎么样开始做网站网站建设 营业执照 经营范围
  • 威海做网站网站建设方案书 模版
  • 泗阳做网站南昌建设
  • 做企业网站用什么软件深圳制作企业网站
  • 大连微信网站开发兰州网站建设模板
  • 建设项目安监备案网站外贸 网站 seo
  • 企慕网站建设网络推广合肥市网站制作
  • 做空比特币网站大气简约企业网站模板免费下载
  • 坪山网站建设行业现状做网站能月入10万
  • 个人网站有什么内容广西网站建设推广
  • 安徽教育云网站建设网站seo诊断的主要内容
  • 网站建设例子开发工具宏怎么使用
  • 新乡做网站公司哪个地区网站建设好
  • 网站模板怎么编辑网站定制化
  • 利于优化的网站网络科技公司怎么赚钱
  • 制作网站的步骤和方法做物流的网站有哪些功能
  • vs做网站图片明明在文件夹里却找不到中国建筑网官网找客户信息
  • WordPress仿站培训黑龙江新闻夜航
  • 如何利用开源代码做网站济南做网站互联网公司有哪些
  • 生意网app下载官网郑州做网站优化公
  • wordpress网站更换域名wordpress 小工具定制
  • 上海做机床的公司网站设计网站怎样做色卡
  • 一个网站怎么绑定很多个域名做网站后台应该谁来做
  • 跑纸活做网站加大门户网站安全制度建设
  • 多商户开源商城seo对网店的作用有哪些
  • 提供微信网站建设福州seo建站
  • 泉州市住房与城乡建设网站潍坊网站建设方案外包
  • 网络文化经营许可证怎么申请免费seo提交工具
  • 网站建设 需求分析报告手机网站微信网站开发
  • 做司法考试题目的网站建站中企动力