平顶山建设银行网站,网站做了301怎么查看跳转前网站,东营人力考试信息网官网,郑州住房和城乡建设部网站1. 前言 本文档旨在描述RocketMQ的多个关键特性的实现原理#xff0c;并对消息中间件遇到的各种问题进行总结#xff0c;阐述RocketMQ如何解决这些问题。文中主要引用了JMS规范与CORBA Notification规范#xff0c;规范为我们设计系统指明了方向#xff0c;但是仍有不少问题… 1. 前言 本文档旨在描述RocketMQ的多个关键特性的实现原理并对消息中间件遇到的各种问题进行总结阐述RocketMQ如何解决这些问题。文中主要引用了JMS规范与CORBA Notification规范规范为我们设计系统指明了方向但是仍有不少问题规范没有提及对于消息中间件又至关重要。RocketMQ并不遵循任何规范但是参考了各种规范与同类产品的设计思想。 产品发展历史 大约经历了三个主要版本迭代 一、MetaqMetamorphosis1.x 由开源社区killme2008维护开源社区非常活跃。https://github.com/killme2008/Metamorphosis 二、Metaq 2.x 于2012年10月份上线在淘宝内部被广泛使用。 三、RocketMQ 3.x 基于公司内部开源共建原则RocketMQ项目只维护核心功能且去除了所有其他运行时依赖核心功能最简化。 每个BU的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是Jar包例如要定制一个Broker那么只需要依赖rocketmq-broker这个jar包即可可通过API进行交互如果定制client则依赖rocketmq-client这个jar包对其提供的api进行再封装。开源社区地址https://github.com/alibaba/RocketMQ在RocketMQ项目基础上衍生的项目如下 com.taobao.metaq v3.0 RocketMQ 淘宝个性化需求 为淘宝应用提供消息服务com.alipay.zpullmsg v1.0 RocketMQ 支付宝个性化需求 为支付宝应用提供消息服务com.alibaba.commonmq v1.0 Notify RocketMQ B2B个性化需求 为B2B应用提供消息服务3. 专业术语 Producer消息生产者负责产生消息一般由业务系统负责产生消息。Consumer消息消费者负责消费消息一般是后台系统负责异步消费。Push Consumer Consumer的一种应用通常向Consumer对象注册一个Listener接口一旦收到消息Consumer对象立刻回调Listener接口方法。Pull Consumer Consumer的一种应用通常主动调用Consumer的拉消息方法从Broker拉消息主动权由应用控制。ProducerGroup 一类Producer的集合名称这类Producer通常发送一类消息且发送逻辑一致。Consumer Group 一类Consumer的集合名称这类Consumer通常消费一类消息且消费逻辑一致。Broker消息中转角色负责存储消息转发消息一般也称为Server。 在JMS规范中称为Provider。广播消费 一条消息被多个Consumer消费即使这些Consumer属于同一个Consumer Group消息也会被Consumer Group中的每个Consumer都消费一次广播消费中的Consumer Group概念可以认为在消息划分方面无意义。 在CORBA Notification规范中消费方式都属于广播消费。 在JMS规范中相当于JMS publish/subscribe model集群消费 一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息其中一个Consumer Group有3个实例可能是3个进程或者3台机器那么每个实例只消费其中的3条消息。 在CORBA Notification规范中无此消费方式。 在JMS规范中JMS point-to-point model与之类似但是RocketMQ的集群消费功能大等于PTP模型。因为RocketMQ单个Consumer Group内的消费者类似于PTP但是一个Topic/Queue可以被多个Consumer Group消费。顺序消息 消费消息的顺序要同发送消息的顺序一致在RocketMQ中主要指的是局部顺序即一类消息为满足顺序性必须Producer单线程顺序发送且发送到同一个队列(这就是原理)这样Consumer就可以按照Producer发送的顺序去消费消息。普通顺序消息 顺序消息的一种正常情况下可以保证完全的顺序消息但是一旦发生通信异常 Broker重启由于队列总数发生变化哈希取模后定位的队列会变化产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况如某个Broker宕机或者重启下消息短暂的乱序使用普通顺序方式比较合适。严格顺序消息 顺序消息的一种 无论正常异常情况都能保证顺序但是牺牲了分布式Failover特性即Broker集群中只要有一台机器不可用则整个集群都不可用服务可用性大大降低。 如果服务器部署为同步双写模式此缺陷可通过备机自动切换为主避免不过仍然会存在几分钟的服务不可用。依赖同步双写主备自动切换自动切换功能目前还未实现目前已知的应用只有数据库binlog同步强依赖严格顺序消息其他应用绝大部分都可以容忍短暂乱序推荐使用普通的顺序消息。Message Queue 在RocketMQ中所有消息队列都是持久化长度无限的数据结构所谓长度无限是指 队列中的每个存储单元都是定长访问其中的存储单元使用Offset来访问offset为java long类型64位理论上在100年内不会溢出所以认为是长度无限 另外队列中 只保存最近几天的数据 之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组offset就是下标。4. 消息中间件需要解决哪些问题 本节阐述消息中间件通常需要解决哪些问题在解决这些问题当中会遇到什么困难RocketMQ是否可以解决规范中如何定义这些问题。 4.1 Publish/Subscribe 发布订阅是消息中间件的最基本功能也是相对于传统RPC通信而言。在此不再详述。 4.2 Message Priority 规范中描述的优先级是指在一个消息队列中每条消息都有不同的优先级一般用整数来描述优先级高的消息先投递如果消息完全在一个内存队列中那么在投递前可以按照优先级排序令优先级高的先投递。由于RocketMQ所有消息都是持久化的所以如果按照优先级来排序开销会非常大因此RocketMQ没有特意支持消息优先级但是可以通过变通的方式实现类似功能 即单独配置一个优先级高的队列和一个普通优先级的队列将不同优先级发送到不同队列即可。 对于优先级问题可以归纳为2类 1)只要达到优先级目的即可不是严格意义上的优先级通常将优先级划分为高、中、低或者再多几个级别。每个优先级可以用不同的topic表示发消息时指定不同的topic来表示优先级这种方式可以解决绝大部分的优先级问题但是对业务的优先级精确性做了妥协。 2)严格的优先级优先级用整数表示例如0 ~ 65535这种优先级问题一般使用不同topic解决就非常不合适。如果要让MQ解决此问题会对MQ的性能造成非常大的影响。这里要确保一点业务上是否确实需要这种严格的优先级如果将优先级压缩成几个对业务的影响有多大 4.3 Message Order 消息有序指的是一类消息消费时能按照发送的顺序来消费。例如一个订单产生了3条消息分别是订单创建订单付款订单完成。消费时要按照这个顺序消费才能有意义。但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。原理是 4.4 Message Filter Broker端消息过滤 在Broker中按照Consumer的要求做过滤优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker的负担实现相对复杂。 (1).淘宝Notify支持多种过滤方式包含直接按照消息类型过滤灵活的语法表达式过滤几乎可以满足最苛刻的过滤需求。 (2). 淘宝RocketMQ支持按照简单的Message Tag过滤也支持按照Message Header、body进行过滤。 (3).CORBA Notification规范中也支持灵活的语法表达式过滤。Consumer端消息过滤 这种过滤方式可由应用完全自定义实现但是缺点是很多无用的消息要传输到Consumer端。4.5 Message Persistence 消息中间件通常采用的几种持久化方式 (1).持久化到数据库例如Mysql。 (2).持久化到KV存储例如levelDB、伯克利DB等KV存储系统。 (3). 文件记录形式持久化例如KafkaRocketMQ (4).对内存数据做一个持久化镜像例如beanstalkdVisiNotify (1)、(2)、(3)三种持久化方式都具有将内存队列Buffer进行扩展的能力(4)只是一个内存的镜像作用是当Broker挂掉重启后仍然能将之前内存的数据恢复出来。 JMS与CORBA Notification规范没有明确说明如何持久化但是持久化部分的性能直接决定了整个消息中间件的性能。RocketMQ参考了Kafka的持久化方式充分利用Linux文件系统内存cache来提高性能。 4.6 Message Reliablity 影响消息可靠性的几种情况 (1).Broker正常关闭 (2).Broker异常Crash (3).OS Crash (4).机器掉电但是能立即恢复供电情况。 (5).机器无法开机可能是cpu、主板、内存等关键设备损坏 (6).磁盘设备损坏。(1)、(2)、(3)、(4)四种情况都属于硬件资源可立即恢复情况RocketMQ在这四种情况下能保证消息不丢或者丢失少量数据依赖刷盘方式是同步还是异步。 (5)、(6)属于单点故障且无法恢复一旦发生在此单点上的消息全部丢失。RocketMQ在这两种情况下通过异步复制可保证99%的消息不丢但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点同步双写势必会影响性能适合对消息可靠性要求极高的场合例如与Money相关的应用。 RocketMQ从3.0版本开始支持同步双写。 4.7 Low Latency Messaging 在消息不堆积情况下消息到达Broker后能立刻到达Consumer。RocketMQ使用长轮询Pull方式可保证消息非常实时消息实时性不低于Push。 4.8 At least Once 是指每个消息必须投递一次 RocketMQ Consumer 先 pull 消息到本地消费完成后才向服务器返回 ack如果没有消费一定不会 ack 消息 所以 RocketMQ 可以很好的支持此特性。 4.9 Exactly Only Once (1). 发送消息阶段不允许发送重复的消息。 (2). 消费消息阶段不允许消费重复的消息。 只有以上两个条件都满足情况下才能认为消息是“Exactly Only Once”而要实现以上两点在分布式系统环 境下不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能并不保证此特性要求在业务上进行去重 也就是说消费消息要做到幂等性。RocketMQ 虽然不能严格保证不重复但是正常情况下很少会出现重复发送、消费情况只有网络异常Consumer 启停等异常情况下会出现消息重复。 此问题的本质原因是网络调用存在不确定性即既不成功也不失败的第三种状态所以才产生了消息重复性问题。 4.10 Broker 的 Buffer 满了怎么办 Broker 的 Buffer 通常指的是 Broker 中一个队列的内存 Buffer 大小这类 Buffer 通常大小有限如果 Buffer 满 了以后怎么办 下面是 CORBA Notification 规范中处理方式 (1). RejectNewEvents 拒绝新来的消息向 Producer 返回 RejectNewEvents 错误码。 (2). 按照特定策略丢弃已有消息 a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this property. b) FifoOrder - The first event received will be the first discarded. c) LifoOrder - The last event received will be the first discarded. d) PriorityOrder - Events should be discarded in priority order, such that lower priority RocketMQ 没有内存 Buffer 概念RocketMQ 的队列都是持久化磁盘数据定期清除。 对于此问题的解决思路RocketMQ 同其他 MQ 有非常显著的区别RocketMQ 的内存 Buffer 抽象成一个无限长度的队列不管有多少数据进来都能装得下这个无限是有前提的 Broker 会定期删除过期的数据例如 Broker 只保存 3 天的消息那么这个 Buffer 虽然长度无限但是 3 天前的数据会被从队尾删除。 4.11 回溯消费 回溯消费是指 Consumer 已经消费成功的消息由于业务上需求需要重新消费 要支持此功能Broker 在向 Consumer 投递成功消息后消息仍然需要保留。并且重新消费一般是按照时间维度例如由于 Consumer 系统故障 恢复后需要重新消费 1 小时前的数据那么 Broker 要提供一种机制可以按照时间维度来回退消费进度。 RocketMQ 支持按照时间回溯消费时间维度精确到毫秒可以向前回溯也可以向后回溯。 4.12 消息堆积 消息中间件的主要功能是异步解耦还有个重要功能是挡住前端的数据洪峰保证后端系统的稳定性 这就要求消息中间件具有一定的消息堆积能力消息堆积分以下两种情况 (1). 消息堆积在内存 Buffer一旦超过内存 Buffer可以根据一定的丢弃策略来丢弃消息如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务这种情况消息的堆积能力主要在于内存 Buffer 大小而且消息 堆积后性能下降不会太大因为内存中数据多少对于对外提供的访问能力影响有限。 (2). 消息堆积到持久化存储系统中例如 DBKV 存储文件记录形式。 当消息不能在内存 Cache 命中时要不可避免的访问磁盘会产生大量读 IO读 IO 的吞吐量直接决定了 消息堆积后的访问能力。 评估消息堆积能力主要有以下四点 (1). 消息能堆积多少条多少字节即消息的堆积容量。 (2). 消息堆积后发消息的吞吐量大小是否会受堆积影响 (3). 消息堆积后正常消费的 Consumer 是否会受影响 (4). 消息堆积后访问堆积在磁盘的消息时吞吐量有多大 4.13 分布式事务 已知的几个分布式事务规范如 XAJTA 等。其中 XA 规范被各大数据库厂商广泛支持如 OracleMysql 等。 其中 XA 的 TM 实现佼佼者如 Oracle Tuxedo在金融、电信等领域被广泛应用。 分布式事务涉及到两阶段提交问题在数据存储方面的方面必然需要 KV 存储的支持因为第二阶段的提交回滚需要修改消息状态一定涉及到根据 Key 去查找 Message 的动作。RocketMQ 在第二阶段绕过了根据 Key 去查找 Message 的问题 采用第一阶段发送 Prepared 消息时拿到了消息的 Offset第二阶段通过 Offset 去访问消息 并修改状态Offset 就是数据的地址。 RocketMQ 这种实现事务方式没有通过 KV 存储做而是通过 Offset 方式存在一个显著缺陷即通过 Offset 更改数据会令系统的脏页过多需要特别关注。 4.14 定时消息 定时消息是指消息发到 Broker 后不能立刻被 Consumer 消费要到特定的时间点或者等待特定的时间后才能 被消费。 如果要支持任意的时间精度在 Broker 层面必须要做消息排序 如果再涉及到持久化那么消息排序要不可避免的产生巨大性能开销。 RocketMQ 支持定时消息但是不支持任意时间精度支持特定的 level例如定时 5s10s1m 等。 4.15 消息重试 Consumer 消费消息失败后要提供一种重试机制令消息再消费一次。Consumer 消费消息失败通常可以认为 有以下几种情况 由于消息本身的原因例如反序列化失败消息数据本身无法处理例如话费充值当前消息的手机号被注销无法充值等。 这种错误通常需要跳过这条消息再消费其他消息而这条失败的消息即使立刻重试消费99%也不成功 所以最好提供一种定时重试机制即过 10s 秒后再重试。 由于依赖的下游应用服务不可用例如 db 连接不可用外系统网络不可达等。 遇到这种错误即使跳过当前失败的消息消费其他消息同样也会报错。这种情况建议应用 sleep 30s再消费下一条消息这样可以减轻 Broker 重试消息的压力。 5 RocketMQ Overview 5.1 RocketMQ 是什么 图5-1 是一个队列模型的消息中间件具有高性能、高可靠、高实时、分布式特点。Producer、Consumer、队列都可以分布式。Producer 向一些队列轮流发送消息队列集合称为 Topic Consumer 如果做广播消费则一个 consumer 实例消费这个 Topic 对应的所有队列如果做集群消费则多个 Consumer 实例平均消费这个 topic 对应的队列集合。能够保证严格的消息顺序提供丰富的消息拉取模式高效的订阅者水平扩展能力实时的消息订阅机制亿级消息堆积能力较少的依赖5.2 RocketMQ 物理部署结构 图5-2 RocketMQ 网络部署特点 Name Server 是一个几乎无状态节点可集群部署节点之间无任何信息同步。Broker 部署相对复杂Broker 分为 Master 与 Slave一个 Master 可以对应多个 Slave但是一个 Slave 只能 对应一个 MasterMaster 与 Slave 的对应关系通过指定相同的 BrokerName不同的 BrokerId 来定义BrokerId为 0 表示 Master非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节 点建立长连接定时注册 Topic 信息到所有 Name Server。Producer 与 Name Server 集群中的其中一个节点随机选择建立长连接定期从 Name Server 取 Topic 路由信息并向提供 Topic 服务的 Master 建立长连接且定时向 Master 发送心跳。 Producer 完全无状态可 集群部署。Consumer 与 Name Server 集群中的其中一个节点随机选择建立长连接定期从 Name Server 取 Topic 路由信息并向提供 Topic 服务的 Master、Slave 建立长连接且定时向 Master、Slave 发送心跳。 Consumer 既可以从 Master 订阅消息也可以从 Slave 订阅消息订阅规则由 Broker 配置决定。5.3 RocketMQ 逻辑部署结构 Producer Group用来表示一个发送消息应用一个 Producer Group 下包含多个 Producer 实例可以是多台机器也可以 是一台机器的多个进程或者一个进程的多个 Producer 对象。 一个 Producer Group 可以发送多个 Topic 消息Producer Group 作用如下 1.标识一类 Producer 2.可以通过运维工具查询这个发送消息应用下有多个 Producer 实例 3.发送分布式事务消息时如果 Producer 中途意外宕机Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态。 Consumer Group 用来表示一个消费消息应用一个 Consumer Group 下包含多个 Consumer 实例可以是多台机器也可 以是多个进程或者是一个进程的多个 Consumer 对象。一个 Consumer Group 下的多个 Consumer 以均摊 方式消费消息如果设置为广播方式那么这个 Consumer Group 下的每个实例都消费全量数据。 6 RocketMQ 存储特点 6.1 零拷贝原理 Consumer 消费消息过程使用了零拷贝零拷贝包含以下两种方式 使用 mmap write 方式 优点即使频繁调用使用小块文件传输效率也很高 缺点不能很好的利用 DMA 方式会比 sendfile 多消耗 CPU内存安全性控制复杂需要避免 JVM Crash 问题。 使用 sendfile 方式 优点可以利用 DMA 方式消耗 CPU 较少大块文件传输效率高无内存安全新问题。 缺点小块文件效率低于 mmap 方式只能是 BIO 方式传输不能使用 NIO。 RocketMQ 选择了第一种方式mmapwrite 方式因为有小块数据传输的需求效果会比 sendfile 更好。 关于 Zero Copy 的更详细介绍请参考以下文章 http://www.linuxjournal.com/article/6345 6.2 文件系统 RocketMQ 选择 Linux Ext4 文件系统原因如下 Ext4 文件系统删除 1G 大小的文件通常耗时小于 50ms而 Ext3 文件系统耗时约 1s 左右且删除文件时磁盘 IO 压力极大会导致 IO 写入超时。 文件系统层面需要做以下调优措施 文件系统 IO 调度算法需要调整为 deadline因为 deadline 算法在随机读情况下可以合并读请求为顺序跳跃 方式从而提高读 IO 吞吐量。 Ext4 文件系统有以下 Bug请注意 http://blog.donghao.org/2013/03/20/修复ext4日志jbd2bug/ 6.3 数据存储结构 图6-1 6.4 存储目录结构 6.5 数据可靠性 7 RocketMQ 关键特性 7.1 单机支持 1 万以上持久化队列 图7-1 (1). 所有数据单独存储到一个 Commit Log完全顺序写随机读。 (2). 对最终用户展现的队列实际只存储消息在 Commit Log 的位置信息并且串行方式刷盘。 这样做的好处如下 (1). 队列轻量化单个队列数据量非常少。 (2). 对磁盘的访问串行化避免磁盘竟争不会因为队列增加导致 IOWAIT 增高。 每个方案都有缺点它的缺点如下 (1). 写虽然完全是顺序写但是读却变成了完全的随机读。 (2). 读一条消息会先读 Consume Queue再读 Commit Log增加了开销。 (3). 要保证 Commit Log 与 Consume Queue 完全的一致增加了编程的复杂度。 以上缺点如何克服 (1). 随机读尽可能让读命中 PAGECACHE减少 IO 读操作所以内存越大越好。如果系统中堆积的消息过多 读数据要访问磁盘会不会由于随机读导致系统性能急剧下降答案是否定的。 a) 访问 PAGECACHE 时即使只访问 1k 的消息系统也会提前预读出更多数据在下次读时就可能命 中内存。 b) 随机访问 Commit Log 磁盘数据系统 IO 调度算法设置为 NOOP 方式会在一定程度上将完全的随机 读变成顺序跳跃方式而顺序跳跃方式读较完全的随机读性能会高 5 倍以上可参见以下针对各种 IO 方式的性能数据。 http://stblog.baidu-tech.com/?p851 另外 4k 的消息在完全随机访问情况下仍然可以达到 8K 次每秒以上的读性能。 (2). 由于 Consume Queue 存储数据量极少而且是顺序读在 PAGECACHE 预读作用下Consume Queue 的读性能几乎与内存一致即使堆积情况下。 所以可认为 Consume Queue 完全不会阻碍读性能。 (3). Commit Log 中存储了所有的元信息包含消息体类似于 Mysql、Oracle 的 redolog所以只要有 Commit Log 在Consume Queue 即使数据丢失仍然可以恢复出来。 7.2 刷盘策略 RocketMQ 的所有消息都是持久化的先写入系统 PAGECACHE然后刷盘 可以保证内存与磁盘都有一份数据 访问时直接从内存读取。 7.2.1 异步刷盘 图7-2-1 在有 RAID 卡SAS 15000 转磁盘测试顺序写文件速度可以达到 300M 每秒左右而线上的网卡一般都为千兆 网卡写磁盘速度明显快于数据网络入口速度 那么是否可以做到写完内存就向用户返回由后台线程刷盘呢 (1). 由于磁盘速度大于网卡速度那么刷盘的进度肯定可以跟上消息的写入速度。 (2). 万一由于此时系统压力过大可能堆积消息除了写入 IO还有读取 IO万一出现磁盘读取落后情况 会不会导致系统内存溢出答案是否定的原因如下 a) 写入消息到 PAGECACHE 时如果内存不足则尝试丢弃干净的 PAGE腾出内存供新消息使用策略 是 LRU 方式。 b) 如果干净页不足此时写入 PAGECACHE 会被阻塞系统尝试刷盘部分数据大约每次尝试 32 个 PAGE来找出更多干净 PAGE。 综上内存溢出的情况不会出现。 7.2.2 同步刷盘 图7-2-2同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PAGECACHE 直接返回而同步刷盘需要等待刷盘完成才返回 同步刷盘流程如下 (1). 写入 PAGECACHE 后线程等待通知刷盘线程刷盘。 (2). 刷盘线程刷盘后唤醒前端等待线程可能是一批线程。 (3). 前端等待线程向用户返回成功。 7.3 消息查询 7.3.1 按照 Message Id 查询消息 图7-2 MsgId 总共 16 字节包含消息存储主机地址消息 Commit Log offset。 从 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址然后按照存储格式所在位置消息 buffer 解析成一个完整的消息。 7.3.2 按照 Message Key 查询消息 图7-3 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置slotNum 是一个索引文件里面包含的最大槽的数目 例如图中所示 slotNum5000000。 根据 slotValueslot 位置对应的值查找到索引项列表的最后一项倒序排列slotValue 总是指向最新的一个索引项。 遍历索引项列表返回查询时间范围内的结果集默认一次最大返回的 32 条记录 Hash 冲突寻找 key 的 slot 位置时相当于执行了两次散列函数一次 key 的 hash一次 key 的 hash 值取模 因此这里存在两次冲突的情况第一种key 的 hash 值不同但模数相同(不同key的hash值不同但模数相同)此时查询的时候会在比较一次 key 的 hash 值每个索引项保存了 key 的 hash 值过滤掉 hash 值不相等的项。第二种hash 值相等但 key 不等(不同key的hash值相同模数当然相同) 出于性能的考虑冲突的检测放到客户端处理key 的原始值是存储在消息文件中的避免对数据文件的解析 客户端比较一次消息体的 key 是否相同。 存储为了节省空间索引项中存储的时间是时间差值存储时间-开始时间开始时间存储在索引文件头中 整个索引文件是定长的结构也是固定的。索引文件存储结构参见图 7.4.3-3 。 7.4 服务器消息过滤 RocketMQ 的消息过滤方式有别于其他消息中间件是在订阅时再做过滤先来看下 Consume Queue 的存储结构。 图7-4 (1). 在Broker端进行Message Tag比对先遍历 Consume Queue如果存储的 Message Tag 与订阅的 Message Tag 不符合则跳过继续比对下一个符合则传输给 Consumer。 注意Message Tag 是字符串形式Consume Queue 中存储的是其对应的 hashcode比对时也是比对 hashcode。 (2). Consumer 收到过滤后的消息后同样也要执行在 Broker 端的操作但是比对的是真实的 Message Tag 字符串而不是 Hashcode。 为什么过滤要这样做 (1). Message Tag 存储 Hashcode是为了在 Consume Queue 定长方式存储节约空间。 (2). 过滤过程中不会访问 Commit Log 数据可以保证堆积情况下也能高效过滤。 (3). 即使存在 Hash 冲突也可以在 Consumer 端进行修正保证万无一失。 7.5 长轮询 Pull RocketMQ的Consumer都是从Broker拉消息来消费但是为了能做到实时收消息RocketMQ 使用长轮询方式可以保证消息实时性同 Push 方式一致。 这种长轮询方式类似于 Web QQ 收发消息机制。请参考以下信息了解 更多 http://www.ibm.com/developerworks/cn/web/wa-lo-comet/ 7.6 顺序消息 7.6.1 顺序消息原理 图7-6-1 在RocketMQ中主要指的是局部顺序即一类消息为满足顺序性必须Producer单线程顺序发送且发送到同一个队列(这就是原理) 这样Consumer就可以按照Producer发送的顺序去消费消息。 7.6.2 顺序消息缺陷 发送顺序消息无法利用集群 FailOver 特性 消费顺序消息的并行度依赖于队列数量 队列热点问题个别队列由于哈希不均导致消息过多消费速度跟不上产生消息堆积问题 遇到消息失败的消息无法跳过当前队列消费暂停 7.7 事务消息 图7-7-1 7.8 发送消息负载均衡 图7-5 如图所示5 个队列可以部署在一台机器上也可以分别部署在 5 台不同的机器上 发送消息通过轮询队列的方式 发送每个队列接收平均的消息量。通过增加机器可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。 7.9 订阅消息负载均衡 图7-6 如图所示如果有 5 个队列2 个 consumer那么第一个 Consumer 消费 3 个队列第二 consumer 消费 2 个队列。 这样即可达到平均消费的目的可以水平扩展 Consumer 来提高消费能力。 但是 Consumer 数量要小于等于队列数 量如果 Consumer 超过队列数量那么多余的 Consumer 将不能消费消息(不是队列内也可以并行)。 表1-1 7.10 单队列并行消费 图7-10单队列并行消费采用滑动窗口方式并行消费 如图所示3~7 的消息在一个滑动窗口区间可以有多个线程并行消 费但是每次提交的 Offset 都是最小 Offset例如 3 7.11 发送定时消息 7.12 消息消费失败定时重试 7.13 HA同步双写/异步复制 异步复制的实现思路非常简单Slave 启动一个线程不断从 Master 拉取 Commit Log 中的数据然后在异步 build 出 Consume Queue 数据结构。整个实现过程基本同 Mysql 主从同步类似。 7.14 单个 JVM 进程也能利用机器超大内存 图7-7 (1). Producer 发送消息消息从 socket 进入 java 堆。 (2). Producer 发送消息消息从 java 堆转入 PAGACACHE物理内存。 (3). Producer 发送消息由异步线程刷盘消息从 PAGECACHE 刷入磁盘。 (4). Consumer 拉消息正常消费消息直接从 PAGECACHE数据在物理内存转入 socket到达 consumer 不经过 java 堆。这种消费场景最多线上 96G 物理内存按照 1K 消息算可以在物理内存缓存 1 亿条消 息。 (5). Consumer 拉消息异常消费消息直接从 PAGECACHE数据在虚拟内存转入 socket。 (6). Consumer 拉消息异常消费由于 Socket 访问了虚拟内存产生缺页中断此时会产生磁盘 IO从磁 盘 Load 消息到 PAGECACHE然后直接从 socket 发出去。 (7). 同 5 一致。 (8). 同 6 一致。 7.15 消息堆积问题解决办法 前面提到衡量消息中间件堆积能力的几个指标现将 RocketMQ 的堆积能力整理如下 表7-1 在有 Slave 情况下Master 一旦发现 Consumer 访问堆积在磁盘的数据时会向 Consumer 下达一个重定向指 令令 Consumer 从 Slave 拉取数据这样正常的发消息与正常消费的 Consumer 都不会因为消息堆积受影响因为 系统将堆积场景与非堆积场景分割在了两个不同的节点处理。这里会产生另一个问题Slave 会不会写性能下降 答案是否定的。因为 Slave 的消息写入只追求吞吐量不追求实时性只要整体的吞吐量高就可以而 Slave 每次 都是从 Master 拉取一批数据如 1M这种批量顺序写入方式即使堆积情况整体吞吐量影响相对较小只是写入 RT 会变长。 8 RocketMQ 消息过滤 /*** 订阅指定topic下tags分别等于TagA或TagC或TagD
*/consumer.subscribe(TopicTest1, TagA || TagC || TagD);如以上代码所示简单消息过滤通过指定多个 Tag 来过滤消息过滤动作在服务器进行。实现原理参照第 7.4 节 8.2 高级消息过滤 图8-2 Broker 所在的机器会启动多个 FilterServer 过滤进程 Consumer 启动后会向 FilterServer 上传一个过滤的 Java 类 Consumer 从 FilterServer 拉消息FilterServer 将请求转发给 BrokerFilterServer 从 Broker 收到消息后按照 Consumer 上传的 Java 过滤程序做过滤过滤完成后返回给 Consumer。 总结 使用 CPU 资源来换取网卡流量资源 FilterServer 与 Broker 部署在同一台机器数据通过本地回环通信不走网卡 一台 Broker 部署多个 FilterServer充分利用 CPU 资源因为单个 Jvm 难以全面利用高配的物理机 Cpu 资源 因为过滤代码使用 Java 语言来编写应用几乎可以做任意形式的服务器端消息过滤例如通过 Message Header 进行过滤甚至可以按照 Message Body 进行过滤。 使用 Java 语言进行作为过滤表达式是一个双刃剑方便了应用的过滤操作但是带来了服务器端的安全风险。 需要应用来保证过滤代码安全例如在过滤程序里尽可能不做申请大内存创建线程等操作。避免 Broker 服 务器发生资源泄漏。 使用方式参见 Github 例子 https://github.com/alibaba/RocketMQ/blob/develop/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java 9 RocketMQ 通信组件 RocketMQ 通信组件使用了 Netty-4.0.9.Final在之上做了简单的协议封装。 10 RocketMQ 服务发现Name Server Name Server 是专为 RocketMQ 设计的轻量级名称服务代码小于 1000 行具有简单、可集群横向扩展、无状 态等特点。将要支持的主备自动切换功能会强依赖 Name Server。 参考http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/https://www.jianshu.com/p/453c6e7ff81chttp://valleylord.github.io/post/201607-mq-rocketmq/ 转载于:https://www.cnblogs.com/john8169/p/9780473.html