结合七牛云做视频网站,微信公众号网页制作教程,昆明网站建设平台,项目工程监理公司网站建设方案如何从零开始手写一个消息中间件#xff08;从宏观角度理解消息中间件的技术原理#xff09; 什么是消息中间件消息中间件的作用逐一拆解消息中间件的核心技术消息中间件核心技术总览IOBIONIOIO多路复用AIOIO多路复用详细分析selectpollepoll Java中的IO多路复用 协议序列化消… 如何从零开始手写一个消息中间件从宏观角度理解消息中间件的技术原理 什么是消息中间件消息中间件的作用逐一拆解消息中间件的核心技术消息中间件核心技术总览IOBIONIOIO多路复用AIOIO多路复用详细分析selectpollepoll Java中的IO多路复用 协议序列化消息的存储消息的读写随机写、顺序写内存映射、零拷贝普通读写函数内存映射mmap()sendfile() 服务注册与发现分区并行与消费者组rebalance机制主从复制消息定时清理 什么是消息中间件
消息中间件也就是俗称的MQ。消息中间件是一个在分布式环境下提供消息收发能力的服务消息生产者把消息发送到消息中间件然后消息中间件存储生产者发送的消息消息消费者请求消息中间件拉取消息拉取到后进行消费。通过消息中间件两个服务间可以异步的传递消息满足了服务间消息传递的需求的同时又避免了服务间的强依赖达到了异步和解耦的效果。 消息中间件的作用
首先消息中间件的第一个功能就是可以做到异步。如果我们一个接口的处理逻辑比较复杂调用链路比较长耗时比较久但是客户端又不需要马上得到响应结果那么我们可以先把请求相关的信息存到MQ然后再由后面的消费者去消费这个消息处理这个请求这样就能达到快速响应客户端的效果。把实时性要求不高的请求通过MQ来滞后请求的处理可以在一定程度上提升接口的性能。 消息中间件的第二个作用就是解耦合。一个服务接收到请求需要通知其他服务来进行相应的处理如果采用同步通知的方式的话就只能通过调用接口的方式假如后面又增加了几个服务也需要收到通知那么就只能通过改代码的方式增加服务调用这种方式是很不利于代码的维护的要频繁的修改代码然后重新打包部署非常的麻烦。但是如果我们通过MQ来实现异步的通知那么生产者只需要发送一次消息到MQ其他服务需要收到通知的服务则作为消费者去监听MQ对应的消息收到消息时进行相应的处理后续如果有其他服务也需要接收到通知不需要找生产者加代码而是通过监听MQ的就能收到同样的消息这样就不需要频繁的修改代码一定程度上提升了可维护性。 消息中间件还有一个作用就是削峰。假如我们的服务某一时刻接收大量的请求达到了一个非常高的峰值如果使用同步的方式来处理服务器很可能扛不住压力而挂掉。如果我们先把请求存到MQ快速响应客户端这样就能减轻服务器的压力。然后存到MQ的消息由消费者异步慢慢的处理这样就可以快速地削掉并发请求的峰值有效的缓解服务器的压力。 逐一拆解消息中间件的核心技术
消息中间件被应用在分布式环境中提供高性能、高可靠的消息收发服务。它通常不是一个单一的简单服务如果它是一个单一的服务首先就不满足高可靠一个服务挂了生产者和消费者间的通信就断了其次它也不是一个简单服务里面涉及到许多技术如果它是一个简单服务的话比如消息的收发都是简单的http请求然后数据存到数据库虽然这样也可以实现消息收发的功能但是这种消息中间件性能并不高只能用来玩一玩没用什么实际的作用因此正规的消息中间件它的内部组成都是非常复杂的。
消息中间件核心技术总览
首先消息要发送到MQ就要经过网络因此就会有网络IO所以IO是消息中间间需要考虑的一个技术点。
在网络间传递消息通常都会有固定的协议比如http协议dns协议不同协议有不同的报文格式使用http协议的话性能太低所以消息中间件还应该考虑实现自己的协议定义自己的报文格式。
然后我们的消息在内存中通常是一个对象如果要网络发送出去必须要进行序列化。
当消息中间件接收到消息后需要进行存储因此消息中间件要设计自己的消息存储格式。
消息中间件接收生产者发送来的消息然后消费者请求消息中间件拉取消息因此消息中间件要设计自己的消息读写策略一个高性能的MQ它的读写必须是高性能的。
消息中间件通常应该在分布式环境下它本身应该也是分布式的所以消息中间件的地址应该要支持动态的发现不能写死在生产者和消费者的配置文件里面因此消息中间件还要设计自己的服务注册与发现的机制。
如果消息中间件要提高自己的吞吐量的化必须支持消息分区并行消费消息分区通常伴随着消费者组一起出现因此消息中间件还要考虑如何实现分区并行以及消费者如果给消费者分配自己负载的分区也就是rebalance。
消息中间件还要考虑如何实现消息的可靠性存到消息中间件的消息应该尽量保证不丢失因此消息中间件还要考虑主从复制。
最后存到消息中间件中的消息不能无限堆积要定时清理所以消息中间件还要设计自己的消息定时清理的机制。 因此消息中间件涉及到的技术点就有以下几项
IO协议序列化消息的存储消息的读写服务注册与发现分区并行与消费者组rebalance机制主从复制消息定时清理
下面我们对它们进行详细的分析。
IO
消息中间件是在生产者和消费者之间提供消息收发能力的服务消息的传递涉及到网络IO。一款高性能的消息中间件它的底层使用的必然是高性能的IO模型。 IO模型有BIO、NIO、IO多路复用、AIO。 BIO
BIO就是传统的阻塞式IO在BIO的模式下如果服务器端调用Socket的accept()方法监听连接如果此时没有客户端连接当前线程会一直阻塞等待连接的到来当调用Socket的read()函数读取消息时如果此时没有消息当前线程会一直阻塞等待直到有消息到达拷贝到用户空间当前线程才会返回。 可以看到无论是等待连接建立、还是等待数据的到来都是阻塞式的等待性能是非常低的。一旦没有客户端连接或者客户端不发送数据服务器的当前线程就会一直阻塞住不干别的事情。
NIO
NIO相对于BIO做的优化就是当此时没有数据时当前线程不会阻塞而是马上返回当前有数据达到则会阻塞进行数据拷贝数据拷贝到用户空间后才会返回。因此当前线程需要不断的轮询检查是否有数据到达如果没有数据到达当前线程可以干点别的事情然后再次轮询如果有数据到达当前线程才会被阻塞因此性能比BIO要高。建立连接和读取数据一样也是类似的机制。 由于没有连接需要建立或者没有数据到达时当前线程不会阻塞因此性能较BIO是有所提高的。但是当前线程需要不停的轮询这样显得有点傻傻的不太灵活。而且这种轮询的机制完全可以由操作系统帮我们实现无需用户自己去编写代码因此就有了后面的IO多路复用。
IO多路复用
IO多路复用对NIO进行了优化当前线程不需要轮询而是向操作系统注册一个事件。事件有不同的类型比如连接建立事件数据读取事件等我们可以同时注册不同类型的事件。比如注册了数据读取事件当有数据到达时会触发这个事件通知当前线程去读取数据当前线程就可以调用socket.read()方法去读取数据此时当前线程会阻塞直到数据读取到用户线程。相当于是把NIO中用户线程轮询的操作移到了操作系统内核由操作系统代替用户线程去轮询。收到操作系统的通知一定是有数据达到可以读取因此在IO多路复用下线程的每一次读取都是有效的读取。 AIO
AIO是异步前面三种都是同步IO同步IO就是数据的读取需要当前线程完成当有数据到来时当前线程会主动的去搬运数据到用户空间。而异步IO则是数据的搬运工作不需要当前线程完成而是操作系统去完成数据搬运完成后再通知当前线程去处理性能是最高的。但是Linux的AIO实际上是基于IO多路复用做封装的性能没有比IO多路复用高多少而Windows系统则在真正意义上实现了AIO。 BIO性能太低一般不会使用。Linux对AIO没有很好的实现因此一般也不会使用AIO。在Linux操作系统上高性能的消息中间件的网络通信应该使用IO多路复用这种高性能的IO模型。因此我们下面对IO多路复用进行详细分析。
IO多路复用详细分析
IO多路复用就是一个线程同时监听多个socket文件描述符只要有任何一个socket有数据到达当前线程就会解阻塞然后可以有效的去读取数据。传统的IO是一个Socket对应一个线程而IO多路复用是多个Socket复用一个线程因此叫做IO多路复用。 在Linux操作系统下有三种类型的IO多路复用select、poll和epoll。
select
select的大概原理就是由操作系统内核去监听一个1024长度的文件描述符数组当某个文件描述符对应的socket有数据可读时会通知用户线程此时用户线程解阻塞但是用户线程并不知道者1024个文件描述符里哪些文件描述符对应的socket是有数据可读的因此用户线程还需要去遍历一下。 很明显这种类型的IO多路复用有两个缺点
数组长度有限只能监听1024个socket多了就不行了每次都要用户线程遍历性能不高
poll
poll相比与select做的优化就是使用了链表去存储需要监听的文件描述符而链表是长度不受限制的因此可以监听超过1024个socket但是当有文件描述符就绪时还是需要用户线程去遍历。 epoll
前面两种IO多路复用性能都不高原因在于每次都要用户线程去遍历一遍如果用户线程不需要遍历而是由操作系统直接返回就绪的文件描述符这样性能就会高很多因此epoll就对此做了优化。 使用epoll就不需要用户线程遍历而是直接通过socket读取数据。
epoll有三个系统调用函数分别是epoll_create、epoll_ctl、epoll_wait。epoll底层使用红黑树存储需要监听的socket的文件描述符epoll_craete函数的作用就是创建存储文件描述符的红黑树返回一个epoll实例。调用epoll_ctl函数可以将需要监听的socket对应的文件描述存储到红黑树中。当某个socket有数据到达时操作系统会将红黑树中对应的文件描述符复制到一个链表中调用epoll_wait就是阻塞等待某些socket有数据到达然后获取到操作系统返回的文件描述符链表该链表中所有的文件描述符对应的socket都是有数据达到的。 epoll是Linux系统里面性能最高的IO多路复用高性能的消息中间件底层一般使用epoll这种类型的IO多路复用进行网络通信。
Java中的IO多路复用
我们用Java代码如何实现IO多路复用呢其实Java中的NIO底层就是IO多路复用虽然也叫NIO但不是上面说的那个需要用户线程轮询的NIO而是底层使用了epoll的IO多路复用实现的IO机制。
Java NIO的三个核心对象Buffer、Channel、Selector。Channel封装了客户端与服务端之间的连接有ServerSocketChannel和SocketChannel两种类型分别与BIO的ServerSocket和Socket对应。 Selector就是IO多路复用器我们可以把Channel注册到Selector上并设置关注的事件类型可以注册多个Channel到Selector就相当于同时监听多个Socket。调用Selector的select方法当前线程会阻塞等待Selector监听的Channel有事件发生。当Selector监听的Channel有事件发生时当前线程会被唤醒然后就可以处理就绪的Channel用户线程需要把Channel中的数据copy到Buffer中然后再从Buffer中读取数据。 但是原生的Java NIO API使用过于复杂因此我们一般不会使用元素的Java NIO API而是使用NettyNetty对Java NIO API有良好的封装使用非常方便性能非常高。
协议
既然涉及到消息的传递那我们传递的消息用什么格式呢也就是我们的协议报文我们可以直接使用http协议报文去传递消息但是http协议的头部存在许多冗余数据这些头部信息其实我们根本用不到最重要的是http它不是二进制协议是一个明文字符串协议因此http协议报文的体积会相对较大会消耗过多的网络带宽性能并不高。 如果我们自己自定义二进制形式的报文的话效果就会不一样。我们报文中只定义有用的字段每一个二进制位都是有用的不存储无用的信息这样我们的报文格式就足够紧凑。并且我们的报文是二进制形式的所以不需要经过字符串转二进制这一步直接就可以在网络上进行传输因此占用的带宽就相对较小性能就比较高。 比如报文头部我们就存储序列化类型、报文体长度那么整个报文就只有头部两个字段和存储消息内容的报文体体积大大的缩小。
但是消息中间件除了要处理生产者发送消息的请求外还要处理消费者拉取消息的请求以及消费者发送ACK的请求等等所以光这两个头部字段还不够可能还要添加一个请求类型的头部那么头部就只需要存储三个字段。 因此我们可以自定义自己的协议报文使用二进制形式定义报文的格式报文的格式我们设计的足够紧凑只存我们需要的信息这样就能进一步提高消息中间件的性能。 序列化
定义好了协议之后协议体协议包含协议头和协议体存放的就是我们的消息内容但是我们的报文是二进制形式的而我们的消息内容通常是一个对象比如是一个Message对象那么我们就要把这个Message对象序列化成二进制的形式而不同的序列化机制它的性能有所差异此时我们就需要选用高性能的序列化机制。 常用的序列化机制有Java原生的序列化机制但是性能较低还有hessian、Protobuf等性能较高的序列化机制。除此以外我们可以把我们的消息对象转成Json格式字符串然后再将Json字符串通过UTF8编码成二进制也可以达到序列化的效果。 JDK自身提供的序列化机制存在两个问题1、序列化的数据比较大传输效率低2、其他语言无法识别。因此我们一般不使用JDK自带的序列化机制或者会把它作为一个备用的方案供使用者去选择。
使用Json进行序列化Json格式字符串编码后的二进制序列体积较大占用空间较大传输性能较低所以我们一般也不会把Json作为默认的序列化方案。但是使用Json序列化有它的好处那就是对编程语言没有要求并且不同编程语言的服务也能通过消息中间件进行通信用Java去发送的数据C或者C也能接收和解析并且它的可读性较高方便调试。
Hessian是一个支持跨语言传输的二进制序列化协议相对于Java默认的序列化机制来说Hessian支持多种不同的语言而且具有更好的性能和易用性。把Hessian作为默认的序列化机制是一个不错的选择。
Protobuf使用比较广泛性能比Hessian还要高。但是使用Protobuf相对来说比较麻烦没有Hessian易用。Protobuf规定每一个类都要编写对应的proto文件proto文件有自己的语法规则我们要按照Protobuf规定的语法规则编写proto文件然后使用Protobuf的编译器去编译proto文件。再把编译出来的类引入到我们的工程中使用。当需要改变类的内部结构比如有字段要增删时需要修改proto文件重新编译因此使用Protobuf会相对麻烦一些并且有一定的学习成本。
有了序列化机制我们的消息就能被序列化成二进制字节流在网络上进行传输。
消息的存储
消息发送到消息中间件消息中间接收到消息后就要对消息进行持久化。我们要设计出良好的消息存储格式因为消息存储格式的设计的好与坏直接决定了消息读写性能的高低如果只是一股脑的往后追加没有建立任何索引那么每次消息的读取都只能从头开始遍历性能肯定是不高的。 首先我们可以把发送到消息服务的消息数据按顺序的追加写入到一个log文件我们定义log文件的指定大小当一个log文件写满时就新开一个。 但如果仅仅设计成这样那我们读取消息的时候只能从头到尾逐条遍历。根据消费者消费偏移量offset与当前遍历到的消息的起始位置比较看是否是要读取的消息如果是要读取的消息则根据消息头部中的消息大小字段读取一定长度如果不是要读取的消息则根据消息头部中的消息大小字段跳过指定大小的空间读取下一条。这样性能是比较低的。 因此我们可以像数据库那样建立一个索引文件用于检索log文件中的数据。 这样当我们读取一条消息的时候我们可以通过二分查找在index文件中读到消息的偏移量再到log文件进行检索时间复杂度就降到了O(logN)性能就得到提升。
当然这只是其中一种方案还不是最优的方案。
消息的读写
定义好消息存储的格式之后就可以设计如何读写消息。消息读取的性能高低取决于读写数据使用的函数以及消息存储格式的好坏如果像数据库那样考虑了索引文件的建立那么消息的读取就可以做到O(logN)的时间复杂度否则每次读取都只能从头开始遍历。而消息的写入是写入到磁盘的磁盘的读写又分随机写和顺序写随机写的性能较低顺序写的性能较高。
随机写、顺序写
为什么随机写的性能较低而顺序写的性能较高呢 这是从网上随便扒来的一张描述磁盘组成的图。可以看到有一个磁臂带动磁头然后磁头滑到对应的磁道后磁头落下随着磁盘的转动读取磁道上数据。而这个磁臂带到磁头滑到对应的磁道这个动作叫做磁盘寻址这个动作是比较耗时间的。
随机写就是数据的写入是在磁盘上随机的几个位置上不是连续的因此一次随机写通常会有多次的磁盘寻址所以随机写的性能就相对较低。
而顺序写的意思就是顺序的往磁盘写入一段数据是在磁盘上的一段连续的空间写入因此只有一次的磁盘寻址开销因此性能较高。
在Java中使用NIO类库里面的MappedByteBuffer就可以实现顺序写。通过FileChannel.open(Path, OpenOption…) 方法得到对应文件的一个FileChannel然后调用FileChannel的map(MapMode mode, long position, long size)方法把文件中从指定位置position开始的大小为size的一段数据映射到内存中创建一个MappedByteBuffer对象与之对应最后调用MappedByteBuffer的put(byte[])把字节数组写入到MappedByteBuffer对应的内存映射区域写入到MappedByteBuffer对应的内存映射区域中的内容会被追加写入到对应文件的position位置后面。 内存映射、零拷贝
除此以外读写性能的高低还跟我们使用的函数有关比如我们使用的是普通的read()和write()函数那么性能是很低的它会经历多次无意义的拷贝。而如果我们使用内存映射mmap()或者零拷贝sendfile()之类的函数那么性能就相对较高。
普通读写函数
比如我们要从磁盘读取一段数据发送到网络。如果是普通的read()和write()函数先调用read()函数从用户态转到内核态然后从磁盘中读取数据到内核空间然后将内核空间的数据拷贝到用户空间然后从内核态切换为用户态然后在调用write()函数再次从用户态切换到内核态然后将用户空间的数据拷贝到内核空间的socket缓冲区然后再将socket缓冲区的数据拷贝到网卡然后从内核态切换回用户态完成整个数据拷贝的操作。可以看到这个操作是非常繁琐的伴随了四次用户态和内核态间的转换四次的数据拷贝。 内存映射mmap()
其实在数据没有修改的情况下完全没有必要把数据拷贝到用户空间因此read()函数那一次拷贝到用户空间的操作其实是多余的而通过内存映射mmap()函数就可以节省掉这一次内存数据拷贝的动作。
在调用mmap()函数时用户态切换到内核态从磁盘读取数据到内核空间然后建立内核空间与用户空间的映射无需拷贝数据到用户空间这两步操作做完后当前线程回到用户态然后再调用write()函数再次进入内核态把内存映射区域的数据拷贝到Socket缓存区然后把Socket缓存区中的数据拷贝到网卡。可以看到通过mmap()函数可以减少一次内存拷贝的操作而用户态与内核态间的切换次数没有减少。 sendfile()
如果使用sendfile()函数就可以进一步提升性能。调用sendfile()函数从用户到切换到内核态然后从磁盘读取数据到内核空间然后将数据在内核空间中的地址和长度发送到socket缓冲区然后网卡读取socket缓冲区中的信息得知数据所在的地址和长度直接从内核空间把数据拷走然后当前线程从内核态切换回用户态。可以看到用户态到与内核态间的切换节省为只有两次数据拷贝也是只有两次性能是非常高的。 因此作为高性能的消息中间件一定是使用mmap()或者sendfile()函数进行数据的读写。当然这些是C语言的函数使用Java语言的话也是有对应的API实现的。 服务注册与发现
这样似乎我们就可以使用这个消息中间件进行消息的传递了。但是目前这个消息中间件还是单体的我们可以直接在生产者和消费者上写死消息中间件的地址。
但是在分布式环境下单体服务是不具备高可用性的因此消息中间件往往会以集群的形式部署并且我们可能还允许消息服务动态上下线也就是按需加减机器此时我们就不能写死消息中间件的地址在生产者和消费者上了我们还需要一种类似于服务注册与发现的机制消息服务注册到注册中心然后生产者和消费者通过注册中心发现消息服务。这个注册中心可以使用Zookeeper也可以我们自己实现。 比如我们可以使用Zookeeper作为注册中心然后在Zookeeper上创建一个路径是/Brokers的节点然后每一台服务器上线就在/Brokers节点下创建一个临时节点临时节点的名称使用服务器的ip端口来命名。这样生成者和消费者监听/Brokers下的子节点即可。当有服务上线或下线时/Brokers节点下的临时子节点会发生变动利用Zookeeper的监听机制生产者和消费者都可以收到通知。 有了服务注册与发现的机制后我们的消息中间件就更加的灵活可以动态的上下线消息服务器也可以动态的发现上线的消息服务器的地址。 分区并行与消费者组rebalance机制
因为消息中间件是集群部署那么我们可以设计消息分区并行以提高吞吐量。每一个消息都属于一个Topic消息主题然后一个Topic会有多个Partition分区不同的Partition在不同的Broker节点消息服务上。我们的消息可以轮询发往多个Partition达到分区并行的效果。然后消费者以消费者组的形式部署多个消费者组成一个消费者组它们共同消费指定的Topic组内不同消费者消费不同Partition的消息这样就提高了消息消费的并行度提升了消息中间件的吞吐量。 但是这里有一个难题就是如果决定组内的不同消费者负责哪些Partition的消费也就是消费者组的rebalance机制我们要通过rebalance机制为一个消费者组内的消费者分配对应的分区。
假如我们设计的是组内的某个消费者执行rebalance
那么首先第一步就是要选出执行rebalance的消费者。然后该消费者从注册中心读取消息主题分区以及消费者组的信息因此消费者也要注册到注册中心然后消费者使用一定的算法比如轮询为组内不同消费者分配分区然后rebalance结果写入注册中心通知其他消费者读取并执行。 当然这只是其中一种rebalance机制的实现思路也可以考虑其他的实现方式来实现rebalance机制。 主从复制
目前我们一条消息只会发往一个消息服务所以消息不具备可靠性如果一台消息服务器上的消息丢失了那么该消息就真的丢失了。因此我们还要做消息的主从复制进行数据备份这样可以保证消息的可靠存储不会轻易发生消息丢失的情况。
主从复制又分为同步复制和异步复制。同步复制就是当收到生产者发送的消息时保证消息同步到从节点才响应生产者消息发送成功而异步复制只要消息在主节点上落盘成功就响应生产者消息发送成功同步消息到从节点这个动作则异步去做因此异步复制的消息可靠性就没有那么高但是性能相对同步复制要高。 消息定时清理
此时我们的消息中间件已经足够完备了但是我们还有最后一块工作要处理。我们的消息不能只做写入而不做清理磁盘空间是有限的我们不断的往磁盘写入数据而不去清理磁盘迟早会被写满。因此我们还要设计消息中间件的定时自动清理机制定时的清理过期的消息。
过期消息定时清理看起来很简单。但是我们存储在磁盘中的数据不仅仅有消息数据本身还有索引数据队列数据等对应的就是不同的文件在清理过期消息数据时要考虑把索引数据和队列数据一并清除保证它们的一致性。比如我们设计消息日志文件和消息索引文件是一一对应的那么在做过期消息日志文件清理的时候就可以很方便的把索引文件也一并清除。因此消息存储格式设计的好坏还决定了过期数据清理是简单还是复杂。 有了消息定期清理机制之后我们的磁盘空间就不会被占满消息中间件也可以持续的运行。