当前位置: 首页 > news >正文

网站建设实训结论与分析总结做网站有哪个软件好

网站建设实训结论与分析总结,做网站有哪个软件好,百度标注平台怎么加入,网站风格设计的选择一.概述与安装 //RabbitMQ //1.核心部分-高级部分-集群部分 //2.什么是MQ 消息队列message queue 先入先出原则;消息通信服务 //3.MQ的大三功能 流量消峰 应用解耦 消息中间件 //#xff08;1#xff09;人-订单系统(1万次/S)— 人 - MQ(流量消峰,对访问人员进行排队) -…一.概述与安装 //RabbitMQ //1.核心部分-高级部分-集群部分 //2.什么是MQ 消息队列message queue 先入先出原则;消息通信服务 //3.MQ的大三功能 流量消峰 应用解耦 消息中间件 //1人-订单系统(1万次/S)— 人 - MQ(流量消峰,对访问人员进行排队) -订单系统保护系统不宕机 //2订单系统-支付/库存/物流系统— 订单系统-MQ-支付/库存/物流系统 //3A -API -B  — A - MQ -B  这样可以通过MQ完成时告知A //4.MQ的分类 ActiveMQ(老) Kafka(大数据的杀手锏) RocketMQ RabbitMQ(中小型公司推荐) //5.RabbitMQ概念 //1就是一个快递站  发件人-快递员-快递站MQ-快递员-收件人 //2生产者-MQ1交换机、N队列-1个队列对应1个消费者 //6.核心部分 6大模式 //1简单模式Hello World //2工作模式Work queues //3发布订阅模式Publish/Subscribe //4路由模式Routing //5主体模式Topics //6发布确认模式Publisher Confirms //7Broker消息实体可以有多个交换机Exchange(可以有多个队列Queue);Connection多个Channel // Producer生产者 Consumer消费者 Binding绑定,交换机和queue之间的虚拟连接 //7.RMQ的安装 //集中下载 链接https://pan.baidu.com/s/1NJfYnLT4DN-uu-uyIXzA4w  提取码HIT0 //1先安装erlang  yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel //2下载 wget http://erlang.org/download/otp_src_22.0.tar.gz //3解压 tar -zxvf otp_src_22.0.tar.gz //4移动 mv otp_src_22.0 /usr/local/ //5切换目录 cd /usr/local/otp_src_22.0/ //6创建即将安装的目录 mkdir ../erlang //7配置安装路径 ./configure --prefix/usr/local/erlang //8安装 make install //9查看一下是否安装成功 ll /usr/local/erlang/bin //10添加环境变量 echo export PATH$PATH:/usr/local/erlang/bin  /etc/profile //11刷新环境变量 source /etc/profile //12甩一条命令 erl    halt(). 退出 //---------------------安装RMQ------------------------- //13下载 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz //14由于是tar.xz格式的所以需要用到xz没有的话就先安装  yum install -y xz //15第一次解压 /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz //16第二次解压 tar -xvf rabbitmq-server-generic-unix-3.7.15.tar //17移动 mv rabbitmq_server-3.7.15/ /usr/local/ //18改名 mv /usr/local/rabbitmq_server-3.7.15  /usr/local/rabbitmq //19配置环境变量 echo export PATH$PATH:/usr/local/rabbitmq/sbin  /etc/profile //20刷新环境变量 source /etc/profile //21创建配置目录 mkdir /etc/rabbitmq //22启动rabbitmq-server -detached //23停止rabbitmqctl stop //24状态rabbitmqctl status //25开放端口 firewall-cmd --zonepublic --add-port15672/tcp --permanent     firewall-cmd --zonepublic --add-port5672/tcp --permanent //26开启web插件 rabbitmq-plugins enable rabbitmq_management //27访问 http://wdfgdzx.top:15672/   默认账号密码guest guest这个账号只允许本机访问 //28查看所有用户 rabbitmqctl list_users //29添加一个用户 rabbitmqctl add_user xlliu24 s19911009! //30配置权限 rabbitmqctl set_permissions -p / xlliu24 .* .* .* //31查看用户权限 rabbitmqctl list_user_permissions xlliu24 //32设置tag   rabbitmqctl set_user_tags xlliu24 administrator //33删除用户安全起见删除默认用户 rabbitmqctl delete_user guest //34然后用新用户登录,成功后看到界面 二.如何使用RMQ //1.Hello World //1引入maven包  amqp-client commons-io //2P(生产者 -发消息-队列hello(中间件)- 接受消息-C消费者 //生产者 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //发消息 public class Producer {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception {//创建工厂ConnectionFactory connectionFactorynew ConnectionFactory();//工厂IP 连接RMQ的队列connectionFactory.setHost(47.105.174.97);//用户名密码connectionFactory.setUsername(xlliu24);connectionFactory.setPassword(s19911009!);//创建连接Connection connection connectionFactory.newConnection();//获取信道Channel channel connection.createChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME,true,false,false,null);//发消息String messageHello World;/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕...);} } //消费者 package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEHello;//接收消息public static void main(String[] args) throws Exception {//创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();connectionFactory.setHost(47.105.174.97);connectionFactory.setUsername(xlliu24);connectionFactory.setPassword(s19911009!);//创建新链接Connection connectionconnectionFactory.newConnection();Channel channelconnection.createChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(new String(message.getBody()));};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */channel.basicConsume(hello,true,deliverCallback,cancelCallback);} } ------------------------------------ //2.Work Queues 工作队列 //1生产者-大量发消息-队列hello-接受消息-N个消费者(工作线程) //2注意一个消息只能被处理一次,不能被处理多次。所以工作线程采用的是轮训分发消息 // 抽取信道工具类 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RMQUtil{public static Channel getChannel() throws Exception{//创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();connectionFactory.setHost(47.105.174.97);connectionFactory.setUsername(xlliu24);connectionFactory.setPassword(s19911009!);//创建新链接Connection connectionconnectionFactory.newConnection();return connection.createChannel();} } ----------------------- package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception {//获取信道Channel channel RMQUtil.getChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME,true,false,false,null);//发消息String messageHello World;/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i0;i1000;i){channel.basicPublish(,QUEUE_NAME,null,(message i).getBytes());System.out.println(消息发送完毕...);}} } ------------------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C0接收到的消息: new String(message.getBody()));};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C0等待接受消息...);channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } ---------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody()));};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C1等待接受消息...);channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } ---------------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer2 {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C2接收到的消息: new String(message.getBody()));};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C2等待接受消息...);channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } -----------------控制台打印信息如下---------------------- C0接收到的消息: Hello World 0 C0接收到的消息: Hello World 3 C0接收到的消息: Hello World 6 C0接收到的消息: Hello World 9 ---------- C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 4 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 10 ----------- C2接收到的消息: Hello World 2 C2接收到的消息: Hello World 5 C2接收到的消息: Hello World 8 C2接收到的消息: Hello World 11//3.消息应答 //1即消费者处理完毕后-应答-生产者删除消息 //2自动应答对环境要求高,并不可取; //3手动应答:basicAck(肯定) basicNack basicReject //4批量应答multiple 当前8 true 5 6 7 8都确认, false只会应答8,建议使用false //5消息自动重新入队,当某个通道发生异常时,RMQ将了解到消息未完全处理,并将对其重新排队。让其他通道处理,保证消息的不丢失与处理。 //6目的手动应答保证消息的不丢失。 // 抽取信道工具类 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RMQUtil{public static Channel getChannel() throws Exception{//创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();connectionFactory.setHost(47.105.174.97);connectionFactory.setUsername(xlliu24);connectionFactory.setPassword(s19911009!);//创建新链接Connection connectionconnectionFactory.newConnection();return connection.createChannel();} } -------- package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception {//获取信道Channel channel RMQUtil.getChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME,true,false,false,null);//发消息String messageHello World;/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i0;i10;i){channel.basicPublish(,QUEUE_NAME,null,(message i).getBytes(UTF-8));System.out.println((message i) 消息发送完毕...);}} } -------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{try {Thread.sleep(30000);} catch (Exception e) {e.printStackTrace();}System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C0等待接受消息...);channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} } ------------- package com.day.controller;import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;//消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C1等待接受消息...);channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} } ------------如果C0在等待过程中宕机或者发生异常全部消息有C1处理------------------ C0等待接受消息... C0接收到的消息: Hello World 0 -------------------- C1等待接受消息... C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 3 C1接收到的消息: Hello World 5 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 9 C1接收到的消息: Hello World 2 C1接收到的消息: Hello World 4 C1接收到的消息: Hello World 6 C1接收到的消息: Hello World 8//4.队列RMQ持久化 //1channel.queueDeclare(QUEUE_NAME, true,false,false,null); //2持久化后重启RMQ后仍然存在//5.消息持久化 //1channel.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message i).getBytes(UTF-8)); //2但是不是绝对的,如果想绝对要参考后面的发布确认章节//6.不公平分发 //1轮训分发— //2问题在两个消费者就会出现,C1处理快,C0处理慢,而分发任务一致,就会出现能者不多劳。 //3由消费者设置分发策略: channel.basicQos(1); package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; //生产者,发送大量的消息 public class Producer {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception {//获取信道Channel channel RMQUtil.getChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME, true,false,false,null);//发消息String messageHello World;/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i0;i10;i){channel.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message i).getBytes(UTF-8));System.out.println((message i) 消息发送完毕...);}} } package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//设置分发策略channel.basicQos(1);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{try {Thread.sleep(30000);} catch (Exception e) {e.printStackTrace();}System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C0等待接受消息...);channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} } package com.day.controller;import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//设置分发策略channel.basicQos(1);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C1等待接受消息...);channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} } -------------------控制台信息--------------------------------- C0等待接受消息... C0接收到的消息: Hello World 0 C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 2 C1接收到的消息: Hello World 3 C1接收到的消息: Hello World 4 C1接收到的消息: Hello World 5 C1接收到的消息: Hello World 6 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 8 C1接收到的消息: Hello World 9//7.预取值 //1就是预先指定C0分到多少,C1分到多少 //2也是消费者设置 channel.basicQos(3); channel.basicQos(7); package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//设置分发策略/预取值channel.basicQos(3);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{try {Thread.sleep(30000);} catch (Exception e) {e.printStackTrace();}System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C0等待接受消息...);channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} } package com.day.controller;import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEhello;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//设置分发策略/预取值channel.basicQos(7);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println(C1等待接受消息...);channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} } ---控制台信息--- C1等待接受消息... C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 3 C1接收到的消息: Hello World 5 C1接收到的消息: Hello World 6 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 8 C1接收到的消息: Hello World 9 --- C0接收到的消息: Hello World 0 C0接收到的消息: Hello World 2 C0接收到的消息: Hello World 4//8.发布确认 //1是解决消息不丢失的重要环节 //2生产者-发消息-队列hello- //31设置队列持久化-2设置消息持久化—3发布确认(这里第3条才能确认消息真的保存在磁盘上了) //4开启发布确认的方法 channel.confirmSelect(); //5单个确认发布 批量确认发布 异步确认发布三种方法 //6单个确认发布: 发一条确认一条,速度慢 //7批量确认发布: 34集 //8异步确认发布: 第三种最牛批,性能最厉害 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.rabbitmq.client.MessageProperties; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; //生产者,发送大量的消息 public class Producer {//队列名称public static final String QUEUE_NAMEhello;//发消息public static void main(String[] args) throws Exception {//单个确认 耗时31830ms//publicMessageIndividually();//批量确认 耗时1660ms//publicMessageBatch();//异步确认 耗时1102mspublicMessageAsync();}//单个确认public static void publicMessageIndividually() throws Exception{long startTimeSystem.currentTimeMillis();//获取信道Channel channel RMQUtil.getChannel();//开启发布确认channel.confirmSelect();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME, true,false,false,null);//发消息String messageHello World;/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i0;i1000;i){channel.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message i).getBytes(UTF-8));System.out.println((message i) 消息发送完毕...);//进行发布确认boolean flagchannel.waitForConfirms();if(flag){System.out.println(确认发送成功...);}else{System.err.println(不确认发送成功...);}}long endTimeSystem.currentTimeMillis();System.out.println(耗时(endTime-startTime)ms);}//批量发布确认public static void publicMessageBatch() throws Exception{long startTimeSystem.currentTimeMillis();//获取信道Channel channel RMQUtil.getChannel();//开启发布确认channel.confirmSelect();//创建队列channel.queueDeclare(QUEUE_NAME, true,false,false,null);//发消息//批量确认消息大小int batchSize100;String messageHello World;for(int i0;i1000;i){channel.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message i).getBytes(UTF-8));System.out.println((message i) 消息发送完毕...);//判断达到100条,批量确认一次if(i%batchSize0){boolean flagchannel.waitForConfirms();if(flag){System.out.println(确认100条发送成功...);}else{System.err.println(不确认100条发送成功...);}}}long endTimeSystem.currentTimeMillis();System.out.println(耗时(endTime-startTime)ms);}//异步确认发布//生产消息的时候一一编号,也叫给ID或者提取特征,有无问题broker会通知你//速度最快,效率最高,实用性最大public static void publicMessageAsync() throws Exception{long startTimeSystem.currentTimeMillis();//线程安全有序的一个哈希表 适用于高并发的情况下ConcurrentSkipListMapLong,Object concurrentSkipListMapnew ConcurrentSkipListMap();//获取信道Channel channel RMQUtil.getChannel();//开启发布确认channel.confirmSelect();//消息监听器,监听消息是否发送成功了//确认成功的回调函数ConfirmCallback ackCallBack(deliveryTag,multiple) -{System.out.println(确认发送成功的消息 deliveryTag);//2.删除已经确认的消息//是否批量确认if(multiple){ConcurrentNavigableMapLong,Object concurrentNavigableMapconcurrentSkipListMap.headMap(deliveryTag);System.out.println(即将删除 concurrentNavigableMap);concurrentNavigableMap.clear();}else{System.out.println(即将删除 concurrentSkipListMap.get(deliveryTag));concurrentSkipListMap.remove(deliveryTag);}};//确认失败的回调函数ConfirmCallback nackCallBack(deliveryTag,multiple) -{System.err.println(不能确认发送成功的消息 deliveryTag);//打印一下未确认的消息String temp (String) concurrentSkipListMap.get(deliveryTag);System.out.println(不能确认发送成功的消息 temp);};channel.addConfirmListener(ackCallBack,nackCallBack);//创建队列channel.queueDeclare(QUEUE_NAME, true,false,false,null);String message异步确认... Hello World;for(int i0;i1000;i){//1.此处记录所有要发送的消息concurrentSkipListMap.put(channel.getNextPublishSeqNo()-2,(message i));channel.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message i).getBytes(UTF-8));//System.out.println(channel.getNextPublishSeqNo());}Thread.sleep(5000);System.err.println(concurrentSkipListMap.size());long endTimeSystem.currentTimeMillis();System.out.println(耗时(endTime-startTime)ms);} } 三.发布订阅 //1.交换机 //1生产者-发消息-队列-消费者 //2原来的消息只能被消费一次,如果做到1个消息被多个消费者消费呢? //3生产者-交换机-RoutingKey-队列(仍是消息只能被消费一次)-消费者 //          -交换机-RoutingKey-队列(仍是消息只能被消费一次)-消费者... //4RMQ把消息给队列,必须走交换机,不指定是默认的交换机。 //5交换机的类型:直接direct 主题topic 标题headers 扇出fanout 无名 //6绑定bindings 用RoutingKey可以区分队列//2.Fanout 广播 发布订阅模式 根源是发把消息发给交换机然后用RoutingKey关联到多个队列给多个队列同时发消息 //1P-X-n个Q-n个消费者 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; //生产者,发送大量的消息 public class Producer {//交换机名public static final String EXCHANGE_NAMElogs;//发消息public static void main(String[] args) throws Exception {Channel channel RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,fanout);Scanner scannernew Scanner(System.in);while(scanner.hasNext()){String messagescanner.next();channel.basicPublish(EXCHANGE_NAME,, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(UTF-8));System.out.println(生产者发出消息: message);}} } -------------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEpublic0;//交换机名public static final String EXCHANGE_NAMElogs;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,);System.out.println(C0等待,把接受到的消息打印在屏幕上...);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8));};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } -------------------------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEpublic1;public static final String EXCHANGE_NAMElogs;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,);System.out.println(C1等待,把接受到的消息打印在屏幕上...);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8));};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }//3.Direct Exchange直接交换机 //1路由模式,想给谁传给谁传,根据RoutingKey不同指定轻松实现。绑定相同的RoutingKey就是fanout模式了 package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; //生产者,发送大量的消息 public class Producer {//交换机名public static final String EXCHANGE_NAMEdirect_logs;public static final String CONSOLE_INFOinfo;public static final String CONSOLE_WARNINGwarning;public static final String DISK_ERRORerror;//发消息public static void main(String[] args) throws Exception {Channel channel RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scannernew Scanner(System.in);System.out.println(生产者准备从控制台获取信息发送...);while(scanner.hasNext()){String messagescanner.next();channel.basicPublish(EXCHANGE_NAME,DISK_ERROR, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(UTF-8));System.out.println(生产者发出消息: message);}} } ---------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEconsole;//交换机名public static final String EXCHANGE_NAMEdirect_logs;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,info);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,warning);System.out.println(C0等待,把接受到的消息打印在屏幕上...);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8));};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } ---------- package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEdisk;public static final String EXCHANGE_NAMEdirect_logs;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,error);System.out.println(C1等待,把接受到的消息打印在屏幕上...);//声明 接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8));};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }//4.Topics 主题交换机 //1P-X-RoutingKey- //2主题交换机的RoutingKey不能随意写,必须是.隔开的单词列表;*代表一个单词,#代表0或多个单词 //3支持匹配模式,*.orange.* lazy.# *.*.rabbit //4如果绑定# 则这个对垒将接受所有数据,有点像fanout //5如果没有# *就是direct交换机,这不就是多个正则吗 package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.HashMap; import java.util.Map; import java.util.Scanner; //生产者,发送大量的消息 public class Producer {//交换机名public static final String EXCHANGE_NAMEtopic_logs;//发消息public static void main(String[] args) throws Exception {Channel channel RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);MapString,String bindingKeyMapnew HashMap();bindingKeyMap.put(quick.orange.rabbit,被队列Q1Q2接收到);bindingKeyMap.put(lazy.orange.elephant,被队列Q1Q2接收到);bindingKeyMap.put(quick.orange.fox,被队列Q1接收到);bindingKeyMap.put(lazy.brown.fox,被队列Q2接收到);bindingKeyMap.put(lazy.pink.rabbit,Q2接收一次);bindingKeyMap.put(quick.brown.fox,被丢弃);bindingKeyMap.put(quick.orange.male.rabbit,被丢弃);bindingKeyMap.put(lazy.orange.male.rabbit,Q2);for(String key:bindingKeyMap.keySet()){String messagebindingKeyMap.get(key);//队列名称是通过key指定的哦channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes(UTF-8));System.out.println(生产者发出消息: message);}} } ------ package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer {//队列的名称public static final String QUEUE_NAMEQ1;//交换机名public static final String EXCHANGE_NAMEtopic_logs;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,*.orange.*);System.out.println(C0等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8)。通过队列 QUEUE_NAME。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } ------ package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {//队列的名称public static final String QUEUE_NAMEQ2;public static final String EXCHANGE_NAMEtopic_logs;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,*.*.rabbit);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,lazy.#);System.out.println(C1等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8)。通过队列 QUEUE_NAME。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }//5.死信 //1消息拒绝 效果过期 队列达到最大长度 //2开启消费者0,然后关闭,然后运行生产者发消息,会发现消息在normal_queue,10S后在dead_queue的变化 //3无法被消费的消息,由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信 //4死信队列机制,消费时发生异常,将消息放到死信队列中,防止消息的丢失 //5死信的来源:消息TTL过期、队列达到最大长度、消息被拒绝并且requeuefalse; package com.day.controller; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.util.HashMap; import java.util.Map; //生产者,发送大量的消息 public class Producer {//交换机名public static final String NORMAL_EXCHANGEnormal_exchange;//发消息public static void main(String[] args) throws Exception {Channel channel RMQUtil.getChannel();//单位是ms 即设置10SAMQP.BasicProperties basicPropertiesnew AMQP.BasicProperties().builder().expiration(10000).build();for(int i0;i10;i){String message生产者发送的消息: i;//TTL -TIME TO LIVEchannel.basicPublish(NORMAL_EXCHANGE,normalBindLine,basicProperties,message.getBytes(UTF-8));System.out.println(生产者发出消息: message);}} } ------ package com.day.controller; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; //消费者 public class Consumer {//普通交换机名public static final String NORMAL_EXCHANGEnormal_exchange;//死信交换机public static final String DEAD_EXCHANGEdead_exchange;//两个队列public static final String NORMAL_QUEUEnormal_queue;public static final String DEAD_QUEUEdead_queue;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//-----------------------------------//声明交换机channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//-----------------------------------//声明队列-要使用特殊的参数才能转发到死信队列MapString,Object argumentsMapnew HashMap();//设置转发的交换机//argumentsMap.put(x-message-ttl,10000);//也可以在发消息时指定argumentsMap.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置RoutingKeyargumentsMap.put(x-dead-letter-routing-key,deadBindLine);channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap);//死信队列channel.queueDeclare(DEAD_QUEUE,true,false,false,null);//-----------------------------------//绑定交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,normalBindLine);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,deadBindLine);//-----------------------------------System.out.println(C0等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8)。通过队列 NORMAL_QUEUE。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);} } ------ package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();System.out.println(C1等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8)。通过队列 DEAD_QUEUE。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTag -{System.out.println(consumerTag 消费消息被中断...);};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }//5.死信 //1队列达到最大长度 package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer {//交换机名public static final String NORMAL_EXCHANGEnormal_exchange;//发消息public static void main(String[] args) throws Exception {Channel channel RMQUtil.getChannel();//单位是ms 即设置10S/* AMQP.BasicProperties basicPropertiesnew AMQP.BasicProperties().builder().expiration(10000).build();*/for(int i0;i10;i){String message生产者发送的消息: i;channel.basicPublish(NORMAL_EXCHANGE,normalBindLine,null,message.getBytes(UTF-8));System.out.println(生产者发出消息: message);}} } ------ package com.day.controller; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; //消费者 public class Consumer {//普通交换机名public static final String NORMAL_EXCHANGEnormal_exchange;//死信交换机public static final String DEAD_EXCHANGEdead_exchange;//两个队列public static final String NORMAL_QUEUEnormal_queue;public static final String DEAD_QUEUEdead_queue;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//-----------------------------------//声明交换机channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//-----------------------------------//声明队列-要使用特殊的参数才能转发到死信队列MapString,Object argumentsMapnew HashMap();//设置转发的交换机//argumentsMap.put(x-message-ttl,10000);//也可以在发消息时指定argumentsMap.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置RoutingKeyargumentsMap.put(x-dead-letter-routing-key,deadBindLine);argumentsMap.put(x-max-length,6);channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap);//死信队列channel.queueDeclare(DEAD_QUEUE,true,false,false,null);//-----------------------------------//绑定交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,normalBindLine);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,deadBindLine);//-----------------------------------System.out.println(C0等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8)。通过队列 NORMAL_QUEUE。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTage -{System.out.println(consumerTage 消费消息被中断...);};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);} } ------ package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();System.out.println(C1等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8)。通过队列 DEAD_QUEUE。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTag -{System.out.println(consumerTag 消费消息被中断...);};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }//5.死信 //1消息被拒绝 package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer {//交换机名public static final String NORMAL_EXCHANGEnormal_exchange;//发消息public static void main(String[] args) throws Exception {Channel channel RMQUtil.getChannel();//单位是ms 即设置10S/* AMQP.BasicProperties basicPropertiesnew AMQP.BasicProperties().builder().expiration(10000).build();*/for(int i0;i10;i){String message生产者发送的消息: i;channel.basicPublish(NORMAL_EXCHANGE,normalBindLine,null,message.getBytes(UTF-8));System.out.println(生产者发出消息: message);}} } ------ package com.day.controller; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; //消费者 public class Consumer {//普通交换机名public static final String NORMAL_EXCHANGEnormal_exchange;//死信交换机public static final String DEAD_EXCHANGEdead_exchange;//两个队列public static final String NORMAL_QUEUEnormal_queue;public static final String DEAD_QUEUEdead_queue;//接收消息public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();//-----------------------------------//声明交换机channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//-----------------------------------//声明队列-要使用特殊的参数才能转发到死信队列MapString,Object argumentsMapnew HashMap();//设置转发的交换机//argumentsMap.put(x-message-ttl,10000);//也可以在发消息时指定argumentsMap.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置RoutingKeyargumentsMap.put(x-dead-letter-routing-key,deadBindLine);channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap);//死信队列channel.queueDeclare(DEAD_QUEUE,true,false,false,null);//-----------------------------------//绑定交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,normalBindLine);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,deadBindLine);//-----------------------------------System.out.println(C0等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{String msgnew String(message.getBody(),UTF-8);if(msg.contains(5)){//拒绝且不放回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{System.out.println(C0接收到的消息: new String(message.getBody(),UTF-8)。通过队列 NORMAL_QUEUE。绑定键: message.getEnvelope().getRoutingKey());channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallbackconsumerTag -{System.out.println(consumerTag 消费消息被中断...);};//必须开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);} } ------ package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 {public static final String DEAD_QUEUEdead_queue;public static void main(String[] args) throws Exception {//信道Channel channelRMQUtil.getChannel();System.out.println(C1等待,把接受到的消息打印在屏幕上...);//接受消息DeliverCallback deliverCallback(consumerTag,message) -{System.out.println(C1接收到的消息: new String(message.getBody(),UTF-8)。通过队列 DEAD_QUEUE。绑定键: message.getEnvelope().getRoutingKey());};CancelCallback cancelCallbackconsumerTag -{System.out.println(consumerTag 消费消息被中断...);};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} } //56集 四.Spinrgboot整合RMQ //1.延迟队列 //1延迟队列本质上就是TTL过期的死信队列 //2P-X(普通交换机)-Y(延迟交换机)-3个队列,实现两种不同的延迟效果10S和40S //3花费了我2个小时因为Consumer类中的Channel导错了包应该为import com.rabbitmq.client.Channel; //4学会了lombok.extern.slf4j.Slf4j与yml文件配置log.info的使用 //5见识了Springboot注解配置的强大。只要功夫深问题能解决 ------POM.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.5.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdSpringBoot/groupIdartifactIdspringboot-maven/artifactIdversion0.0.1-SNAPSHOT/versionnamespringboot-maven/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version1.8/java.version/propertiesdistributionManagementrepositoryidreleases/idnameNexus Release Repository/nameurlhttp://wdfgdzx.top:8081/nexus/content/repositories/releases//url/repositorysnapshotRepositoryidsnapshots/idnameNexus Snapshot Repository/nameurlhttp://wdfgdzx.top:8081/nexus/content/repositories/snapshots//url/snapshotRepository/distributionManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scopeexclusionsexclusiongroupIdorg.junit.vintage/groupIdartifactIdjunit-vintage-engine/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-configuration-processor/artifactIdoptionaltrue/optional/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactId/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-test/artifactId/dependency!--整合MyBatis--dependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion2.1.0/version/dependency!--数据库连接池--dependencygroupIdcom.alibaba/groupIdartifactIddruid/artifactIdversion1.1.12/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion6.0.6/version/dependency!--redis依赖包--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency!--   dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-mongodb/artifactId/dependency--!-- Thymeleaf 自动配置 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-thymeleaf/artifactId/dependency!-- 允许使用非严格的 HTML 语法 --dependencygroupIdnet.sourceforge.nekohtml/groupIdartifactIdnekohtml/artifactIdversion1.9.22/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-test/artifactId/dependency!--SpringBoot热部署配置 --!--  dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-devtools/artifactIdscoperuntime/scopeoptionaltrue/optional/dependency--dependencygroupIdorg.jetbrains/groupIdartifactIdannotations/artifactIdversion13.0/versionscopecompile/scope/dependency!--json--dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.46/version/dependencydependencygroupIdcommons-codec/groupIdartifactIdcommons-codec/artifactIdversion1.9/version/dependencydependencygroupIdcom.squareup.okhttp3/groupIdartifactIdokhttp/artifactIdversion3.9.1/version/dependencydependencygroupIdcom.squareup.okio/groupIdartifactIdokio/artifactIdversion1.15.0/version/dependencydependencygroupIdorg.apache.httpcomponents/groupIdartifactIdhttpclient/artifactIdversion4.5.6/version/dependencydependencygroupIdorg.apache.httpcomponents/groupIdartifactIdhttpcore/artifactIdversion4.4.10/version/dependencydependencygroupIdorg.apache.httpcomponents/groupIdartifactIdhttpmime/artifactIdversion4.5.6/version/dependencydependencygroupIdcommons-lang/groupIdartifactIdcommons-lang/artifactIdversion2.6/version/dependency!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --dependencygroupIdcom.google.code.gson/groupIdartifactIdgson/artifactIdversion2.6.2/version/dependencydependencygroupIdnet.minidev/groupIdartifactIdjson-smart/artifactId/dependency!--Rich文本开始--dependencygroupIdcom.gitee.qdbp.thirdparty/groupIdartifactIdueditor/artifactIdversion1.4.3.3/version/dependency!-- https://mvnrepository.com/artifact/org.json/json --dependencygroupIdorg.json/groupIdartifactIdjson/artifactIdversion20160810/version/dependency!-- https://mvnrepository.com/artifact/commons-io/commons-io --dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.4/version/dependency!-- https://mvnrepository.com/artifact/commons-fileupload/commons-fileupload --dependencygroupIdcommons-fileupload/groupIdartifactIdcommons-fileupload/artifactIdversion1.3.1/version/dependency!-- https://mvnrepository.com/artifact/commons-codec/commons-codec --dependencygroupIdcommons-codec/groupIdartifactIdcommons-codec/artifactIdversion1.9/version/dependency!--Rich文本结束--!-- 读取Excel --dependencygroupIdorg.apache.poi/groupIdartifactIdpoi-ooxml/artifactIdversion4.1.2/version/dependency!--顺丰本地jar包放置与引入--dependencygroupIdcom.iflytek.msp.sfexpress/groupIdartifactIdexpress-sdk/artifactIdversion2.1.5/versionscopesystem/scopesystemPath${project.basedir}/src/main/resources/libs/sf-csim-express-sdk-V2.1.5.jar/systemPath/dependency!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.9.0/version/dependency!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.20/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency/dependenciesbuildplugins!-- plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion2.1.5.RELEASE/version/plugin--plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationincludeSystemScopetrue/includeSystemScope/configurationversion2.1.5.RELEASE/version/plugin!-- plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationforktrue/forkaddResourcestrue/addResources/configuration/plugin--plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins/build /project ------yml server:port: 8001tomcat:accesslog:buffered: truedirectory: /home/A/SpringBootenabled: truefile-date-format: .yyyy-MM-ddpattern: commonprefix: access_logrename-on-rotate: falserequest-attributes-enabled: falserotate: truesuffix: .log spring:#devtools:#restart:#enabledtrue: #支持热部署  可能导致重启然后非实时语音转写报错。rabbitmq:host: wdfgdzx.topport: 5672username: xlliu24password: s19911009!redis: #配置redishost: wdfgdzx.topprot: 6379datasource:name: mydbtype: com.alibaba.druid.pool.DruidDataSourceurl: jdbc:mysql://wdfgdzx.top:3306/mydb?serverTimezoneGMT%2b8username: rootpassword: s19911009!driver-class-name: com.mysql.cj.jdbc.Driverthymeleaf:prefix: classpath:/site/check-template-location: true  #check-tempate-location: 检查模板路径是否存在enabled: trueencoding: UTF-8content-type: text/htmlcache: falsemode: HTMLsuffix: .htmlservlet:multipart: #配置文件上传max-file-size: 1000MB #设置上传的单个文件最大值单位可以是 MB、KB默认为 1MBmax-request-size: 1024MB #设置多文件上传时单次内多个文件的总量的最大值单位可以是 MB、KB默认为 10 M mybatis:mapper-locations: classpath*:/mybatis/*Mapper.xml logging:level:root: info -------交换机声明、队列声明、绑定 package com.day.controller; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; Configuration public class RMQConfig {//普通交换机private static final String COMMON_EXCHANGEX;//死信交换机private static final String DEAD_EXCHANGEY;//普通队列private static final String COMMON_QUEUE_AQA;private static final String COMMON_QUEUE_BQB;//死信队列private static final String DEAD_QUEUE_DQD;//----------------------------------//声明交换Bean(commonExchange)public DirectExchange commonExchange(){return new DirectExchange(COMMON_EXCHANGE);}Bean(deadExchange)public DirectExchange deadExchange(){return new DirectExchange(DEAD_EXCHANGE);}//----------------------------------//声明队列Bean(commonQueueA)public Queue commonQueueA(){MapString,Object argumentsnew HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信Routing Keyarguments.put(x-dead-letter-routing-key,YD);//设置TTLarguments.put(x-message-ttl,10000);return QueueBuilder.durable(COMMON_QUEUE_A).withArguments(arguments).build();}Bean(commonQueueB)public Queue commonQueueB(){MapString,Object argumentsnew HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信Routing Keyarguments.put(x-dead-letter-routing-key,YD);//设置TTLarguments.put(x-message-ttl,40000);return QueueBuilder.durable(COMMON_QUEUE_B).withArguments(arguments).build();}Bean(deadQueueD)public Queue deadQueueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}//----------------------------------//绑定Beanpublic Binding commonQueueABindingCommonExchange(Qualifier(commonQueueA) Queue commonQueueA,Qualifier(commonExchange) DirectExchange commonExchange){return BindingBuilder.bind(commonQueueA).to(commonExchange).with(XA);}Beanpublic Binding commonQueueBBindingCommonExchange(Qualifier(commonQueueB) Queue commonQueueB,Qualifier(commonExchange) DirectExchange commonExchange){return BindingBuilder.bind(commonQueueB).to(commonExchange).with(XB);}Beanpublic Binding deadQueueDBindingDeadExchange(Qualifier(deadQueueD) Queue deadQueueD,Qualifier(deadExchange) DirectExchange deadExchange){return BindingBuilder.bind(deadQueueD).to(deadExchange).with(YD);} } ------生产者 http://localhost:8001/ttl/sendMsg/Hello package com.day.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.io.Serializable; import java.util.Date; //发送延迟消息 Slf4j RestController RequestMapping(/ttl) public class SendMsgController implements Serializable{Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){//System.out.println(当前时间: new Date().toString()发送一条信息给两个TTL队列 message);log.info(当前时间: {},发送一条信息给两个TTL队列{},new Date().toString(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列: message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列: message);} } -----消费者 package com.day.controller; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 Slf4j Component public class Consumer implements Serializable {RabbitListener(queuesQD)public void receiveD(Message message, Channel channel) throws Exception{byte[] bodymessage.getBody();String msgnew String(body, Charset.forName(UTF-8));log.info(当前时间:{},收到死信队列的消息:{},new Date().toString(),msg);} } ------SwaggerConfig 非必须 package com.day.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; Configuration EnableSwagger2 public class SwaggerConfig {Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName(webApi).apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo(){return new ApiInfoBuilder().title(RMQ接口文档).description(本文描述了RMQ的微服务接口定义).version(1.0).contact(new Contact(wdfgdzx,http://wdfgdzx.top,wdfgdzx163.com)).build();} } //2.延迟队列的优化 package com.day.controller; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; Configuration public class RMQConfig {//普通交换机private static final String COMMON_EXCHANGEX;//死信交换机private static final String DEAD_EXCHANGEY;//普通队列private static final String COMMON_QUEUE_AQA;private static final String COMMON_QUEUE_BQB;//延迟队列优化private static final String COMMON_QUEUE_CQC;//死信队列private static final String DEAD_QUEUE_DQD;//----------------------------------//声明交换Bean(commonExchange)public DirectExchange commonExchange(){return new DirectExchange(COMMON_EXCHANGE);}Bean(deadExchange)public DirectExchange deadExchange(){return new DirectExchange(DEAD_EXCHANGE);}//----------------------------------//声明队列Bean(commonQueueA)public Queue commonQueueA(){MapString,Object argumentsnew HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信Routing Keyarguments.put(x-dead-letter-routing-key,YD);//设置TTLarguments.put(x-message-ttl,10000);return QueueBuilder.durable(COMMON_QUEUE_A).withArguments(arguments).build();}//绑定Beanpublic Binding commonQueueABindingCommonExchange(Qualifier(commonQueueA) Queue commonQueueA,Qualifier(commonExchange) DirectExchange commonExchange){return BindingBuilder.bind(commonQueueA).to(commonExchange).with(XA);}//声明队列Bean(commonQueueB)public Queue commonQueueB(){MapString,Object argumentsnew HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信Routing Keyarguments.put(x-dead-letter-routing-key,YD);//设置TTLarguments.put(x-message-ttl,40000);return QueueBuilder.durable(COMMON_QUEUE_B).withArguments(arguments).build();}//绑定Beanpublic Binding commonQueueBBindingCommonExchange(Qualifier(commonQueueB) Queue commonQueueB,Qualifier(commonExchange) DirectExchange commonExchange){return BindingBuilder.bind(commonQueueB).to(commonExchange).with(XB);}//声明队列Bean(commonQueueC)public Queue commonQueueC(){MapString,Object argumentsnew HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信Routing Keyarguments.put(x-dead-letter-routing-key,YD);return QueueBuilder.durable(COMMON_QUEUE_C).withArguments(arguments).build();}//绑定Beanpublic Binding commonQueueCBindingCommonExchange(Qualifier(commonQueueC) Queue commonQueueC,Qualifier(commonExchange) DirectExchange commonExchange){return BindingBuilder.bind(commonQueueC).to(commonExchange).with(XC);}//声明队列Bean(deadQueueD)public Queue deadQueueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}//绑定Beanpublic Binding deadQueueDBindingDeadExchange(Qualifier(deadQueueD) Queue deadQueueD,Qualifier(deadExchange) DirectExchange deadExchange){return BindingBuilder.bind(deadQueueD).to(deadExchange).with(YD);} } ----------------------------------- package com.day.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 Slf4j RestController RequestMapping(/ttl) public class ProducerController implements Serializable{Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){//System.out.println(当前时间: new Date().toString()发送一条信息给两个TTL队列 message);log.info(当前时间:{},发送一条信息给两个TTL队列{},new Date(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列: message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列: message);}//开始发消息 消息 TTLGetMapping(/sendExpireMsg/{message}/{ttlTime})public void sendMsg(PathVariable String message,PathVariable String ttlTime){log.info(当前时间:{},发送一条时长{}ms信息给TTL队列QC{},new Date(),ttlTime,message);rabbitTemplate.convertAndSend(X,XC,message,msg -{msg.getMessageProperties().setExpiration(ttlTime);return msg;});} } ------------------------------------------ package com.day.controller; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 Slf4j Component public class Consumer implements Serializable {RabbitListener(queuesQD)public void receiveD(Message message, Channel channel) throws Exception{byte[] bodymessage.getBody();String msgnew String(body, Charset.forName(UTF-8));log.info(当前时间:{},收到死信队列的消息:{},new Date().toString(),msg);} } //3.日志文件配置与使用 配置------------- ?xml version1.0 encodingUTF-8? !-- 日志级别从低到高分为TRACE  DEBUG  INFO  WARN  ERROR  FATAL如果设置为WARN则低于WARN的信息都不会输出 -- !-- scan:当此属性设置为true时配置文档如果发生改变将会被重新加载默认值为true -- !-- scanPeriod:设置监测配置文档是否有修改的时间间隔如果没有给出时间单位默认单位是毫秒。当scan为true时此属性生效。默认的时间间隔为1分钟。 -- !-- debug:当此属性设置为true时将打印出logback内部日志信息实时查看logback运行状态。默认值为false。 -- configuration  scantrue scanPeriod10 secondscontextNamelogback-spring/contextName!-- name的值是变量的名称value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义后可以使“${}”来使用变量。 --property namelogging.path valuesrc/main/resources/static/client /!--0. 日志格式和颜色渲染 --!-- 彩色日志依赖的渲染类 --conversionRule conversionWordclr converterClassorg.springframework.boot.logging.logback.ColorConverter /conversionRule conversionWordwex converterClassorg.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter /conversionRule conversionWordwEx converterClassorg.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter /!-- 彩色日志格式 --property nameCONSOLE_LOG_PATTERN value${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}/!--1. 输出到控制台--appender nameCONSOLE classch.qos.logback.core.ConsoleAppender!--此日志appender是为开发使用只配置最底级别控制台输出的日志级别是大于或等于此级别的日志信息--filter classch.qos.logback.classic.filter.ThresholdFilterleveldebug/level/filterencoderPattern${CONSOLE_LOG_PATTERN}/Pattern!-- 设置字符集 --charsetUTF-8/charset/encoder/appender!--2. 输出到文档--!-- 2.1 level为 DEBUG 日志时间滚动输出  --appender nameDEBUG_FILE classch.qos.logback.core.rolling.RollingFileAppender!-- 正在记录的日志文档的路径及文档名 --file${logging.path}/web_debug.log/file!--日志文档输出格式--encoderpattern%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n/patterncharsetUTF-8/charset !-- 设置字符集 --/encoder!-- 日志记录器的滚动策略按日期按大小记录 --rollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicy!-- 日志归档 --fileNamePattern${logging.path}/web-debug-%d{yyyy-MM-dd}.%i.log/fileNamePatterntimeBasedFileNamingAndTriggeringPolicy classch.qos.logback.core.rolling.SizeAndTimeBasedFNATPmaxFileSize100MB/maxFileSize/timeBasedFileNamingAndTriggeringPolicy!--日志文档保留天数--maxHistory15/maxHistory/rollingPolicy!-- 此日志文档只记录debug级别的 --filter classch.qos.logback.classic.filter.LevelFilterleveldebug/levelonMatchACCEPT/onMatchonMismatchDENY/onMismatch/filter/appender!-- 2.2 level为 INFO 日志时间滚动输出  --appender nameINFO_FILE classch.qos.logback.core.rolling.RollingFileAppender!-- 正在记录的日志文档的路径及文档名 --file${logging.path}/web_info.log/file!--日志文档输出格式--encoderpattern%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n/patterncharsetUTF-8/charset/encoder!-- 日志记录器的滚动策略按日期按大小记录 --rollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicy!-- 每天日志归档路径以及格式 --fileNamePattern${logging.path}/web-info-%d{yyyy-MM-dd}.%i.log/fileNamePatterntimeBasedFileNamingAndTriggeringPolicy classch.qos.logback.core.rolling.SizeAndTimeBasedFNATPmaxFileSize100MB/maxFileSize/timeBasedFileNamingAndTriggeringPolicy!--日志文档保留天数--maxHistory15/maxHistory/rollingPolicy!-- 此日志文档只记录info级别的 --filter classch.qos.logback.classic.filter.LevelFilterlevelinfo/levelonMatchACCEPT/onMatchonMismatchDENY/onMismatch/filter/appender!-- 2.3 level为 WARN 日志时间滚动输出  --appender nameWARN_FILE classch.qos.logback.core.rolling.RollingFileAppender!-- 正在记录的日志文档的路径及文档名 --file${logging.path}/web_warn.log/file!--日志文档输出格式--encoderpattern%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n/patterncharsetUTF-8/charset !-- 此处设置字符集 --/encoder!-- 日志记录器的滚动策略按日期按大小记录 --rollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicyfileNamePattern${logging.path}/web-warn-%d{yyyy-MM-dd}.%i.log/fileNamePatterntimeBasedFileNamingAndTriggeringPolicy classch.qos.logback.core.rolling.SizeAndTimeBasedFNATPmaxFileSize100MB/maxFileSize/timeBasedFileNamingAndTriggeringPolicy!--日志文档保留天数--maxHistory15/maxHistory/rollingPolicy!-- 此日志文档只记录warn级别的 --filter classch.qos.logback.classic.filter.LevelFilterlevelwarn/levelonMatchACCEPT/onMatchonMismatchDENY/onMismatch/filter/appender!-- 2.4 level为 ERROR 日志时间滚动输出  --appender nameERROR_FILE classch.qos.logback.core.rolling.RollingFileAppender!-- 正在记录的日志文档的路径及文档名 --file${logging.path}/web_error.log/file!--日志文档输出格式--encoderpattern%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n/patterncharsetUTF-8/charset !-- 此处设置字符集 --/encoder!-- 日志记录器的滚动策略按日期按大小记录 --rollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicyfileNamePattern${logging.path}/web-error-%d{yyyy-MM-dd}.%i.log/fileNamePatterntimeBasedFileNamingAndTriggeringPolicy classch.qos.logback.core.rolling.SizeAndTimeBasedFNATPmaxFileSize100MB/maxFileSize/timeBasedFileNamingAndTriggeringPolicy!--日志文档保留天数--maxHistory15/maxHistory/rollingPolicy!-- 此日志文档只记录ERROR级别的 --filter classch.qos.logback.classic.filter.LevelFilterlevelERROR/levelonMatchACCEPT/onMatchonMismatchDENY/onMismatch/filter/appender!--logger用来设置某一个包或者具体的某一个类的日志打印级别、以及指定appender。logger仅有一个name属性一个可选的level和一个可选的addtivity属性。name:用来指定受此logger约束的某一个包或者具体的某一个类。level:用来设置打印级别大小写无关TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF还有一个特俗值INHERITED或者同义词NULL代表强制执行上级的级别。如果未设置此属性那么当前logger将会继承上级的级别。addtivity:是否向上级logger传递打印信息。默认是true。logger nameorg.springframework.web levelinfo/logger nameorg.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor levelINFO/--!--使用mybatis的时候sql语句是debug下才会打印而这里我们只配置了info所以想要查看sql语句的话有以下两种操作第一种把root levelinfo改成root levelDEBUG这样就会打印sql不过这样日志那边会出现很多其他消息第二种就是单独给dao下目录配置debug模式代码如下这样配置sql语句会打印其他还是正常info级别【logging.level.org.mybatisdebug logging.level.daodebug】--!--root节点是必选节点用来指定最基础的日志输出级别只有一个level属性level:用来设置打印级别大小写无关TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF不能设置为INHERITED或者同义词NULL。默认是DEBUG可以包含零个或多个元素标识这个appender将会添加到这个logger。--!--过滤掉spring和mybatis的一些无用的DEBUG信息--logger nameorg.springframework levelINFO/loggerlogger nameorg.mybatis levelINFO/loggerlogger nameorg.apache.zookeeper levelINFO/logger!-- 4. 最终的策略 --!-- 4.1 开发环境:打印控制台--springProfile namedevlogger namecom.dowin.globalvillage.controller leveldebug/!-- 修改此处扫描包名 --/springProfileroot leveldebugappender-ref refCONSOLE /appender-ref refDEBUG_FILE /appender-ref refINFO_FILE /appender-ref refWARN_FILE /appender-ref refERROR_FILE //root!--4.2 生产环境:输出到文档--springProfile nameproroot levelinfoappender-ref refCONSOLE /appender-ref refDEBUG_FILE /appender-ref refINFO_FILE /appender-ref refERROR_FILE /appender-ref refWARN_FILE //root/springProfile /configuration 使用------------ package com.day.controller; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 Slf4j Component public class Consumer implements Serializable {RabbitListener(queuesQD)public void receiveD(Message message, Channel channel) throws Exception{byte[] bodymessage.getBody();String msgnew String(body, Charset.forName(UTF-8));log.info(当前时间:{},收到死信队列的消息:{},new Date().toString(),msg);} } //三.发布订阅 //1.延迟队列优化     //1每个队列只对应一个延迟如果面对变化的需求怎么解决呢     //2写个传ttlTime的控制方法但是发现发送20秒   发送2秒都是一个时间接受到为什么     //3RMQ只会检查第一个消息是否过期就算是第二个消息延迟很短第二个消息也不会优先执行怎么弥补呢     //4延迟队列基于插件的下载插件rabbitmq_delayed_message_exchange-3.8.0.ez   百度网盘找     //5放置到/usr/local/rabbitmq/plugins目录下     //6安装 rabbitmq-plugins enable   rabbitmq_delayed_message_exchange     //7rabbitmqctl stop停止    rabbitmq-server -detached启动     //8重启看到x-delayed-message就代表成功。     //9写代码     ------------配置 package com.day.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; Configuration public class DelayedQueueConfig {//交换机 队列 routingKey//队列public static final String DELAYED_QUEUE_NAMEdelayed.queue;//交换机public static final String DELAYED_EXCHANGE_NAMEdelayed.exchange;//routingKeypublic static final String DELAYED_ROUTING_KEYdelayed.routingKey;//声明交换机Beanpublic CustomExchange delayedExchange(){MapString,Object argumentsnew HashMap();arguments.put(x-delayed-type,direct);//交换机名称、类型、持久化、自动删除、其他参数return new CustomExchange(DELAYED_EXCHANGE_NAME,x-delayed-message,true,false,arguments);}//队列Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}//绑定Beanpublic Binding delayedQueueBindingDelayedExchange(Qualifier(delayedQueue) Queue delayedQueue,Qualifier(delayedExchange) CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} } -------------生产者 package com.day.controller; import com.day.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 Slf4j RestController RequestMapping(/ttl) public class ProducerController implements Serializable{Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){//System.out.println(当前时间: new Date().toString()发送一条信息给两个TTL队列 message);log.info(当前时间:{},发送一条信息给两个TTL队列{},new Date(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列: message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列: message);}//开始发消息 消息 TTLGetMapping(/sendExpireMsg/{message}/{ttlTime})public void sendMsg(PathVariable String message,PathVariable String ttlTime){log.info(当前时间:{},发送一条时长{}ms信息给TTL队列QC{},new Date(),ttlTime,message);rabbitTemplate.convertAndSend(X,XC,message,msg -{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}//开始发消息,基于插件的 消息及延迟的时间GetMapping(/sendDelayMsg/{message}/{delayTime})public void sendMsg(PathVariable String message,PathVariable int delayTime){log.info(当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue{},new Date(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -{msg.getMessageProperties().setDelay(delayTime);return msg;});} } ------------消费者 package com.day.controller; import com.day.config.DelayedQueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 Slf4j Component public class Consumer implements Serializable {RabbitListener(queues DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayQueue(Message message, Channel channel) throws Exception{byte[] bodymessage.getBody();String msgnew String(body, Charset.forName(UTF-8));log.info(当前时间:{},收到死信队列的消息:{},new Date().toString(),msg);} } //2.小结     //1推荐使用RMQ解决延迟队列问题 四.发布确认高级 //1.概述     //1RMQ重启期间生产者消息投递失败导致消息丢失需要手动处理和恢复。     //2交换机和队列有一个不在都会导致消息的丢失。     //3如果交换机收不到消息应该如何处理     //4生产者通过回调接口感知交换机是否接受消息成功。     -------确认 package com.day.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAMEconfirm_exchange;//队列public static final String CONFIRM_QUEUE_NAMEconfirm_queue;//routingKeypublic static final String CONFIRM_ROUTING_KEYkey1;//声明交换机Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//队列Beanpublic Queue confirmQueue(){return new Queue(CONFIRM_QUEUE_NAME);}//绑定Beanpublic Binding queueBindingExchange(Qualifier(confirmQueue)Queue confirmQueue,Qualifier(confirmExchange)DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);} } ------------回调配置 package com.day.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import javax.annotation.Resource;Slf4j Component public class MyCallBack implements RabbitTemplate.ConfirmCallback {Resourceprivate RabbitTemplate rabbitTemplate;//注入PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}//交换机确认回调方法//1.发消息 交换机接收到了 会回调//correlationData 保存回调消息的ID及相关信息// b true 交换机收到了消息// s 失败的原因,成功时为null//2.发消息 交换机接收失败了也会回调// b为falseOverridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String ID;if(correlationData!null){ID correlationData.getId();}else{ID;}if(ack){log.info(交换机已经收到消息,ID为{}的消息,ID);}else{log.info(交换机还未收到消息,ID为{}的消息,原因为{},ID,cause);}} } ------------yml文件配置 spring:#devtools:#restart:#enabledtrue: #支持热部署  可能导致重启然后非实时语音转写报错。rabbitmq:host: wdfgdzx.topport: 5672username: xlliu24password: s19911009!publisher-confirm-type: correlated ------------生产者 package com.day.controller; import com.day.config.ConfirmConfig; import com.day.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 Slf4j RestController RequestMapping(/ttl) public class ProducerController implements Serializable{Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){//System.out.println(当前时间: new Date().toString()发送一条信息给两个TTL队列 message);log.info(当前时间:{},发送一条信息给两个TTL队列{},new Date(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列: message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列: message);}//开始发消息 消息 TTLGetMapping(/sendExpireMsg/{message}/{ttlTime})public void sendMsg(PathVariable String message,PathVariable String ttlTime){log.info(当前时间:{},发送一条时长{}ms信息给TTL队列QC{},new Date(),ttlTime,message);rabbitTemplate.convertAndSend(X,XC,message,msg -{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}//开始发消息,基于插件的 消息及延迟的时间GetMapping(/sendDelayMsg/{message}/{delayTime})public void sendMsg(PathVariable String message,PathVariable int delayTime){log.info(当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue{},new Date(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -{msg.getMessageProperties().setDelay(delayTime);return msg;});}//开始发送消息 测试确认GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){CorrelationData correlationDatanew CorrelationData(110161);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY123,message,correlationData);log.info(发送消息内容为:{},message);} } ----------消费者 package com.day.controller; import com.day.config.ConfirmConfig; import com.day.config.DelayedQueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 Slf4j Component public class Consumer implements Serializable {RabbitListener(queues ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message, Channel channel) throws Exception{byte[] bodymessage.getBody();String msgnew String(body, Charset.forName(UTF-8));log.info(当前时间:{},接收到的队列confirm.queue消息:{},new Date().toString(),msg);} } //2.回退消息     //1如果发现交换机和信道之间不可路由要通过设置Mandatory参数可以在不可送达时送回给生产者。  ------------生产者 package com.day.controller; import com.day.config.ConfirmConfig; import com.day.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 Slf4j RestController RequestMapping(/ttl) public class ProducerController implements Serializable{Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){//System.out.println(当前时间: new Date().toString()发送一条信息给两个TTL队列 message);log.info(当前时间:{},发送一条信息给两个TTL队列{},new Date(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列: message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列: message);}//开始发消息 消息 TTLGetMapping(/sendExpireMsg/{message}/{ttlTime})public void sendMsg(PathVariable String message,PathVariable String ttlTime){log.info(当前时间:{},发送一条时长{}ms信息给TTL队列QC{},new Date(),ttlTime,message);rabbitTemplate.convertAndSend(X,XC,message,msg -{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}//开始发消息,基于插件的 消息及延迟的时间GetMapping(/sendDelayMsg/{message}/{delayTime})public void sendMsg(PathVariable String message,PathVariable int delayTime){log.info(当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue{},new Date(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -{msg.getMessageProperties().setDelay(delayTime);return msg;});}//开始发送消息 测试确认GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){CorrelationData correlationDatanew CorrelationData(110161);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY123,message,correlationData);log.info(发送消息内容为:{},message);} }    ------------回调配置 package com.day.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import javax.annotation.Resource;Slf4j Component public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {Resourceprivate RabbitTemplate rabbitTemplate;//注入PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}//交换机确认回调方法//1.发消息 交换机接收到了 会回调//correlationData 保存回调消息的ID及相关信息// b true 交换机收到了消息// s 失败的原因,成功时为null//2.发消息 交换机接收失败了也会回调// b为falseOverridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String ID;if(correlationData!null){ID correlationData.getId();}else{ID;}if(ack){log.info(交换机已经收到消息,ID为{}的消息,ID);}else{log.info(交换机还未收到消息,ID为{}的消息,原因为{},ID,cause);}}//在消息不可送达时,将消息返回给生产者,只有失败的时候才会回退Overridepublic void returnedMessage(Message message, int replyCode,String replyText, String exchange,String routingKey) {log.error(消息{},被交换机{}退回,退回原因:{},路由key:{},new String(message.getBody()),exchange,replyText,routingKey);} } ------------yml配置 spring:#devtools:#restart:#enabledtrue: #支持热部署  可能导致重启然后非实时语音转写报错。rabbitmq:host: wdfgdzx.topport: 5672username: xlliu24password: s19911009!publisher-confirm-type: correlatedpublisher-returns: true //3.备份交换机     //1先写交换机、路由、队列的绑定关系     //2再写消费者然后删除原有的确认交换机因为他会转发和之前的不同了     ------交换机之间关系配置 package com.day.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAMEconfirm_exchange;//队列public static final String CONFIRM_QUEUE_NAMEconfirm_queue;//routingKeypublic static final String CONFIRM_ROUTING_KEYkey1;//备份交换机public static final String BACKUP_EXCHANGE_NAMEbackup_exchange;//备份队列public static final String BACKUP_QUEUE_NAMEbackup_queue;//报警队列public static final String WARNING_QUEUE_NAMEwarning_queue;//声明交换机Beanpublic DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument(alternate-exchange,BACKUP_EXCHANGE_NAME).build();}//队列Beanpublic Queue confirmQueue(){return new Queue(CONFIRM_QUEUE_NAME);}//绑定Beanpublic Binding queueBindingExchange(Qualifier(confirmQueue)Queue confirmQueue,Qualifier(confirmExchange)DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}//备份交换机Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//队列Beanpublic Queue backupQueue(){return new Queue(BACKUP_QUEUE_NAME);}Beanpublic Queue warningQueue(){return new Queue(WARNING_QUEUE_NAME);}Beanpublic Binding backupQueueBindingBackupExchange(Qualifier(backupQueue)Queue backupQueue,Qualifier(backupExchange)FanoutExchange backupExchange){return BindingBuilder.bind(backupQueue).to(backupExchange);}Beanpublic Binding warningQueueBindingBackupExchange(Qualifier(warningQueue)Queue warningQueue,Qualifier(backupExchange)FanoutExchange backupExchange){return BindingBuilder.bind(warningQueue).to(backupExchange);} } -------报警消费者 package com.day.controller; import com.day.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.util.Date;//报警消费者 Slf4j Component public class WarningConsumer {RabbitListener(queues  ConfirmConfig.WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message){String msgnew String(message.getBody());log.error(报警,发现不可路由消息:{},msg);log.info(当前时间:{},接收到的队列warning_queue消息:{},new Date().toString(),msg);} } //4.RMQ其他知识点     //1消息被重复消费重复扣了用户的钱。幂等性问题     //2幂等性问题的解决一般使用全局ID使用该ID判断该消息是否已消费过。     //3唯一ID指纹码或利用redis的原子性去实现     //4利用redis执行setnx命令天然具有幂等性从而实现不重复消费。 //5.优先级队列     //1订单催付场景 RMQ进行改造和优化对大客户的订单进行优先级的提升。     //2生产者先把消息发到队列之中然后消费者再消费。     ------交换机、队列、路由配置 package com.day.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map;Configuration public class CommonConfig {//交换机public static final String EXCHANGE_NAMEexchange;//队列public static final String QUEUE_NAMEqueue;//routingKeypublic static final String ROUTING_KEYkey;//声明交换机Beanpublic DirectExchange exchange(){return new DirectExchange(EXCHANGE_NAME);}//队列Beanpublic Queue queue(){MapString,Object argumentsnew HashMap();//队列arguments.put(x-max-priority,10);//官方允许是0-255 此处设置10 允许0-10 不用设置过大 浪费CUP和内存return QueueBuilder.durable(QUEUE_NAME).withArguments(arguments).build();}//绑定Beanpublic Binding queueBindingExchange(Qualifier(queue)Queue queue,Qualifier(exchange)DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);} } ------生产者 package com.day.controller; import com.day.config.CommonConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 Slf4j RestController RequestMapping(/RMQ) public class ProducerController implements Serializable{Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息GetMapping(/sendMessage/{message})public void sendMsg(PathVariable String message){for(int i1;i11;i){if(i5){rabbitTemplate.convertAndSend(CommonConfig.EXCHANGE_NAME,CommonConfig.ROUTING_KEY,生产者生产消息:messagei,msg - {msg.getMessageProperties().setPriority(5);return msg;});log.info(当前时间:{},发送一条信息给队列{},new Date(),messagei);}else{rabbitTemplate.convertAndSend(CommonConfig.EXCHANGE_NAME,CommonConfig.ROUTING_KEY,生产者生产消息:messagei);log.info(当前时间:{},发送一条信息给队列{},new Date(),messagei);}}} } ------消费者 package com.day.controller; import com.day.config.CommonConfig; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 /* Slf4j Component public class Consumer implements Serializable {RabbitListener(queues CommonConfig.QUEUE_NAME)public void receiveConfirmMessage(Message message, Channel channel) throws Exception{byte[] bodymessage.getBody();String msgnew String(body, Charset.forName(UTF-8));log.info(当前时间:{},接收到的队列confirm.queue消息:{},new Date().toString(),msg);} } */ //接收消息 public class Consumer {//接收消息public static void main(String[] args) throws Exception {//创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();connectionFactory.setHost(47.105.174.97);connectionFactory.setUsername(xlliu24);connectionFactory.setPassword(s19911009!);//创建新链接Connection connectionconnectionFactory.newConnection();Channel channelconnection.createChannel();//声明 接受消息DeliverCallback deliverCallback(consumerTag, message) -{System.out.println(new String(message.getBody()));};//取消消费CancelCallback cancelCallback consumerTage -{System.out.println(消费消息被中断...);};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */channel.basicConsume(CommonConfig.QUEUE_NAME,true,deliverCallback,cancelCallback);} } //6.惰性队列     //1消息保存在内存中还是在磁盘上正常消息是保存在内存中。在惰性中消息是保存在磁盘中的。     //2当有消费者宕机、大量消息积压时才用惰性队列。 //五.集群 //1.集群原理     //1集群可以不断的扩充需要有3台机器     //2修改三台机器的RMQ名称分别为node1 node2 node3     //3vim /etc/hosts  IP 节点名称对应。三个机器都需要这么配置     //4远程复制命令 scp   /var/lib/rabbitmq/.erlang.cookie rootnode2:/var/lib/rabbitmq/.erlang.cookie     //5重启RMQ与erlang   rabbitmq-server -detached     //6节点二操作rabbitmqctl   stop_app ; rabbitmqctl reset ;rabbitmqctl join_cluster rabbitnode1;rabbitmqctl start_app   ;节点三操作相同     //7rabbitmqctl cluster_status 查看集群状态     //8rabbitmqctl add_user admin 123   ;rabbitmqctl set_user_tags admin administrator     //9rabbitmqctl set_permissions -p   / admin .* .* .*     //10登录后在Overview中可以看到节点     //11也可以解除集群节点 //2.镜像队列     //1如果有个节点宕机了重启发现消息丢失了镜像队列就是备份。     //2发给1节点但备份到2号节点。通过85集的设置可以实现。就算整个集群只剩下一台机器也可以处理。 //3.负载均衡     //1Haproxy实现负载均衡比如Nginx实现高并发高可用 //4.联邦交换机     //1异地机房网络延迟的问题北京ExchangeA  深圳ExchangeB,如果北京的用户访问深圳的RMQ怎么办     //2在每台机器上开启federation相关插件     //3rabbitmq-plugins   enable rabbitmq_faderation ;rabbitmaq-plugins enablerabbitmq_federation_management   ; 自带的插件,能看到 Federation Status ;Federation Upstarems     //4联邦队列 也可以同步数据交换机也可以实现。 //5.Shovel     //1铲子可以将数据从一端挖到另一端     //2rabbitmq-plugins   enable rabbitmq_shovelrabbitmq-plugins enable   rabbitmq_shovel_management     //3node1 q1 同步到node2 q2中  实现跨地区数据同步
http://www.zqtcl.cn/news/308429/

相关文章:

  • 想做一个网站平台怎么做公司网站建设费用估计
  • 电商网站开发平台pi netwo网页设计文件下载
  • 南平网站设计笔记本怎么建设网站
  • 舆情分析网站免费人工智能培训班收费标准
  • 青岛网站建设 大公司制作相册视频
  • 什么是网站的域名jquery素材网站
  • 课程网站建设ppt模板百度seo推广
  • 网站建设需要用到什么怎么在电脑上用手机app软件
  • 公司做网站有意义么网站认证必须做么
  • 网站虚拟空间更新缓存php外贸网站建设
  • 河南省建设执业资格注册中心网站门户定制网站建设公司
  • 网站开发比较厉害wordpress中文 插件
  • 文化投资的微网站怎么做个人微信公众号如何推广
  • 单位的网站怎样设计才美观网页设计图片的代码
  • 长沙专业做网站排名济南手机网站定制费用
  • 西安专题门户响应式网站建设系统网站有哪些
  • 山东省建设局网站监理员考试asp.net mvc6电商网站开发实践
  • 做网站需要提供什么资料网站备案是什么意思
  • 河南网站建设及推广东莞百度代做网站联系方式
  • 大型企业网站制作浦东新区做网站
  • 简单大气网站源码织梦怎么用框架实现在浏览器的地址栏只显示网站的域名而不显示出文件名
  • 电子商务型网站建设线上推广营销策划
  • 网站建设管理工作情况的通报网站开发vs设计报告
  • 嘉定网站网站建设公司官网制作
  • 做旅游广告在哪个网站做效果好财经网站建设
  • 网站样式下载网站地图定位用什么技术做
  • 自己做网站怎么做的百中搜优化软件
  • 南宁建站平台与网站建设相关的论文题目
  • 足球网站建设意义做股权众筹的网站
  • 北京网站建设设计一流的锦州网站建设