微网站建设价格,creative wordpress,电商运营发展前景怎么样,网站建设企业蛋糕初识 RabbitMQ
核心思想#xff1a;接收并转发消息。可以把它想象成一个邮局。 producer#xff1a;生产者 queue#xff1a;队列 consumer#xff1a;消费者什么是消息队列 MQ#xff08;Message Queue#xff09;#xff1a;本质是队列#xff0c;FIFO先入先出接收并转发消息。可以把它想象成一个邮局。 producer生产者 queue队列 consumer消费者什么是消息队列 MQMessage Queue本质是队列FIFO先入先出只不过队列中存放的内容是 message 而已还是一种跨进程的通信机制用于上下游传递消息。在互联网架构中MQ是一种非常常见的上下游“逻辑解耦物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ不用依赖其他服务。 消息队列起到的作用主要为允许入列、存储消息及消息出列。 消息队列的特性 业务无关FIFO先进先出容灾可以把消息进行持久化即便未发出去或发送消息的过程突然断电、服务宕机等恢复服务后把之前的消息重新发送保证了可靠性。性能消息队列的吞吐量上去之后整个系统内部通信效率也会提高。 为什么要使用消息队列 系统解耦当系统A发送一个消息到多个系统中间使用消息队列后不用每个系统都调用一遍减少处理时间提高维护性。异步调用可以通过消息异步调用多个业务模块大大提升整体程序的运行速度。流量削峰当系统出现流量高峰时可以把请求放到消息队列里再以服务器能处理的速度发给各个系统从而减少不必要的物理资源扩充。 MQ的分类 ActiveMQ 优点单击吞吐量万级时效性 ms 级可用性高基于主从架构实现高可用性较低概率丢失数据。缺点官方社区现在对 ActiveMQ 5.x 维护越来越少高吞吐量场景较少使用。 Kafka 大数据的杀手锏谈到大数据领域内的消息传输则绕不开 Kafka为大数据而生的消息中间件支持百万级 TPS 的吞吐量在数据采集、传输、存储的过程中发挥着举足轻重的作用。优点性能卓越单击写入 TPS 约在百万条/秒最大的优点就是吞吐量高。时效性 ms 级可用性非常高。kafka 是分布式的一个数据多个副本少量机器宕机不会丢失数据不会导致不可用。消费者采用 pull 方式获取消息消息有序通过控制能够保证所有消息被消费且仅被消费一次。有优秀的第三方Kafka Web 管理界面 Kafka-Manager。在日志领域比较成熟被多家公司和多个开源项目使用。功能较为简单主要支持简单的 MQ 功能在大数据领域的实时计算以及日志采集被大规模使用。缺点Kafka 单击超过 64 个队列/分区 Load 会发生明显的飙高现象队列越多load 越高发送消息响应时间变长使用短轮询方式实时性能取决于轮询间隔时间消费失败不支持重试支持消息顺序但是一台代理宕机后就会产生消息乱序社区更新较慢。 RocketMQ RocketMQ 出自阿里巴巴的开源产品用 Java 语言实现在设计时参考了 Kafka并做出了自己的一些改进。被阿里巴巴吧广泛应用在订单交易充值流计算消息推送日志流式处理binglog分发等场景。优点单击吞吐量十万级可用性非常高分布式架构消息可以做到 0 丢失MQ功能较为完善扩展性好支持10亿级别的消息堆积不会因为堆积导致性能下降源码是Java可以自己阅读源码定制自己公司的 MQ。缺点支持的客户端语言不多目前是Java及C其中C不成熟社区活跃度一般没有在 MQ 核心中去实现 JMS 等接口有些系统要迁移需要修改大量代码。 RabbitMQ 2007年发布是一个在AMQPAdvanced Message Queuing Protocol 高级消息队列协议基础上完成的可复用的企业消息系统是当前最主流的消息中间件之一。优点由于 Erlang 语言的高并发特性性能较好吞吐量到万级MQ 功能比较完备健壮、稳定、易用、跨平台支持多种语言如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等。支持AJAX文档齐全开源提供的管理界面非常棒用起来很好用社区活跃度高更新频率相当高。缺点商业版需要收费学习成本较高。 MQ 的选择 Kafka大量数据的互联网服务的数据收集业务有日志采集功能首选 Kafka。RocketMQ金融互联网领域对于可靠性要求很高的场景尤其是电商里面的订单扣款以及业务削峰等。RoketMQ 在稳定性上更值得信赖在阿里双11经历了多次考验。RabbitMQ结合 Erlang语言本身的并发优势性能好时效性微秒级社区活跃度也比较高管理界面用起来十分方便。如果数据量没那么大中小型公司优先选择功能比较完备的 RabbitMQ。 RabbitMQ 核心概念 四大核心概念生产者 Publisher、交换机 Exchage、队列 Queue、消费者Consumer。 Broker接受和分发消息的应用RabbitMQ Server就是 Message Brokerconnectionpublisher / consumer 与 broker 之间的 TCP 连接Channel信道在connection 内部建立的逻辑连接每个connection可以包含多个信道每次发消息只会占用一个信道。几乎所有的操作都在信道上进行。Channel 极大减少了操作系统建立 TCP connection 的开销。message消息由 properties 和 body 组成properties 为对消息进行额外的修饰包括优先级、队列延迟、消息其他属性等body 为消息实体。Virtual host虚拟主机顶层隔离。出于多租户和安全因素设计的在Broker里可以包含多个 Virtual host 而一个 Virtual host 里又可以包含多个 Exchange 和 Queue。 同一个虚拟主机下不能有重复的交换机和 queue。Exchange交换机接收生产者的消息的然后根据指定的路由器去把消息转发到所绑定的队列上。binding绑定交换机和队列routing key路由键路由规则虚拟机可以用它来确定这个消息如何进行一个路由。queue队列消费者只需要监听队列来消费消息不需要关注消息来自哪个ExchangeExchange 和 Message Queue 存在着绑定的关系一个 Exchange 可以绑定多个消息对类。 RabbitMQ 六大模式 消息流转过程
RabbitMQ 的安装和启动
下载安装包 先通过 uname -a 命令查询 Linux 版本然后下载对应的安装包 然后根据 Linux 版本在 PackageCloud 查看RabbitMQ与Erlang 支持的版本及官网https://www.rabbitmq.com/查看RabbitMQ与Erlang对应版本 结合上面选择并下载RabbitMQ 3.10.0 及 Erlang 23.3.x 版本。 下载完后通过如 SecureFX 文件上传工具上传到 Linux 服务器上 安装 安装 Erlang rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm 安装 RabbitMQyum命令需要Linux服务器联网 yum install rabbitmq-server-3.10.0-1.el7.noarch.rpm 常用命令 添加开机启动 chkconfig rabbitmq-server on 启动服务 service rabbitmq-server start 查看服务状态 service rabbitmq-server status 如下图如果是active (running)则启动完成activating 为正在启动中稍微等待重新查看服务状态 停止服务 service rabbitmq-server stop 开启 web 管理插件需先停止服务 rabbitmq-plugins enable rabbitmq_management 如出现如下错误 将主机名配置修改和上图显式的名称vim /etc/hosts 然后再执行成功后启动服务 防火墙开发15672端口 firewall-cmd --zonepublic --permanent --add-port15672/tcp firewall-cmd --reload 浏览器访问 添加新用户 创建账号 rabbitmqctl add_user admin 123456 设置用户角色 rabbitmqctl set_user_tags admin administrator 设置用户权限 set_permissions [-p ] rabbitmqctl set_permissions -p “/” admin “.*” “.*” “.*” 用户admin具有 “/” 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色 rabbitmqctl list_users
Hello World
简单队列模式
打开IDEA创建一个空项目 然创建一个Module 在pom.xml文件添加依赖 !--指定jdk编译版本--buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins/builddependencies!--rabbitmq依赖客户端--dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.8.0/version/dependency!--操作文件流的一个依赖--dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.6/version/dependency/dependencies消息生产者
package com.ql.rabbitmq.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发消息*/
public class Producer {//队列名称public static final String QUEUE_NAME hello;//发消息public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP 连接RabbitMQ的队列factory.setHost(192.168.1.130);//factory.setPort(15672);//用户名factory.setUsername(admin);//密码factory.setPassword(123456);//创建连接Connection connection factory.newConnection();//获取信道Channel channel connection.createChannel();/*** 生成队列* 1.队列名称* 2.队列里面的消息是否持久化磁盘 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行消息共享true可以多个消费者消费 false:只能一个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发消息String message hello world;//初次使用/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的Key值是哪个 本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println(消息发送完毕);}}
运行测试注Linux放开5672端口看到已经有一条消息。
消息消费者
package com.ql.rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者 接收消息的*/
public class Consumer {//队列的名称public static final String QUEUE_NAME hello;//接收消息public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.1.130);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();//声明 接收消息DeliverCallback deliverCallback (consumerTag, message) - {System.out.println(new String(message.getBody()));};//取消消息时的回调CancelCallback cancelCallback consumerTag - {System.out.println(消息消费被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表的自动应答 false 代表手动应答* 3.消费者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}
Work Queues
工作队列模式又称任务队列的主要思想是当有资源密集型任务把任务封装为消息并将其发送到队列由一个或多个工作线程将任务轮询弹出并执行。
轮询分发消息
抽取连接工厂工具类
package com.ql.rabbitmq.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 此类为连接工厂创建信道的工具类*/
public class RabbitMqUtils {//得到一个连接的channelpublic static Channel getChannel() throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.1.130);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();return channel;}
}工作线程代码
package com.ql.rabbitmq.two;import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程相当于之前消费者*/
public class Worker01 {//队列的名称public static final String QUEUE_NAME hello;//接受消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMqUtils.getChannel();DeliverCallback deliverCallback (consumerTag, message)-{System.out.println(接收到的消息new String(message.getBody()));};CancelCallback cancelCallback (consumerTag)-{System.out.println(consumerTag消费者取消消费接口的回调);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}启动2个工作线程 先启动一个 然后IDEA运行配置里开启允许多个实例再运行 生产者代码
package com.ql.rabbitmq.two;import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;/*** 生产者 发送大量的消息*/
public class Task01 {//队列的名称public static final String QUEUE_NAME hello;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMqUtils.getChannel();//队列的声明channel.queueDeclare(QUEUE_NAME, false, false, false, null);//从控制台当中接受信息Scanner scanner new Scanner(System.in);while (scanner.hasNext()){String message scanner.next();channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println(发送消息完成 message);}}
}运行测试
消息应答
消费者完成一个任务可能需要一段时间如果其中一个消费者处理一个长的任务并仅完成了部分后突然挂掉了会发生什么情况RabbitMQ 一旦向消费者传递一条消息便立即将该消息标记为删除这种情况下突然有个消息挂掉了将会丢失正在处理的消息。 为了保证消息在发送过程中不丢失RabbitMQ 引入消息应答机制消息应答就是消费者在接收到消息并且处理该消息之后告诉 RabbitMQ 它已经处理了RabbitMQ 可以把该消息删除了。 自动应答 消息发送后立即被认为已经发送成功这种模式需要在高吞吐量和数据传输安全性方面做权衡。因为这种模式如果消息在接收到之前消费者出现连接或 channel 关闭消息就会丢失。另一方面这种模式消费者没有对传递的信息数量进行限制这样有可能发生消息的积压导致内存溢出等。所以这种模式仅适合在消费者可以高效并以某种速率能够处理这些消息的情况下使用。 手动应答 Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息可以将其丢弃了Channel.basicNack(用于否定确认)Channel.basicRejeck(用于否定确认) 与 Channel.basicNack 少一个参数 不处理该消息了直接拒绝可以将其丢弃 Multiple 批量应答 手动应答的好处是可以批量应答并且减少网络拥堵 Channel.basicAck(deliveryTag, Multiple) Multiple true 代表批量应答 channel 上未应答的消息。比如说 channel 上有传送tag的消息 5、6、7、8 当前 tag 是 8 那么此时 5-8 的消息都会被确认收到消息应答 false 不批量应答强烈推荐。上面情形只会应答 8 的消息。 消息自动重新入队 如果消费者由于某些原因失去连接其通道已关闭、连接已关闭或TCP 连接丢失导致消息未发送ACK 确认RabbitMQ 将了解到消息未完全处理并将对其重新排队。如果此时其他消费者可以处理它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡也可以确保不会丢失任何消息。 消息手动应答代码
生产者Task2
package com.ql.rabbitmq.three;import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;/*** 消息在手动应答时不丢失、放回队列中重新消费*/
public class Task2 {//队列名称public static final String TASK_QUEUE_NAME ack_queue;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);//从控制台中输入信息Scanner scanner new Scanner(System.in);while (scanner.hasNext()){String message scanner.next();channel.basicPublish(, TASK_QUEUE_NAME, null, message.getBytes(UTF-8));System.out.println(生产者发出消息message);}}
}消费者Work03
package com.ql.rabbitmq.three;import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.ql.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消息在手动应答时不丢失放回队列重新消费*/
public class Work03 {//队列名称public static final String TASK_QUEUE_NAME ack_queue;//接收消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMqUtils.getChannel();System.out.println(C1等待接收消息处理时间较短);DeliverCallback deliverCallback (consumerTag , message)-{//沉睡1SSleepUtils.sleep(1);System.out.println(接收到的消息new String(message.getBody(), UTF-8));//手动应答/*** 1.消息的标记 tag* 2.是否批量应答*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//采用手动应答boolean autoAck false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -{System.out.println(consumerTag消费者取消消费接口回调);}));}
}消费者Work04
package com.ql.rabbitmq.three;import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.ql.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消息在手动应答时不丢失放回队列重新消费*/
public class Work04 {//队列名称public static final String TASK_QUEUE_NAME ack_queue;//接收消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMqUtils.getChannel();System.out.println(C2等待接收消息处理时间较长);DeliverCallback deliverCallback (consumerTag , message)-{//沉睡30SSleepUtils.sleep(30);System.out.println(接收到的消息new String(message.getBody(), UTF-8));//手动应答/*** 1.消息的标记 tag* 2.是否批量应答*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//采用手动应答boolean autoAck false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -{System.out.println(consumerTag消费者取消消费接口回调);}));}
}睡眠工具类
public class SleepUtils {public static void sleep(int second){try {Thread.sleep(1000*second);} catch (InterruptedException e) {e.printStackTrace();}}
}手动应答效果演示 依次启动Task2、Work03、Work04然后Task2发送两个消息 此时Work03 已收到第一条消息 Work04 收到消息后沉睡30s之后才打印并手动应答我们在应答前停止Work04 停止Work04 后第二条消息就会被Work03 重新消费
RabbitMQ 持久化
默认情况下 RabbitMQ 退出或由于某种原因崩溃时它忽视队列和消息。为了确保消息不会丢失我们需要将队列和消息都标记为持久化。
队列实现持久化 上面创建的队列都是非持久化的RabbitMQ 如果重启的话该队列就会被删除。如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为持久化 注如果之前声明的队列不是持久化的需要把原先队列先删除否则出现如下错误 在管理页面先删除队列重新启动Task2 刷新管理页面该队列就会多一个D 的持久化标识。 消息持久化 修改生产者代码 在发布消息时添加 MessageProperties.PERSISTENT_TEXT_PLAIN 属性 将消息标记为持久化并不能完全保证消息不丢失尽管它告诉 RabbitMQ 将消息保存到磁盘但是这里依然存在消息刚准备存储在磁盘的时候服务关闭等此时并没有真正写入磁盘。持久化保证并不强但是对于简单任务队列是足够了如果需要更强有力的持久化策略参考后面发布确认知识点。不公平分发 轮询分发的缺点当有两个消费者其中一个处理任务速度非常快另一个速度很慢这个时候还用轮询分发速度快的消费者大部分时间处于空闲状态而另一个却在一直在处理任务。 为了避免这种情况我们可以在消费者接收消息的时候设置参数 channel.basicQos(1) 实现不公平分发。如果设置为0 则轮询分发 设置完Work03、Work04 后重启测试 预取值 RabbitMQ的信道上肯定不止只有一个消息因此这里就存在一个未确认的消息缓冲区因此希望开发人员能限制此缓冲区的大小以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 channel.basicQos() 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量RabbitMQ 将停止在通道上传递更多消息除非至少有一个未处理的消息被确认。 上面的不公平分发相当于安把预取值设置成了1。 我把上面Work03 、Work04 信道预取值都改成5然后重启测试 发送多个消息 通过管理界面可看到信道里堆积的消息条数。