两学一做是什么网站,wordpress the id,太原做网站制作,acg wordpress模板一、什么是消息队列
在认识消息队列之前#xff0c;需要先知道 阻塞队列 是什么。
阻塞队列#xff1a;是一种特殊类型的队列#xff0c;它在队列为空时#xff0c;从队列中获取元素的操作将会被阻塞:当队列满时#xff0c;往队列中添加元素的操作将会被阻塞。同时#x…一、什么是消息队列
在认识消息队列之前需要先知道 阻塞队列 是什么。
阻塞队列是一种特殊类型的队列它在队列为空时从队列中获取元素的操作将会被阻塞:当队列满时往队列中添加元素的操作将会被阻塞。同时尝试往已满的阻塞队列中添加新的元素或从空队列中获取元素的线程同样也会被阻塞直到其他线程从队列中移除一个或者多个元素或者清空队列后使队列重新变得空闲起来。这种特性使得阻塞队列在多线程环境下能够有效地协调和同步线程操作
而所谓的 消息队列就是把阻塞队列这样的数据结构单独提取成了一个程序独立进行部署。
他们都可以用来实现 生产者消费者 模型不同之处在于
通过阻塞队列实现的生产者消费者模型只是在一个进程内部进行的而通过消息队列实现的生产者消费者模型可以在 进程与进程之间或者是 服务与服务之间而不仅仅局限于一个进程内部。生产者消费者模型的作用
解耦合写代码要求高内聚低耦合
比如本来有一个分布式系统A服务器调用B服务器。(A 给 B 发送请求B 给 A 返回响应)这个过程A 和 B 之间的 耦合是比较大的引入消息队列之后A 把请求发给消息队列 B 再从消息队列中获取请求~
削峰填谷比如 A 是入口服务器A再调用B完成一些具体的事务。
如果 A 和 B 直接通信此时若 A 突然收到一组用户请求的峰值此时 B 也会随之感受到峰值~~那么 B 可能会扛不住每个物理上的服务器硬件资源(比如CPU、内存、硬盘、网络宽带等) 都是有上限的引入消息队列之后A 把请求发给队列B从队列中取请求…
虽然 A 收到的请求很多队列收到的请求也不少但是 B 可以仍然按照原有的节奏来取请求不至于说一下就收到太多的并发量~~市面上一些知名的消息队列
RabbitMQKafkaRocketMQActiveMQ
二、需求分析
2.1 核心概念
消息队列里主要涉及到这几个名词
生产者Producer生产消息消费者Consumer消费消息中间人Broker生产者和消费者之间的桥梁生产者和消费者通过中间人进行交流发布Push 生产者向中间人这里投递消息的过程订阅Subscribe哪些消费者要从中间人取数据这个注册的过程称为 “订阅”消费 Consume 消费者从中间人这里取数据的动作这里要注意区分订阅和消费这两个概念
消费强调的是我取走的这个动作
订阅强调的是我告诉你 我要取走的这个过程
比如高中的时候会有一些学习报有一些同学会订购这个报纸把钱交给代理人代理人记录下来告诉你每月几号来他那领取这个过程就是订阅相当于我们平时说的预定而到约定的日期了我去领的这个动作成为消费生产者和消费者之间的关系可以是 一对一、一对多或者多对多
比如一个生产者一个消费者多个生产者多个消费者其中Broker 是最核心的地方。负责消息的存储和转发
2.2 Broker
刚刚我们也提到了Broker是最核心的地方而Broker Server 内部也涉及到一些关键概念~~虚拟主机Virtual Host
虚拟主机类似于 MySQL中的 database算是一个 “逻辑” 上的数据集合。一个 Broker server 上也可以组织多种不同类别数据就可以使用 Virtual Host 做出逻辑上的区分。实际开发中一个 Broker server 也可能同时用来管理多个 业务线(一个单独的模块) 上的数据就可以使用 Virtual Host 做出逻辑上的区分。交换机Exchange 生产者把消息投递给 Broker Server实际上是先把消息交给了 Broker Server 上的某个交换机再由交换机 把消息转发给对应的队列。队列Queue
队列是真正用来存储处理消息的实体 .后续消费者从对应的队列中取数据。
一个大的消息队列中可以有很多个具体的小的队列举个例子比如取快递有菜鸟驿站。我们的快递肯定是快递员先送到驿站然后我们再去驿站拿。这个过程交换机就相当于是快递员而驿站就相当于是队列绑定Binding
把交换机和队列之间建立起关联关系~~
可以把 交换机 和 队列 视为是 类似于 数据库 中的 “多对多” 这样的的关系~~
一个交换机可以对应到对个队列。也就是说一个交换机可以把消息分发给多个队列。一个队列也可被多个交换机对应。相当于是 这个队列可以收到来自不同交换机的数据
在数据库中表示多对多的关系会使用一个中间表/关联表~~
可以想象在 mq 中也有一个这样的中间表的。所谓的 “绑定” 其实就是中间表的一项。消息Message
具体来说可以认为是服务器 A 发给服务器 B 的请求通过 MQ 转发就是一个消息。
服务器 B 给 A 返回的响应通过 MQ 转发也是一个消息~~一个消息可以认为是一个字符串二进制数据
消息中具体包含啥样的数据都是程序员自己定义的~~下面用一张图来表示他们之间的关系这样就可以清晰的看到一个 Broker Server 中可以有多个虚拟主机一个虚拟主机中也可以有多个交换机和队列以及绑定。
上述这些概念都是 RabbitMQ 按照 AMQP 协议来组织的不是凭空捏造的。
2.3 核心 API
消息队列服务器Broker Server要提供的核心 API 有下面几个创建队列queueDeclare
此处不使用 Create 这样的术语而是使用 Declare。原因是 Create 就只是单纯的 “创建”而 Declare 起到的效果是不存在则创建存在就啥也不做了~~销毁队列queueDelete创建交换机exchangeDeclare销毁交换机exchageDelete创建绑定queueBind解除绑定queueUnbind发布消息basicPublish订阅消息basicConsume确认消息basicAck
这个 API 起到的效果是可以让消费者显式的告诉 broker server这个消息我处理完毕了提高整个系统的可靠性保证消息处理没有遗漏另外 tcp 里也有一个acktcp 中的确认应答ack是已读“已读未回”虽然收到了消息但是并不准备回复(只有已读的这个动作)mq 是 已处理“已读已回”对方收到消息了并且给你回复了一个处理信息对于 RabbitMQ 来说除了提供肯定的确认还提供了否定的确认~~这里 没有实现否认确定【注意】
对于 MQ 和消费者之间工作模式有两种
Push推 Broker把收到的数据主动地发送给订阅的消费者。RabbitMQ只支持 推 的方式~~Pull拉消费者主动调用 Broker 的 api 取数据~~ kafka就能支持拉
另外上述 API 也不是凭空捏造的这些 API 的名称以及用法都是参考了 RabbitMQ 的2.4 交换机类型
消息队列的交换机在转发消息的时候有一套转发的规则的~~
他们提供了几种不同的 交换机类型ExchangeType来描述不同的转发规则~~
RabbitMQ 主要实现了 四种 交换机类型也是 AMQP 协议定义的
Direct 直接交换机。Fanout 扇出交换机Topic 主题交换机Header 消息头交换机
这里我们只实现前三种第四种规则复杂应用场景比较少。
下面分别来看一下前三种交换机
Direct 直接交换机
生产者发送消息的时候会指定一个 “目标队列“ 的名字交换机收到之后就看看绑定的队列里有没有匹配的队列如果有转发过去把消息塞进对应的队列里如果没有消息直接丢弃~~
Fanout 扇出交换机
Fanout 交换机会把消息转发给所有与他绑定的队列。
比如要转发的消息是 “hello”这个交换机绑定着三个队列Topic 主题交换机
主题交换机里有两个关键概念
1bindingKey把队列和交换机绑定的时候指定一个单词像是个暗号一样
2routingKey生产者发送消息的时候也指定一个单词
如果当前 routingKey 和 bindingKey 能够对上暗号了此时就可以把这个消息转发给对应的队列中了
eg上述三种交换机类型,就像给 qq 群发红包一样~~专属红包. 我发的时候,必须指定某个人能领 直接交换机.我发 10 块钱红包,然后同时我开始做法~~ 群里的每个群友都能领到 10 块钱 扇出交换机画图红包~~ 我还是发 10 块红包,同时出个题: 必须要画一个桌子,画的好,画的像,才能领 主题交换机 (这里每个红包的大小还是10块)2.5 持久化
上述提到的 虚拟机、交换机、队列、绑定、消息都需要让 BrokerServer 组织管理~~
这些概念对应的数据需要存储和管理起来。此时内存和硬盘都会各自存储一份内存为主硬盘为辅。
在内存中存储的原因
对于 MQ 来说能够高效的转发处理数据是非常关键的指标 因此对于使用内存来组织数据得到的效率就比放硬盘要高很多
在硬盘中存储原因
为了防止内存中数据随着进程重启 / 主机重启而丢失。2.6 网络通信
其他的服务器生产者/消费者通过网络是要和咱们的 Broker Server 进行交互。
此处设定使用 TCP 自定义的应用层协议 实现生产者/消费者 和 BrokerServer 之间的交互工作~~
这里 自定义的应用层协议它的主要工作就是让客户端可以通过网络调用 broker server 提供的编程接口~~
接口刚刚提到的核心 API如下因此,在客户端这边,也需要提供对应的上述的这些方法。只不过服务器版本的上述方法,效果是真正干实事,把管理数据进行调整。客户端这边的上述方法,则只是发送请求/接收响应~~此处, 客户端调用了一个本地的方法,结果这个方法在背后,给服务器发了一系列消息,由服务器完成了一系列工作。站在调用者的角度,看到的只是说,当前的这个功能已经完成,并不知道这背后的细节.
虽然调用的是一个本地的方法,实际上就好像调用了一个远端服务器的方法一样~~ 这被称为 远程过程调用 (RPC)这个东西可以视为 编写 客户端服务器程序 通信过程 的一种设计思想客户端除了提供上述这9个和服务器这边对应的方法之外还需要在提供四个方法支持其他工作~~创建 Connection关闭 Connection创建 Channel关闭 Channel一个 Connection 里对象就代表一个 TCP 连接。 Channel,直译为 管道/通道。
一个 Connection 里面可以包含多个 Channel。每个Channel 上面传输的数据都是互不相干的~~
Channel 只是一个逻辑上的概念。因为 TCP 连接的建立和断开要消耗很多的资源因此引入 channel每个 channel 相当于一个传输特定数据类型的小连接当我们暂时不用的时候就可以断开这个 Channel用的时候在新建一个 Channel这样比操作 TCP 更轻量花销更小。
举个例子
他们之间的关系就像 吊瓶 (生病打手上的那种针)~~
比如打吊瓶要打三种药不肯能扎三次针只扎一次针换药瓶即可。
此处针就是 Connection药瓶就是 Channel。看上去一直是那根管实际上里面流的药不同2.7 消息应答
被消费的消息需要进行应答
应答模式分为两种
自动应答消费者只要消费了消息就算应答完毕了。Broker 直接删除这个消息。手动应答消费者手动调用应答接口Broker 收到应答请求之后才真正删除这个消息。⼿动应答的⽬的,是为了保证消息确实被消费者处理成功了.在⼀些对于数据可靠性要求⾼的场景,⽐ 较常⻅.2.8 总结
上面说了这么多那我们究竟是要做什么呢概括一下就是下面四点
需要实现 生产者broker server消费者 这三个部分。针对生产者和消费者来说主要编写的是客户端和服务器的网络通信部分。给客户端提供一组 api让客户端的业务代码来调用从而通过网络通信的方式远程调用 broker server 上的方法。【重点】 实现 broker server 以及 broekr server 内部的一些基本概念和核心 API持久化
上述的这些关键数据在硬盘中怎么存储啥格式存储存储在数据库中还是文件中
后续服务器重启了如何读取上述数据把内存中的内容回复过来
这些都是我们要考虑的问题上述这些工作的最终目标就是实现一个 “分布式系统下” 这样的生产者消费者模型~~
但是在当前情况下咱们的 broker server 并不支持分布式系统集群功能只有一个单机的 broker server能够给多个生产者消费者提供服务~~
但是人家专业的 mq比如 RabbitMQ、kafka 等这些都是支持集群的集群三、模块划分四、项目创建
这里创建SpringBoot项⽬.
使⽤SpringBoot2.7.17,Java8.
依赖引⼊SpringWeb和MyBatis(2.3.1).五、创建核心类
先根据 模块划分创建三个包创建服务器下的核心概念5.1 创建Exchange
Exchange 的整体结构如下public class Exchange {private String name;private ExchangeType type ExchangeType.DIRECT;private boolean durable false;private boolean autoDelete false;private MapString, Object arguments new HashMap();// 省略 getter setter
}public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type) {this.type type;}public int getType() {return this.type;}
}
name :交换机的名字.相当于交换机的⾝份标识.type :交换机的类型.三种取值,DIRECT,FANOUT,TOPIC.durable :交换机是否要持久化存储.true为持久化,false不持久化.autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.arguments :交换机的其他参数属性.预留字段,暂时未使⽤.RabbitMQ中的交换机,⽀持autoDelete 和arguments ,咱们此处为了简单,暂时没有实现对 应功能,只是预留了字段,可以尝试⾃⼰完成.5.2 创建MSGQueuepublic class MSGQueue {private String name;private boolean durable;private boolean exclusive; private boolean autoDelete;private MapString, Object arguments new HashMap();// 省略 getter setter
}类名叫做MSGQueue,⽽不是Queue,是为了防⽌和标准库中的Queue混淆.name :队列的名字.相当于队列的⾝份标识.durable :交换机是否要持久化存储.true为持久化,false不持久化.exclusive :独占(排他),队列只能被⼀个消费者使⽤.autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.arguments :交换机的其他参数属性.预留字段,暂时未使⽤.
5.3 创建Bindingpackage com.example.mq2.mqserver.core;/*** 这个类表示绑定* 交换机和队列进行绑定*/
public class Binding {private String exchangeName;private String queueName;// bindingKey 就是在出题要求领红包的人画个 “桌子” 出来~~private String bindingKey;// Binding 这个东西依附于 Exchange 和 Queue 的// 比如对于持久化来说如果 Exchange 和 Queue 任何一个没有持久化// 此时你针对 Binding 持久化是没有任何意义的// 省略getter和setter
}
exchangeName 交换机名字。queueName 队列名字bindingKey 只在交换机类型为TOPIC 时才有效.⽤于和消息中的routingKey 进⾏匹配。
5.4 创建Message此处的 Message是需要能够在网络上传输并且也需要能写入到文件中。
此时就需要针对 Message 进行序列化和反序列化 操作.
一个类只有实现了Serializable接口它的对象才能被序列化package com.example.mq2.mqserver.core;import java.util.UUID;/*** 表示一个要传递的消息* 注意 此处的 Message 对象是需要能够在网络上传输并且也需要能写入到文件中。* 此时就需要针对 Message 进行序列化和反序列化 操作*/
public class Message implements Serializable {// 消息的基本属性// 这里new一下防止后面快速获取时出现空指针异常private BasicProperties basicProperties new BasicProperties();// 消息的内容private byte[] body;/*** 下面的属性则是辅助用的属性* Message 后续会存储到文件中(如果持久化的话)* 一个文件中会存储很多的消息。如何找到某个消息在文件中的具体位置呢* 使用下列的两个偏移量来进行表示。[offsetBeg,offsetEnd)*/// 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入到文件后所在的位置就固定了不需要单独存储// 这俩属性存在的目的主要就是为了让内存中的 Message 对象能够快速找到对应的硬盘上的 Message 的位置private transient long offsetBegin 0; // 消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd 0; // 消息数据的结尾距离文件开头的位置便宜(字节)// 使用这个属性表示消息在文件中是否是有效消息。(逻辑删除)// 0x1 表示有效0x0 表示无效private byte isValid 0x1;/*** 创建一个工厂方法让工厂方法帮我们封装一下创建 Message 对象的过程* 这方个法中创建 Message 对象会自动生成唯一的 MessageId* 万一 routingKey 和 basicMessageKey 里的 routingKey 冲突以外面的为主*/public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message new Message();if (basicProperties ! null) {message.setBasicProperties(basicProperties);}message.setMessageId(M- UUID.randomUUID());message.setRoutingKey(routingKey);message.body body;// 此处是把 body 和 basicProperties 先设置出来他俩是 Message 的核心内容。// 而 offsetBegoffsetEndisValid则是消息持久化的时候才会用到在把消息写入文件之前再进行设定// 此处只是在内存中创建一个 Message 对象return message;}// 因为消息的属性都在 basicProperties 中这里写几个方法方便直接获取到public String getMessageId() {return basicProperties.getMessageId();}public void setMessageId(String messageId) {basicProperties.setMessageId(messageId);}public String getRoutingKey() {return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey) {basicProperties.setRoutingKey(routingKey);}public int getDeliverMode() {return basicProperties.getDeliveryMode();}public void setDeliverMode(int deliverMode) {basicProperties.setDeliveryMode(deliverMode);}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body body;}public long getOffsetBegin() {return offsetBegin;}public void setOffsetBegin(long offsetBegin) {this.offsetBegin offsetBegin;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd offsetEnd;}public byte getIsValid() {return isValid;}public void setIsValid(byte isValid) {this.isValid isValid;}
}
package com.example.mq2.mqserver.core;import java.io.Serializable;public class BasicProperties implements Serializable {// 消息的唯一身份标识。此处为了保证 id 的唯一性使用 UUID 来作为 message idprivate String messageId;// 是一个消息上带有的内容和 bindingKey 做匹配。// 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名// 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用)// 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配。符合要求的才能转发给对应队列private String routingKey;// 这个属性表示是否要持久化。1 表示不持久化2 表示持久化. (RabbitMQ 就是这么实现的)private int deliveryMode 1;// 其实对于 RabbitMQ 来说BasicProperties 里面还有很多别的属性。其他的属性暂时先不考虑了public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey routingKey;}public int getDeliveryMode() {return deliveryMode;}public void setDeliveryMode(int deliveryMode) {this.deliveryMode deliveryMode;}
}
六、数据库操作
6.1 SQLite 介绍
这里数据库使用的是 SQLite。SQLite是一个轻量级的关系型数据库运算速度快占用资源少。
这里咱们的MQ需要搭建 mqserver我们知名的MySQL是一个客户端服务器结构的程序本身就比较重量如果在给他配一个MySQL数据库就会很麻烦而且让整个环境变得复杂起来。因此这里使用 SQLite。
一个完整的 SQLite 数据库只有一个单独的可执行文件 (不到1M)SQLite只是一个本地的数据库这个数据库相当于是直接操纵本地的硬盘文件不涉及到网络层面更轻量更便捷。比如MySQL还需要create/drop database等而SQLite不需要你创建一个数据库文件就是一个数据库。
SQLite也是知名的服务器应用也非常广泛在一些性能不高的设备上是首选的数据库尤其是移动端和嵌入式设备(空调、冰箱、洗衣机等等)。Android 系统就是内置的 SQLite.6.2 Java中配置 SQLite
在Java中要想使用 SQLite都不需要额外安装。直接使用maven把 SQLite 的依赖给引入进来就可以了!!!
在中央仓库中找到并引入依赖!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --
dependencygroupIdorg.xerial/groupIdartifactIdsqlite-jdbc/artifactIdversion3.41.0.1/version
/dependency
配置yml
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xmlurl: jdbc:sqlite:./data/meta.db,会在当前项目所在的路径下创建一个 meta 的文件夹与src同级对于 SQLite来说并不需要指定用户名和密码
MySQL是一个客户端服务器结构的程序。一个数据库服务器就会对应多个客户端来访问它。SQLite 则不是客户端服务器结构的程序就只有自己一个人能访问把数据放在本地文件上和网络无关就只有本地主机才能访问SQLite 虽然和 MySQL不太一样但是都可以通过 MyBatis 这样的框架来使用6.3 建库建表
以往我们使用 MySQL的时候都是提前写好相应的建库建表代码需要使用的时候直接放到MySQL中执行整个操作都是在部署阶段完成的。
这里使用 SQLite希望能够自动完成上述操作因此就把相关代码也向插入删除等语句一样直接写到xml里。对于数据库SQLlite 启动时自动的创建而表则是用到的时候执行相应的代码
创建表
先在接口中声明在 xml 中实现
这里使用 update 标签来完成6.4 转化 arguments
在Java中arguments 是一个 HashMap 表而数据库中没有这个类型。因此就需要转化一下格式。
如何实现把 arguments 这个键值对, 和数据库中的字符串类型相互转换呢?
关键要点, 在于, MyBatis 在完成数据库操作的时候,会自动的调用到对象的 getter 和 setter.
比如 MyBatis 往数据库中写数据就会调用对象的 getter 方法,拿到属性的值再往数据库中写。如果这个过程中让 getArguments 得到的结果是 String 类型的此时就可以直接把这个数据写到数据库了。比如 MyBatis 从数据库读数据的时候就会调用对象的 setter 方法把数据库中读到的结果设置到对象的属性中。如果这个过程中让 setArguments,参数是一个 String,并且在 setArguments 内部针对字符串解析,解析成一个 Map 对象
更新 getter 和 setter
Exchange 类 和 MSGQueue 类都要更改
// 更改getter 和 setter与json格式转化public String getArguments() {// 是把当前的 arguments 参数从 Map 转成 StringJSONObjectMapper objectMapper new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}// 如果代码真的异常了返回一个空的 json 字符串就 okreturn {};}// 这个方法是从数据库读取数据之后构造 Exchange 对象会自动调用到public void setArguments(String argumentsJson) {// 把参数中的 argumentsJson 按照 JSON 格式解析转成 上述的 Map 对象ObjectMapper objectMapper new ObjectMapper();try {this.arguments objectMapper.readValue(argumentsJson, new TypeReferenceMapString,Object() {});} catch (JsonProcessingException e) {e.printStackTrace();}}6.5 实现数据库的插入和删除
接口
// 对上述三个概念的 插入、查找、删除void insertExchange(Exchange exchange);ListExchange selectAllExchanges();void deleteExchange(String exchangeName);void insertQueue(MSGQueue queue);ListMSGQueue selectAllQueues();void deleteQueue(String queueName);void insertBinding(Binding binding);ListBinding selectAllBindings();void deleteBinding(Binding binding);xmlinsert idinsertExchange parameterTypecom.example.mq.mqserver.core.Exchangeinsert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});/insertselect idselectAllExchanges resultTypecom.example.mq.mqserver.core.Exchangeselect * from exchange;/selectdelete iddeleteExchange parameterTypejava.lang.Stringdelete from exchange where name #{exchangeName};/deleteinsert idinsertQueue parameterTypecom.example.mq.mqserver.core.MSGQueueinsert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});/insertselect idselectAllQueues resultTypecom.example.mq.mqserver.core.MSGQueueselect *from queue;/selectdelete iddeleteQueue parameterTypejava.lang.Stringdelete from queue where name #{queueName};/deleteinsert idinsertBinding parameterTypecom.example.mq.mqserver.core.Bindinginsert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});/insertselect idselectAllBindings resultTypecom.example.mq.mqserver.core.Bindingselect * from binding;/selectdelete iddeleteBinding parameterTypecom.example.mq.mqserver.core.Bindingdelete from binding where exchangeName #{exchangeName} and queueName #{queueName};/delete这里Mybatis 框架会通过反射机制自动调用 参数 (eg: Exchange)中的getter方法获取对应属性的值然后将这些值插入到数据库中setter方法同理把返回值构造成相应的对象
对于 交换机, 和 队列 这两个表, 由于使用 name 作为主键, 直接按照 name 进行删除即可~
对于 绑定来说, 此时没有主键,删除操作,其实是针对 exchangeName 和 queueName 两个维度进行筛选,因此直接传入Binding对象6.6 实现DataBaseManager
文件位置
mqserver.datacenter.DataBaseManager1)创建DataBaseManager类
通过这个类来封装针对数据库的操作.
6.6.1 初始化
数据库的初始化 建库建表 插入一些默认数据
我们期望在咱们得 broker server 启动的时候做出下列逻辑判定如果数据库已经存在了,(表啥的都有了)不做任何操作如果数据库不存在, 则创建库,创建表,构造默认数据,例如,现在把 broker server 部署到一个新的服务器上显然,此时是没有数据库.就需要让 broker server 启动的时候自动的把对应的数据库创建好~~
但是如果是一个已经部署过的机器,broker server 重启了,就会发现,数据库已经有了,此时不做任何数据库相关操作.// 针对数据进行初始化
public void init() {if (!checkExists()) {// 数据库不存在就进行建库建表操作// 先创建一个 data 目录File dataDir new File(./data/meta.db);dataDir.mkdirs();// 创建数据表createTable();// 插入默认数据createDefaultData();System.out.println([DataBaseManager] 数据库初始化完成);} else {// 数据库已经存在了啥都不做即可System.out.println([DataBaseManager] 数据库已经存在了);}
}一般谈到初始化会想到构造方法但是这里 咱们自己实现一个。
构造方法一般是用来初始化类的属性~~一般不会涉及到太多的业务逻辑。此处的初始化带有业务逻辑的还是单独拎出来手动来调用比较合适一点。初始化中涉及到的方法private boolean checkExists() {File file new File(./data/meta.db);if (file.exists()) {return true;}return false;}private void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();System.out.println([DataBaseManager] 创建表完成);}// 给数据表中添加默认的数据// 此处主要是添加一个默认的交换机// RabbitMQ 里有一个这样的设定带有一个 匿名 的交换机类型是 DIRECTprivate void createDefaultData() {// 构造一个默认的交换机Exchange exchange new Exchange();exchange.setName();exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println([DataBaseManger] 创建数据库完成);}6.6.2 手动管理 MetaMapper
这里我们诸多操作都用到了 MetaMapper这个类是 spring 帮我们管理的(加了注解 Mapper)但是这里我们又不想让DataBaseManger被自动管理因此我们手动把 MetaMapper 构造出来。
更改启动类
SpringBootApplication
public class Mq2Application {public static ConfigurableApplicationContext context;public static void main(String[] args) {context SpringApplication.run(Mq2Application.class, args);}}ConfigurableApplicationContext 是 Spring 应用程序上下文的接口它代表了整个 Spring 容器。
在这里将 SpringApplication.run() 的返回值赋给了 context 变量这样可以在程序的其他地方使用这个上下文对象来获取 Spring 托管的 bean 等。修改init方法
在 init 方法首行手动获取 bean
public void init() {// 手动获取到 MetaMappermetaMapper Mq2Application.context.getBean(MetaMapper.class);
}6.6.3 封装其他数据库操作// 把其他的数据库的操作也在这个类中封装一下public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public ListExchange selectAllExchanges() {return metaMapper.selectAllExchanges();}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public ListMSGQueue selectAllQueues() {return metaMapper.selectAllQueues();}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public ListBinding selectAllBindings() {return metaMapper.selectAllBindings();}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}6.7 测试 DataBaseManger
现在对我们刚刚写好的DataBaseManger 进行单元测试确保它目前的正确性
设计单元测试要求 单元测试 用例和用例之间是需要相互独立的不能产生干扰。
比如6.7.1 初始化 和 收尾 方法
因此先写个 初始化 和 收尾 方法来处理后续的每个测试 (初始化 和 收尾 方法在每个单元测试方法执行前后都会执行)这样 就能保证每次执行一个 单元测试方法都是全新的环境
// 加上这个注解就会被识别为 单元测试类
SpringBootTest
public class DataBaseManagerTests {private DataBaseManager dataBaseManager new DataBaseManager();// 接下来下面这里需要编写多个 方法。每个方法都是一个/一组单元测试用例。// 还需要做一个准备工作。需要写两个方法分别用于进行 “准备工作” 和 “收尾工作”/*** 使用这个方法来执行准备工作每个测试用例执行前都要调用这个方法* 由于 init 中需要通过 context 对象拿到 metaMapper 实例* 所以就需要先把 context 对象给搞出来(重新获取 context对象让每个测试方法都是新的context从而保证不受前一个测试方法的影响)* p* 虽然在应用的正常运行中通常只需要获取一次应用上下文并且可以在整个应用生命周期内共享该上下文。* 但在测试中为了避免测试之间的相互影响重新获取应用上下文是一个常见的做法*/BeforeEachpublic void setUp() {Mq2Application.context SpringApplication.run(Mq2Application.class);dataBaseManager.init();}/*** 使用这个方法来执行收尾工作每个用例执行后都要调用这个方法* 这里要进行的操作就是把数据库给清空~~(把数据库文件meta.db 直接删了即可)* p* MqApplication.context.close()的目的是关闭应用上下文* 以确保在每个测试方法执行完毕后释放资源并清理状态。关闭应用上下文会销毁应用上下文中的所有bean并触发销毁回调如果有定义的话* p* 此处的 context 对象持有了 MetaMapper 的实例MetaMapper 的实例又打开了 meta.db 数据库文件* 如果 meta.db 被别人打开了此时的删除文件操作是不会成功的 (window 系统的限制Linux 则没有)* 另一方面获取 context 操作会占用 8080 端口此处的 close 也是释放 8080*/AfterEachpublic void tearDown() {Mq2Application.context.close();dataBaseManager.deleteDB();}
}
这里收尾的时候需要删除上次的数据库因此补充一个删除数据库的方法这个方法一般只在 测试的时候使用
在 DataBaseManager 中加入如下代码
// 删除数据库文件【测试中用到】public void deleteDB() {File file new File(./data/meta.db);boolean ret file.delete();if (ret) {System.out.println([DataBaseManger] 删除数据库文件成功);} else {System.out.println([DataBaseManger] 删除数据库文件失败);}File dataDir new File(./data);// 使用 delete 删除目录的时候需要保证目录是空的ret dataDir.delete();if (ret) {System.out.println([DataBaseManger] 删除数据库目录成功);} else {System.out.println([DataBaseManger] 删除数据库目录失败);}}6.7.2 测试 初始化Testpublic void testInitTable() {// 由于 init 方法, 已经在上面 setUp 中调用过了. 直接在测试用例代码中, 检查当前的数据库状态即可.// 直接从数据库中查询. 看数据是否符合预期.// 查交换机表, 里面应该有一个数据(匿名的 exchange); 查队列表, 没有数据; 查绑定表, 没有数据.ListExchange exchangeList dataBaseManager.selectAllExchanges();ListMSGQueue queueList dataBaseManager.selectAllQueues();ListBinding bindingList dataBaseManager.selectAllBindings();// 直接打印结果, 通过肉眼来检查结果, 固然也可以. 但是不优雅, 不方便.// 更好的办法是使用断言.// System.out.println(exchangeList.size());// assertEquals 判定结果是不是相等.// 注意这俩参数的顺序. 虽然比较相等, 谁在前谁在后, 无所谓.// 但是 assertEquals 的形参, 第一个形参叫做 expected (预期的), 第二个形参叫做 actual (实际的)Assertions.assertEquals(1, exchangeList.size());Assertions.assertEquals(, exchangeList.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());Assertions.assertEquals(0, queueList.size());Assertions.assertEquals(0, bindingList.size());}出现问题更改 初始化方法
加上创建 目录的方法更改后测试通过
6.7.3 测试 交换机
测试插入
之前 交换机 的 getter 和 setter 是和数据库进行交互的在方法内部已经进行了 序列化/反序列化。这里我们测试的时候需要根据交换机的属性来进行判断序列化之后不能很好的找到键值对因此这里再对 Exchange 写getter和setter方法来帮助我们进行测试。
Exchange 类中新增 getter 和 setter 方法
// 在这里针对 arguments再提供一组 getter setter用来去更方便的获取/设置这里的键值对// 这一组在 Java 代码内部使用(比如测试的时候)public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}private Exchange createTestExchange(String exchangeName) {Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setAutoDelete(false);exchange.setDurable(true);exchange.setArguments(aaa, 1);exchange.setArguments(bbb, 2);return exchange;
}Test
public void testInsertExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.Exchange exchange createTestExchange(testExchange);dataBaseManager.insertExchange(exchange);// 插入完毕之后, 查询结果ListExchange exchangeList dataBaseManager.selectAllExchanges();Assertions.assertEquals(2, exchangeList.size());Exchange newExchange exchangeList.get(1);Assertions.assertEquals(testExchange, newExchange.getName());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(false, newExchange.isAutoDelete());Assertions.assertEquals(true, newExchange.isDurable());Assertions.assertEquals(1, newExchange.getArguments(aaa));Assertions.assertEquals(2, newExchange.getArguments(bbb));
}测试删除
Test
public void testDeleteExchange() {// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!Exchange exchange createTestExchange(testExchange);dataBaseManager.insertExchange(exchange);ListExchange exchangeList dataBaseManager. selectAllExchanges();Assertions.assertEquals(2, exchangeList.size());Assertions.assertEquals(testExchange, exchangeList.get(1).getName());// 进行删除操作dataBaseManager.deleteExchange(testExchange);// 再次查询exchangeList dataBaseManager.selectAllExchanges();Assertions.assertEquals(1, exchangeList.size());Assertions.assertEquals(, exchangeList.get(0).getName());
}6.7.4 测试 队列
同理也添加 新的 getter 和 setter 方法
public Object getArguments(String key) {return arguments.get(key);
}public void setArguments(String key, Object value) {arguments.put(key, value);
}private MSGQueue createTestQueue(String queueName) {MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);queue.setArguments(aaa, 1);queue.setArguments(bbb, 2);return queue;
}Test
public void testInsertQueue() {MSGQueue queue createTestQueue(testQueue);dataBaseManager.insertQueue(queue);ListMSGQueue queueList dataBaseManager.selectAllQueues();// 之前没有数据因此只有一个Assertions.assertEquals(1, queueList.size());MSGQueue newQueue queueList.get(0);Assertions.assertEquals(testQueue, newQueue.getName());Assertions.assertEquals(true, newQueue.isDurable());Assertions.assertEquals(false, newQueue.isAutoDelete());Assertions.assertEquals(false, newQueue.isExclusive());Assertions.assertEquals(1, newQueue.getArguments(aaa));Assertions.assertEquals(2, newQueue.getArguments(bbb));
}Test
public void testDeleteQueue() {MSGQueue queue createTestQueue(testQueue);dataBaseManager.insertQueue(queue);ListMSGQueue queueList dataBaseManager.selectAllQueues();Assertions.assertEquals(1, queueList.size());// 进行删除dataBaseManager.deleteQueue(testQueue);queueList dataBaseManager.selectAllQueues();Assertions.assertEquals(0, queueList.size());
}6.7.5 测试绑定
private Binding createTestBinding(String exchangeName, String queueName) {Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(testBindingKey);return binding;}Testpublic void testInsertBinding() {Binding binding createTestBinding(testExchange, testQueue);dataBaseManager.insertBinding(binding);ListBinding bindingList dataBaseManager.selectAllBindings();Assertions.assertEquals(1, bindingList.size());Assertions.assertEquals(testExchange, bindingList.get(0).getExchangeName());Assertions.assertEquals(testQueue, bindingList.get(0).getQueueName());Assertions.assertEquals(testBindingKey, bindingList.get(0).getBindingKey());}Testpublic void testDeleteBinding() {Binding binding createTestBinding(testExchange, testQueue);dataBaseManager.insertBinding(binding);ListBinding bindingList dataBaseManager.selectAllBindings();Assertions.assertEquals(1, bindingList.size());// 删除Binding toDeleteBinding createTestBinding(testExchange, testQueue);dataBaseManager.deleteBinding(toDeleteBinding);bindingList dataBaseManager.selectAllBindings();Assertions.assertEquals(0, bindingList.size());}七、消息存储设计
7.1 设计思路
消息需要在硬盘上存储.但是并不直接放到数据库中,⽽是直接使⽤⽂件存储.
原因如下:
对于消息的操作并不需要复杂的增删改查.对于⽂件的操作效率⽐数据库会⾼很多.主流MQ的实现(包括RabbitMQ)都是把消息存储在文件中而不是数据库中.下面来设计一下消息具体如何在文件中存储~~
我们知道消息是依附于队列的。因此存储的时候就把 消息 按照 队列 维度展开。
之前创建数据库的时候已经有了一个 data目录meta.db 就在这个目录下
在 data 中再创建一些子目录每个队列都对应一个子目录。而子目录的名字就是队列名。
每个队列的子目录下再分配两个文件来存储消息。
整个目录结构如下下面来具体看一下保存消息的这两个文件
queue_data
queue_data 这个文件 是一个二进制格式的文件。
这里做出如下的约定自己约定的
每个文件中包含若干个消息每个消息都以二进制的方式存储每个消息由2个部分构成
消息的长度 和 消息的二进制数据
消息的长度它的大小固定是四个字节它里面存的值就代表第二部分的长度。一个文件中有那么多的消息我们该怎么找到一个消息呢
这里不要忘记了之前在 Message 这个类中我们定义了 两个变量一个是 offsetBeg一个是 offsetEnd。我们用 [offsetBeg,offsetEnd) 来表示一个消息。
我们存储消息的时候Message 对象在内存中存了一份同时在硬盘中也存一份。而内存中存到那一份消息记录了当前的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息再根据该消息的两个变量值就能找到硬盘中的消息数据了。另外注意到Message 对象 还有一个属性是 isVaild他是干什么的呢
这个属性是用来标识当前 这个消息在文件中是否有效~~
对于 Broker Server 来说消息是需要新增也需要删除的。新增和删除, 对于内存中来说,好办(直接使用一些集合类)但是在文件上就麻烦了。新增消息可以直接把新的消息追加到文件末尾删除消息不好搞。
如果要想直接删除中间的一个消息就需要把他后面的所有消息都往前移动一个单位类似于 “顺序表” 的删除这样的操作。效率是非常低的。所以这种删除的方式是行不通的~~
因此使用逻辑删除的方式是比较适合的
isVaild 为 1表示该消息有效。
isVaild 为 0表示该消息无效已经被删除了
7.2 垃圾回收
随着时间的推移,这个消息文件可能会越来越大~~ 并且,这里可能大部分都是无效的消息针对这种情况, 就需要考虑对当前消息数据文件,进行垃圾回收~~
此处我们采用的 复制算法针对消息数据文件中的垃圾进行回收~~
具体的操作就是直接遍历原有的消息数据文件把所有的有效数据数据重新拷贝一份到新的文件中新文件名字和原来文件名字相同再把旧的文件直接删除掉。【注意】复制算法比较适用的前提是当前的空间有效数据不多大多数都是无效的数据。垃圾回收 有了那么什么时候进行 垃圾回收呢
这里就要用到刚刚说的 另外一个文件 queu_stat.txt了使用这个文件来保存消息的统计信息。
queue_stat.txt这个文件只存一行数据并且是文本格式这一行里有两列
第一列是 queue_data.txt中总的消息的数目
第二列是queue_data.txt 中有效消息的数目
两者用 \t 分割
比如2000\t1500, 代表该队列总共有 2000 条消息其中有效消息为 1500 条。
那这两个数据有什么用呢
这里我们就约定当消息总数超过 2000 条为了避免 GC 太频繁比如一共 4 个消息其中 2 个消息无效了并且有效消息数目低于总消息数的 50 %就触发一次垃圾回收当消息总数超过 2000 条并且有效消息数目低于总消息数的 50 %
这 两个 数据是我们自己设定的你想设置成多少都可以只要你觉得合理就行了。如果当一个文件消息数目非常的多而且都是有效信息此时会导致整个消息的数据文件非常庞大后续针对这个文件操作就会非常耗时。假设当前文件已经达到 10 个 G 了那么此时如果触发一次 GC整个耗时就会非常高。
对于 RabbitMQ 来说解决方案是把一个大的文件拆成若干个小的文件
文件拆分当某个文件长度达到一定的阈值的时候就会拆分成两个文件拆着拆着就成了很多文件
文件合并每个单独的文件都会进行GC如果GC之后发现文件变小了就会和相邻的其他文件合并
这样做可以保证在消息特别多的时候也能保证性能上的及时响应
但是这块的逻辑比较复杂这里就不实现了只考虑单个文件的情况~~如果要实现这个机制大概的思路:
1.需要专门的数据结构,来存储当前队列中有多少个数据文件每个文件大小是多少,消息数目是多少,无效消息是多少
2.设计策略, 什么时候触发文件的拆分,什么时候触发文件的合并7.3 创建MessageFileManager类
这个类就是专门用来管理上述信息的
首先定义一个内部类来表示消息的统计信息
static public class Stat {// 此处直接定义成 public就不用再搞 getter setter 方法了// 对于这样的简单的类就直接使用成员类似于 C 的结构体了public int totalCount; // 总消息数量public int validCount; // 有效消息数量
}接着根据上面的设计表示出消息文件的目录和文件名
// 约定消息文件所在目录和文件名
// 这个方法用来获取到指定队列对应的消息所在路径
private String getQueueDir(String queueName) {return ./data/ queueName;
}// 这个方法用来获取该队列的消息数据所在路径
// 注意二进制文件使用 txt 作为后缀不太合适。txt 一般表示文本此处咱们也就不改了
// 二进制一般多用 .bin / .dat
private String getQueueDataPath(String queueName) {return getQueueDir(queueName) /queue_data.txt;
}// 这个方法用来获取该队列的消息统计文件路径
private String getQueueStatPath(String queueName) {return getQueueDir(queueName) /queue_stat.txt;
}下面来编写统计文件的读写操作
private Stat readStat(String queueName) {// 由于当前的 消息统计文件 是文本文件可以直接使用 Scanner 来读取文件内容Stat stat new Stat();try (InputStream inputStream new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner new Scanner(inputStream);stat.totalCount scanner.nextInt();stat.validCount scanner.nextInt();} catch (IOException e) {e.printStackTrace();}return stat;
}private void writeStat(String queueName, Stat stat) {// 使用 PrintWriter 来写文件// OutputStream 打开文件默认情况下会直接把原文件清空此时相当于新的数据覆盖了旧的try (OutputStream outputStream new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter new PrintWriter(outputStream);printWriter.write(stat.totalCount \t stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}
}创建队列对应的文件和目录
// 创建队列相关的文件和目录public void createQueueFiles(String queueName) throws IOException {// 1. 创建队列对应的消息目录File baseDir new File(getQueueDir(queueName));if (!baseDir.exists()) {// 如果不存在就创建这个目录boolean ok baseDir.mkdirs();if (!ok) {throw new IOException(创建目录失败baseDir baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {// 也是不存在则创建boolean ok queueDataFile.createNewFile();if (!ok) {throw new IOException(创建文件失败queueDataFile queueDataFile.getAbsolutePath());}}// 3. 创建队列统计文件File queueStatFile new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok queueStatFile.createNewFile();if (!ok) {throw new IOException(创建文件失败queueStatFile queueStatFile.getAbsolutePath());}}// 给统计文件设定初始值Stat stat new Stat();stat.totalCount 0;stat.validCount 0;writeStat(queueName,stat);}删除队列的目录和文件队列也是可以被删除的。当队列删除之后对应的消息文件啥的自然也要被随之删除。// 删除队列的目录和文件// 队列也是可以删除的当队列删除之后对应的消息文件啥的自然也要随之删除public void destroyQueueFiles(String queueName) throws IOException {// 先删除里面的文件,再删除目录File queueDataFile new File(getQueueDataPath(queueName));boolean ok1 queueDataFile.delete();File queueSataFile new File(getQueueStatPath(queueName));boolean ok2 queueSataFile.delete();File baseDir new File(getQueueDir(queueName));boolean ok3 baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败都算整体删除失败throw new IOException(删除队列目录和文件失败baseDir baseDir.getAbsolutePath());}}检查队列的目录和文件是否都存在比如后续有生产者给 broker server 生产消息了这个消息就有可能需要记录到文件上(取决于消息是否要持久化)// 检查队列的目录和文件是否都存在// 比如后续有生产者给 broker server 生产消息了这个消息就有可能需要记录到文件上(取决于消息是否要持久化)public boolean checkFilesExists(String queueName) {// 判定队列的数据文件和统计文件是否都存在File queueDataFile new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}7.4 消息序列化
在设计 Message 的时候我们已经提到过序列化这个事情。
序列化就是把一个对象 (结构化的数据) 转化成一个字符串/字节数组。
反序列化把序列化之后的 (字符串/字节数组)在还原成一个对象 (结构化的数据)。序列化之后方便存储和传输。
存储一般就是存储到文件中因为文件只能存字符串/二进制数据不能直接存对象。传输是指通过网络传输。由于 Message里面存储的 body 部分(字节数组)是二进制数据不太方便利用 JSON 序列化。JSON 序列化得到的结果是文本数据.无法存储二进制”因此这里直接使用二进制的序列化方式针对 Message 对象进行序列化
针对二进制序列化也有很多种解决方案~~
Java 标准库提供了序列化方案ObjectInputStream 和 ObjectOutputStreamHessian 也是一个解决方案protobufferthrift方案 2 需要引入第三方库操作很麻烦不如使用 标准库
方案 3 和 方案 4要用额外的文件来描述传输的格式操作比较麻烦但是高效。
此处就使用第一个方案,标准库自带的方案这个方案最大的好处,不必引入额外的依赖~~序列化其他的类也可能用到这里就把序列化相关的代码放到公共类里
package com.example.mq.common;import java.io.*;// 下列的逻辑并不仅仅是 Message其他的 Java 中的对象也是可以通过这种的逻辑进行序列化和反序列化的
// 如果要想让这个对象能够序列化或者反序列化需要让这个类能够实现 Serializable 接口
public class BinaryTool {// 把一个对象序列化成一个数组// 方法设置成 static方便其他类直接调用public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组因为并不知道消息有多长所以先用变成数组// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中在统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream new ObjectOutputStream(byteArrayOutputStream)) {// 此处的 writeObject 就会把该对象进行序列化生成的二进制字节数据就会写入到 objectOutputStream 中// 而 objectOutputStream 又关联到了 byteArrayOutputStream最终结果就会写入到 byteArrayOutputStreamobjectOutputStream.writeObject(object);// ObjectOutputStream 最终关联到那个对象序列化之后的内容就会到到那个对象里}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来转成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一个字符数组反序列化为一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object null;try (ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream new ObjectInputStream(byteArrayInputStream);){// 此处的 readObject就是从 data 这个 byte[] 中获取数据进行反序列化object objectInputStream.readObject();}}return object;}}
7.5 把消息写入文件中发送消息
上面的操作已经把可以把消息序列化了而下面就是把消息存储到文件中也就是相应的队列对应的文件中。
因此写入消息的时候需要两个参数一个是队列名另外一个就是消息本身。
具体步骤先判断当前写入队列的文件在不在把 Message 对象进行序列化转换成二进制的字节数组获取当前队列消息数据文件的长度用这个长度来计算 offsetBeg 和 offsetEnd
设置该消息 offsetBeg 当前文件长度 4设置该消息 offsetEnd 当前文件长度 4 当前二进制数组长度把新的 message 数据写入到文件的末尾处采用追加方式
先写入 4 个字节的消息长度再写入消息本体更新统计文件并重新写入// 这个方法用来把一个新的消息放到队列对应的文件中// queue 表示要把消息写入的队列。message 则是要写的消息public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查一下当前要写入的队列对应的文件是否存在if (!checkFilesExists(queue.getName())) {throw new MqException([MessageFileManger] 队列对应的文件不存在queueName queue.getName());}// 2. 把 Message 对象,进行序列化转化成二进制的字节数组byte[] messageBinary BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前队列数据文件的长度用这个来计算出改 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据写入到队列数据文件的末尾此时 Message 对象的 offsetBeg就是当前文件长度 4// offsetEnd 就是当前数据文件长度 4 message 自身长度File queueDataFile new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度。单位字节message.setOffsetBegin(queueDataFile.length() 4);message.setOffsetEnd(queueDataFile.length() 4 messageBinary.length);// 4. 写入消息到数据文件注意是追加写入到数据文件末尾try (OutputStream outputStream new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {// 接下来要先写当前消息的长度占据四个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat readStat(queue.getName());stat.totalCount 1;stat.validCount 1;writeStat(queue.getName(), stat);}}上面的代码中我们用到了 DataOutputStream 以及它的 writeInt()方法。为什么不直接使用 OutPutStream 的 Write() 方法呢
具体原因如下
OutPutStream 的 Write() 方法虽然 参数也是 一个 int但是他实际上只会读取其中的一个字节。虽然我们也可以使用位运算一个一个字节的去读取如下图但是Java标准库给我们提供了更方便的用法。就是使用 DataOutputStream 的 writeInt()方法writeInt() 就是直接写入一个字节。但是这里会有一个问题就是在多线程的环境下会出问题。
比如
往队列中插入消息的时候
线程A计算好消息的 offsetBegin 和 offsetEnd 之后这时突然线程B往 里面插入了一个消息那么线程A 再去插入消息的时候之前算好的位置就不正确了
更新消息统计数据的时候
validCount 和 totalCount 的时候就会出现问题。
比如 totalCount 的时候线程 A先拿到 totalCount 的值再进行1但是这个过程中线程B也拿到了 totalCount 的值并1此时线程 A 再执行 1。这样导致虽然是两次1.但是从结果上看只加了一次。针对上述问题我们可以通过加锁来解决。这里加锁的对象是同一个队列因为不同队列的文件是没有关联的只要保证 不同时修改同一个队列的数据文件就可以了。 修改把消息写入文件的代码
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查一下当前要写入的队列的文件是否存在if (!checkFilesExists(queue.getName())) {// 这里如果不存在说明还没创建直接抛出系统异常不太合适// 自己创建一个异常类来处理业务内部的逻辑throw new MqException([MessageFileManger] 队列对应的文件不存在queueName queue.getName());}// 2. 把 Message 对象序列化转化成二进制的字节数组byte[] messageBinary BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前队列数据文件的长度用这个来计算出改 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据写入到队列数据文件的末尾此时 Message 对象的 offsetBeg就是当前文件长度 4// offsetEnd 就是当前数据文件长度 4 message 自身长度File queueDataFile new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度。单位字节message.setOffsetBegin(queueDataFile.length() 4);message.setOffsetEnd(queueDataFile.length() 4 messageBinary.length);// 4. 写入消息到数据文件注意是追加写入到数据文件末尾try (OutputStream outputStream new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {// 接下来要先写当前消息的长度占据四个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat readStat(queue.getName());stat.totalCount 1;stat.validCount 1;writeStat(queue.getName(), stat);}}7.6 删除消息
这里主要是从文件中删除也就是删除硬盘上的数据。采用的方式是逻辑删除就是把 Messgae 对象的 isValid 属性设置成 0x0(16进制。
这里操纵文件使用的是 RamdomAccessFile 他可以做到随机访问能操纵光标
它的 构造方法
1、RandomAccessFile(File file, String mode)
2、RandomAccessFile(String name, String mode)
两个构造方法的第一个参数不做介绍
**mode **第二个参数是指以什么模式创建读写流此参数有固定的输入值必须为r/“rw”/“rws”/rwd其中一个。
r以只读方式打开指定文件。如果试图对该RandomAccessFile指定的文件执行写入方法则会抛出IOException
rw以读取、写入方式打开指定文件。如果该文件不存在则尝试创建文件
rws以读取、写入方式打开指定文件。相对于rw模式还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备默认情形下(rw模式下),是使用buffer的,只有cache满的或者使用RandomAccessFile.close()关闭流的时候儿才真正的写到文件
rwd与rws类似只是仅对文件的内容同步更新到磁盘而不修改文件的元数据
这里用到了三种方法
read() 读write() 写seek() 移动光标
删除消息的具体做法
先把文件中的这一段数据读出来还原回 Message 对象把 isValid 改成 0把上述数据重新写回到文件
// 这个是删除消息的方法// 这里的删除是逻辑删除也就是把硬盘上存储的这个数据里面的那个 isValid 属性设置成0// 1. 先把文件中的这一段数据读出来还原回 Message 对象// 2. 把 isValid 改成 0// 3. 把上述数据重新写回到文件// 此处这个参数中的 message 对象必须得包含有效的 offsetBeg 和 offsetEnd// RandomAccessFile -- 对消息文件的随机访问public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile new RandomAccessFile(getQueueDataPath(queue.getName()), rw)) {// 1. 先从文件中读取对应的 Message 数据byte[] bufferSrc new byte[(int) (message.getOffsetEnd() - message.getOffsetBegin())];randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.read(bufferSrc);// 2. 把当前读出来的二进制数据转化成 Message 对象Message diskMessage (Message) BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 设置成无效diskMessage.setIsValid((byte) 0x0);// 重新写入文件byte[] bufferDest BinaryTool.toBytes(diskMessage);// 虽然上面已经 seek 过了但是上面 seek 完了之后进行了读操作这一读就导致文件光标往后移动// 移动到下一个消息的位置了。因此要想让接下来的写入能够刚好写回到之前的位置就需要重新调整文件光标randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.write(bufferDest);// 通过上述的折腾对于文件来说只有一个字节发生了改变而已~~}// 不要忘记更新统计文件把一个消息设置成无效了那么有效的消息数量就要 -1Stat stat new Stat();if (stat.totalCount 0) {stat.validCount - 1;}writeStat(queue.getName(), stat);}}7.7 加载消息到内存中
在设计的时候我们已经说过了。消息会存储两份一份是在内存上方便快速处理另一份是在硬盘上用于持久化存储。下面这个方法就是把硬盘上的有效数据加载到内存上。
上一个方法中提到了内存上存储消息是以集合的形式这里所谓的集合我们采用链表。链表是为了方便后续的删除消息(头删)。
具体实现步骤首先这里的参数只是一个 queueName而不是一个 MSGQueue因为 这个方法是在程序启动时调用此时不会出现多个线程竞争的情况上面方法的 MSGQueue 参数更多是作为 锁对象方便加锁。这里加载消息采用 while 循环
先读取消息的长度
这里不要忘记 我们的一个消息是由两部构成第一个是 固定大小的4字节代表消息体的长度第二部分就是 消息体然后根据这个长度读取消息的内容接着判断读到的消息大小是否匹配如果不一致说明文件有问题格式错乱了如果没有问题把读到的数据反序列回 Messge 对象判断该消息是否有效若有效继续下一步否则 continue (然消息是无效数据但是 offset 不要忘记更新)若有效设置 Message 对象的 offsetBeg 和 offsetEnd之后更新 文件的 offset最后把消息添加到链表中【注意】
那这里什么时候循环结束呢
这里涉及到了一个新的用法根据异常来判断。
我们在读取文件长度的时候使用的是 DataOutputStream 的 readInt() 方法这个方法不像之前流读到文件末尾返回 -1而是直接抛出 EOFException因此我们可以捕获这个异常代表文件读完了。// 使用这个方法从文件中读取出所有的消息内容加载到内存中(具体来说是放到一个链表里)// 这个方法准备在程序启动的时候进行调用// 这里使用一个 LinkedList主要目的是为了后续进行头删操作// 这个方法的参数只是一个 queueName而不是 MSGQueue 对象因为这个方法不需要加锁只使用 queueName 就够了// 由于该方法是在程序启动时调用此时服务器还不能处理请求呢~~ 不涉及多线程操作文件public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedListMessage messages new LinkedList();try (InputStream inputStream new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream new DataInputStream(inputStream)) {// 这个变量记录当前文件光标long currentOffset 0;// 一个文件中包含了很多消息此处势必要循环读取while (true) {// 1. 读取当前消息的长度这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法读到文件的末尾会抛出 EOFException 异常。这一点和之前的很多流并不一样int messageSize dataInputStream.readInt();// 2. 按照这个长度读取消息内容byte[] buffer new byte[messageSize];int actualSize dataInputStream.read(buffer);if (messageSize ! actualSize) {// 如果不匹配说明文件有问题格式错乱了throw new MqException([MessageFileManager] 文件格式错误queueName queueName);}// 3. 把这个读到的二进制数据反序列化回 Message 对象Message message (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象是不是无效对象if (message.getIsValid() ! 0x1) {// 无效数据直接跳过// 虽然消息是无效数据但是 offset 不要忘记更新currentOffset (4 messageSize);continue;}// 5. 是有效数据,则需要把这个 Message 对象加入到链表中。加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候需要知道当前文件光标的位置的。由于当下使用的 DataInputStream 并不方便// 因此就需要手动计算下文件光标message.setOffsetBegin(currentOffset 4);message.setOffsetEnd(currentOffset 4 messageSize);currentOffset (4 messageSize);messages.add(message);}} catch (EOFException e) {// 这个 catch 并非真是处理 “异常”而是处理 “正常”的业务逻辑。文件读到末尾的时候会被 readInt 抛出该异常// 这个 catch 语句中也不需要做啥特殊的事情目的是为了让循环结束System.out.println([MessageFileManger] 恢复 Message 数据完成);}}return messages;}7.8 实现消息文件垃圾回收
由于当前会不停的往消息文件中写入新消息并且删除消息只是逻辑删除这就可能导致消息文件越来越大并且里面又包含大量的无效消息因此就需要处理掉那些无效的信息。
此处的垃圾回收,使用 复制算法
判定当文件中消息总数超过 2000并且有效消息的数目不足 50%就要触发垃圾回收。就是把文件中所有有效的消息取出来单独的再写入到一个新的文件中然后删除旧文件使用新文件代替。
// 检查当前是否要针对该队列的消息进行GCpublic boolean checkGC(String queueName) {// 判断是否要 GC是根据消息数和有效消息数。这两个值都是在 消息统计文件 中的Stat stat readStat(queueName);if (stat.totalCount 2000 (double) stat.validCount / (double) stat.totalCount 0.5) {return true;}return false;}// 引入一个新的数据文件来存放有效信息private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) /queue_data_new.txt;}// 通过这个方法真正执行消息数据文件的垃圾回收操作// 使用复制算法来完成// 创建一个新的文件名字就是 queue_data_new.txt// 把之前消息数据文件中的有效信息都读出来写到新的文件中// 删除旧的文件再把新的文件改回 queue_data.txt// 同时要记得更新消息统计文件public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的时候是针对消息数据文件进行大洗牌。在这个过程中其他线程不能针对该队列的消息文件做任何修改synchronized (queue) {// 由于 gc 操作可能比较耗时此处统计一下执行消耗的时间。long gcBeg System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情况下这个文件不应该存在。如果存在就是意外~~ 说明上次 gc 了一半程序意外崩溃了。throw new MqException([MessageFileManger] gc 时发现该队列的 queue_data_new 已经存在queueName queueDataNewFile.getName());}boolean ok queueDataNewFile.createNewFile();if (!ok) {throw new MqException([MessageFileManger] 创建文件失败queueDataNewFile queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中读取出所有的有效消息对象了。(这个逻辑直接调用上述方法即可不用重新写了)LinkedListMessage messages loadAllMessageFromQueue(queue.getName());// 3. 把有效消息写入到新的文件中try (OutputStream outputStream new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer BinaryTool.toBytes(message);// 先写四个字节的长度dataOutputStream.writeInt(buffer.length);// 再写消息本体dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件并且把新的文件进行重命名File queueDataOldFile new File(getQueueDataPath(queue.getName()));ok queueDataOldFile.delete();if (!ok) {throw new MqException([MessageFileManger] 删除旧的数据文件失败queueDataOldFile queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt queue_data.txtok queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException([MessageFileManger] 文件重命名失败queueDataNewFile queueDataNewFile.getAbsolutePath() , queueDataOldFile queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat readStat(queue.getName());stat.totalCount messages.size();stat.validCount messages.size();writeStat(queue.getName(),stat);long gcEnd System.currentTimeMillis();System.out.println([MessageFileManger] gc 执行完毕queueName queue.getName() ,time (gcEnd - gcBeg) ms);}}7.9 测试MessageFileManager
7.9.1 准备阶段
MessageFileManager 这个类主要是为了管理消息相关的数据而消息事宜是以队列为维度展开的因此准备工作就是先创建好测试要用的队列在后续每个方法的开始前都会重新创建该方法结束之后就会删除该队列。
package com.example.mq2;import com.example.mq2.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.boot.test.context.SpringBootTest;import java.io.IOException;SpringBootTest
public class MessageFileMangerTests {private MessageFileManager messageFileManager new MessageFileManager();private static final String queueName1 testQueue1;private static final String queueName2 testQueue2;// 这个方法是每个测试用例执行之前的准备工作BeforeEachpublic void setUp() throws IOException {// 准备阶段创建出两个队列以备后用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 这个方法就是每个用例 执行完毕之后的收尾工作AfterEachpublic void tearDown() throws IOException {// 收尾阶段就是把刚才的队列给干掉messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}
}7.9.2 测试创建队列文件
在准备工作阶段 我们已经写好了创建 队列文件 的代码因此这里测试的时候主需要验证就行了
Test
public void testCreateFiles() {// 创建队列文件已经在上面 setUp 阶段执行过了。此处主要是验证看看文件是否存在File queueDataFile1 new File(./data/ queueName1 /queue_data.txt);// 这里不是验证它是否存在而是直接验证它是不是一个文件更具体Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 new File(./data/ queueName1 /queue_Stat.txt);Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 new File(./data/ queueName2 /queue_data.txt);Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 new File(./data/ queueName2 /queue_Stat.txt);Assertions.assertEquals(true, queueStatFile2.isFile());}7.9.3 测试读写统计文件
// 测试读写统计文件Testpublic void testReadWriteStat() {MessageFileManager.Stat stat new MessageFileManager.Stat();stat.totalCount 100;stat.validCount 50;// 因为stat 的 write 和 read都是私有的无法直接调用// 此处需要使用反射的形式来调用 writerStat 和 readStat 了。// Java 原生的反射 API 其实非常难用~~// 此处使用 Spring 帮我们封装好的 反射 的工具类ReflectionTestUtils.invokeMethod(messageFileManager, writeStat, queueName1, stat);// 写入完毕之后在调用一下读取验证读取的结果和写入的数据是一致的MessageFileManager.Stat newStat ReflectionTestUtils.invokeMethod(messageFileManager, readStat, queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);}这里用到了一个新的知识点。MessageFileManager 这个类的 readStat() 和 writeStat() 方法都是私有的我们在类外不能直接访问因此这里考虑反射来获取到对应的方法。Java 原生的反射相关 API 太复杂了这里使用 Spring提供的反射类 ReflectionTestUtils.
具体的方法ReflectionTestUtils.invokeMethod();
egReflectionTestUtils.invokeMethod(messageFileManger,writeStat,queueName1,stat);
egMessageFileManger.Stat stat ReflectionTestUtils.invokeMethod(messageFileManger,readStat,queueName1);参数
第一个调用谁的方法
第二个要调用的方法名
后面则是 该调用的方法要传的参数7.9.4 测试 sendMessage
这个方法也就是把消息放到 队列的文件中。因此测试 sendMessage 之前就需要先把 队列 和 消息准备好
// 测试 sendMessageTestpublic void sendMessage() throws IOException, MqException, ClassNotFoundException {// 1. 构造出消息Message message createTestMessage(testMessage);// 2. 构造出队列// 此处创建的 queue 对象的 name不能随便写只能用 queueName1 和 queueName2.// 需要保证这个队列对象对应的目录和文件啥的都存在才行MSGQueue queue createTestQueue(queueName1);// 调用发送消息的方法messageFileManager.sendMessage(queue, message);// 检查 stat 文件MessageFileManager.Stat stat ReflectionTestUtils.invokeMethod(messageFileManager,readStat, queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);// 检查 data 文件ListMessage messages messageFileManager.loadAllMessageFromQueue(queue.getName());Assertions.assertEquals(1, messages.size());// 因为只有一个这里直接获取Message curMessage messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同不能使用 assertEquals 了Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());// 这里打印一下 message 的具体信息在 Message 类里 重写 toString 方法System.out.println(message: message);}7.9.5 测试 loadAllMessageFromQueue
虽然在上面的测试中我们在核对发送前后的信息时已经用到了这个方法但是只有一条信息而这里是单独测试这个方法并且是多条信息。
测试过程
向队列中发送一百条消息然后从队列中取出来和之前的一一对应。
Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往队列中插入 100 条消息然后验证看看这 100 条消息从文件中读取之后是否和最初是一致的MSGQueue queue createTestQueue(queueName1);ListMessage expectedMessages new LinkedList();for (int i 0; i 100; i) {Message message createTestMessage(testMessage i);messageFileManger.sendMessage(queue, message);expectedMessages.add(message);}// 读取所有消息LinkedListMessage actualMessages messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i 0; i actualMessages.size(); i) {Message expectMessage expectedMessages.get(i);Message actualMessage actualMessages.get(i);System.out.println([ i ] actualMessage actualMessage);Assertions.assertEquals(expectMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}7.9.6 测试 deleteMessage
测试删除消息回顾一下咱们的删除消息这里的删除是逻辑删除也就是把硬盘上存储的这个数据里面的那个 isValid 属性设置成0。
这里测试删除消息就是 创建队列写入 10 个消息删除其中的几个消息再把剩下的全部读出来判断是否符合预期。// 测试删除消息// 创建队列写入 10 个消息删除其中的几个消息再把剩下的全部读出来判断是否符合预期Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列MSGQueue queue createTestQueue(queueName1);// 写入 10 个消息ListMessage expectedMessages new LinkedList();for (int i 0; i 10; i) {Message message createTestMessage(testMessage i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的几个消息这里为了方便测试直接删除后三个messageFileManager.deleteMessage(queue, expectedMessages.get(9));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(7));// 读取剩下的消息对比内容是否正确LinkedListMessage actualMessages messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i 0; i actualMessages.size(); i) {Message expectedMessage expectedMessages.get(i);Message actualMessage actualMessages.get(i);System.out.println([ i ] actualMessage actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}八、统一硬盘操作封装数据库和数据文件
当前我们已经使用了 数据库 管理了交换机绑定队列又使用 数据文件 管理了消息。
下面我们创建一个类通过这个类来统一管理上述操作。这样上层逻辑如果需要操作硬盘统一都通过这个类来使用。上层代码不关心数据是存储在数据库还是文件中
package com.example.mq2.mqserver.datacenter;import com.example.mq2.common.MqException;
import com.example.mq2.mqserver.core.Binding;
import com.example.mq2.mqserver.core.Exchange;
import com.example.mq2.mqserver.core.MSGQueue;
import com.example.mq2.mqserver.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;/*** 使用这个类来管理硬盘上的数据* 1. 数据库交换机绑定队列* 2. 数据文件消息* 上层逻辑如果需要操作硬盘统一都通过这个类来使用。(上层代码不关心数据是存储在数据库还是文件中)*/
public class DiskDataCenter {// 用这个实例来管理数据库中的数据private DataBaseManager dataBaseManager new DataBaseManager();// 这个实例用来管理数据文件中的数据private MessageFileManager messageFileManager new MessageFileManager();public void init() {// 针对上述两个实例进行初始化dataBaseManager.init();// 当前 messageFileManger.init 是空的方法只是先列在这里一旦后续需要扩展就在这里进行初始化即可messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public ListExchange selectAllExchange() {return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);}public ListMSGQueue selectALLQueues() {return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public ListBinding selectAllBindings() {return dataBaseManager.selectAllBindings();}// 封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue, message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue, message);// 因为删除了信息这里还要判断是否要 gcif (messageFileManager.checkGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}}
九、内存数据管理
上述提到的 交换机、队列、绑定、消息等都是在硬盘上存储的而我们之前设计的时候已经说了内存和硬盘上都要存储一份内存存储数据为主硬盘存储数据为辅主要是为了持久化重启之后数据不丢失。下面就来实现在内存上存储上述数据。
9.1 设计数据结构
交换机直接使用 HashMapkey 是 namevalue 是 Exchange 对象。【注意】
后面可能会在多线程环境下使用因此考虑使用 ConcurrentHashMap它是线程安全的来代替 HashMap(后面的也都是如此)。因为 name 是唯一的 (之前设定的时候已经规定了名字是唯一标识符)因此 使用 name 作为了 key// key 是 exchangeName,value 是 Exchange 对象
private ConcurrentHashMapString, Exchange exchangeMap new ConcurrentHashMap();队列也是使用 HashMapkey 是 namevalue 是 MSGQueue 对象。(也使用 ConcurrentHashMap 替代后续也都是就不在赘述了)
// key 是 queueNamevalue 是 MSGQueue 对象private ConcurrentHashMapString, MSGQueue queueMap new ConcurrentHashMap();绑定使用嵌套的 HashMapkey 是 exchangeNamevalue 是一个 HashMap第二个 HashMap 的 key 是 queueNamevalue 是 Binding 对象
// 第一个 key 是 exchangeName第二个 key 是 queueNameprivate ConcurrentHashMapString, ConcurrentHashMapString, Binding bindingsMap new ConcurrentHashMap();消息使用 HashMapkey 是 messageIdvalue 是 Message 对象。
// key 是 messageIdvalue 是 Message 对象private ConcurrentHashMapString, Message messageMap new ConcurrentHashMap();表示队列和消息之间的关联使用嵌套的 HashMap。key 是 queueNamevalue 是一个 LinkedListLinkedList 中每个元素是一个 Message 对象。
// key 是 queueNamevalue 是一个 Message 的链表private ConcurrentHashMapString, LinkedListMessage queueMessageMap new ConcurrentHashMap();表示 “未被确认” 的消息 使用嵌套的 HashMapkey 是 queueNamevalue 是 HashMap第二个 HashMap 的 key 是 messageIdvalue 是 Message 对象。
// 第一个 key 是 queueName第二个是 messageIdprivate ConcurrentHashMapString, ConcurrentHashMapString, Message queueMessageWaitAckMap new ConcurrentHashMap(); 后续实现消息确认的逻辑,需要根据 ack 响应的内容,这里会提供一个确认的messageld.根据这个 messageld 来把上述结构中的Message 对象找到并移除咱们此处实现的 MQ, 支持两种应答模式(ACK)
自动应答消费者取了元素这个消息就算是被应答了此时这个消息就可以被干掉了。
手动应答消费者取了元素这个消息还不算被应答需要消费者主动再调用一个 basicAck 方法此时才认为是真正应答了才能删除这个消息。9.2 管理交换机
添加交换机public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println([MemoryDataCenter] 新交换机添加成功exchangeName exchange.getName());}获取交换机public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}
删除交换机public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println([MemoryDataCenter] 交换机删除成功exchangeName exchangeName);}9.3 管理队列
添加队列public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(), queue);System.out.println([MemoryDataCenter] 新队列添加成功queueName queue.getName());}获取队列public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}删除队列
public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println([MemoryDataCenter] 队列删除成功queueName queueName);}9.4 管理绑定
添加绑定
因为绑定表是一个嵌套的 Map 表因此现根据第一个 key(exchangeName)查找对应的 value(也是一个 Map 表key 是 queueNamevalue 是 binding)如果不存在则创建然后根据 第二个keyqueueName查找是否存在相应的 binding如果存在就报错(已经存在了就不能在添加同样的了)public void insertBinding(Binding binding) throws MqException {// 先使用 exchangeName 查一下对应的哈希表是否存在不存在就创建一个
// ConcurrentHashMapString, Binding bindingMap bindingsMap.get(binding.getExchangeName());
// if (bindingMap null) {
// bindingMap new ConcurrentHashMap();
// bindingsMap.put(binding.getExchangeName(),bindingMap);
// }// 上面这段代码可以使用下面这个代替,逻辑是一样的ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(),k - new ConcurrentHashMap());synchronized (bindingMap) {// 再根据 queueName 查一下。如果已经存在就抛出异常不存在才能插入if (bindingMap.get(binding.getQueueName()) ! null) {throw new MqException([MemoryDataCenter] 绑定已经存在exchangeName binding.getExchangeName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println([MemoryDataCenter] 新绑定添加成功exchangeName binding.getExchangeName() queueName binding.getQueueName());}获取绑定
获取绑定写两个版本
① 根据 exchangeName 和 queueName 确定唯一一个 Binding
② 根据 exchangeName 获取所有的 Binding
// 获取绑定写两个版本// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding// 2. 根据 exchangeName 获取所有的 Bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMapString, Binding bindingMap bindingsMap.get(exchangeName);if (bindingMap null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMapString, Binding getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}删除绑定
public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMapString, Binding bindingMap bindingsMap.get(binding.getExchangeName());if (bindingMap null) {throw new MqException([MemoryDataCenter] 绑定不存在exchangeName binding.getExchangeName() queueName binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println([MemoryDataCenter] 绑定删除成功exchangeName binding.getExchangeName() queueName binding.getQueueName());}9.5 管理消息
添加消息
public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println([MemoryDataCenter] 新消息添加成功messageId message.getMessageId());}根据 id 查询信息public Message getMessage(String messageId) {return messageMap.get(messageId);}根据 id 删除信息
public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println([MemoryDataCenter] 消息被移除messageId messageId);}发送消息到指定队列
// 发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列数据结构中// 先根据队列的名字找到该队列对应的消息链表
// LinkedListMessage messages queueMessageMap.get(queue.getName());
// if (messages null) {
// messages new LinkedList();
// queueMessageMap.put(queue.getName(),messages);
// }// computeIfAbsent 是线程安全的LinkedListMessage messages queueMessageMap.computeIfAbsent(queue.getName(), k - new LinkedList());// 再把数据加到 messages 里面synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心插入一下即使 message 已经在 消息中心存在了重复插入也没有关系// 主要是相同的 messageId对应的 message 的内容一定是一样的。(服务器代码不会对 Message 内容(BasicProperties 和 body)做修改)addMessage(message);System.out.println([MemoryDataCenter] 消息被投递到队列中messageId message.getMessageId());}从队列中取消息
// 从队列中取消息
public Message pollMessage(String queueName) {// 根据队列名查找一下对应的队列的消息链表LinkedListMessage messages queueMessageMap.get(queueName);// 如果没找到说明队列中没有任何消息if (messages null) {return null;}synchronized (messages) {// 消息数为 0这里也返回 nullif (messages.size() 0) {return null;}// 链表中有元素就进行头删Message currentMessage messages.remove(0);System.out.println([MemoryDataCenter] 消息从队列中取出messageId currentMessage.getMessageId());return currentMessage;}}获取指定队列中消息的个数
// 获取指定队列中消息的个数
public int getMessageCount(String queueName) {LinkedListMessage messages queueMessageMap.get(queueName);if (messages null) {// 队列中没有消息return 0;}synchronized (messages) {return messages.size();}}9.6 管理待确认的消息
添加未确认的消息
// 添加未确认的消息public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());messageHashMap.put(message.getMessageId(), message);System.out.println([MemoryDataCenter] 消息进入待确认队列messageId message.getMessageId());}删除未确认的消息(消息已经确认了)// 删除未确认的消息(消息已经确认了)public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.get(queueName);if (messageHashMap null) {return;}messageHashMap.remove(messageId);System.out.println([MemoryDataCenter] 消息从待确认队列删除messageId messageId);}获取指定的未确认的消息
// 获取指定的未确认的消息public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.get(queueName);if (messageHashMap null) {return null;}return messageHashMap.get(messageId);}9.7 从硬盘中恢复数据到内存
如果遇到一些突发情况比如关机服务器挂了等可能会导致内存中的数据丢失这时候就需要从硬盘上把这些数据恢复到内存中。这也是把数据在硬盘上也存储一份的作用。
具体步骤
清空之前内存集合中的数据直接清空防止残留数据 对 新数据产生干扰恢复所有的交换机数据恢复所有的队列数据恢复所有的绑定数据恢复所有的消息数据
注意不需要恢复 等待确认 的消息。因为在 当消息在等待 ACK 的时候服务器重启了此时消息还没有被处理就相当于还没有被取走等到消费者用的时候从 messageMap 里重新取一次就行了不需要单独存起来。
// 这个方法就是从硬盘上读取数据把硬盘中之前持久化存储的各个维度的数据都恢复到内存中
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {// 0. 清空之前的所有数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();// 1. 恢复交换机的所有数据ListExchange exchanges diskDataCenter.selectAllExchange();for (Exchange exchange : exchanges) {exchangeMap.put(exchange.getName(), exchange);}// 2. 恢复所有的队列数据ListMSGQueue queues diskDataCenter.selectALLQueues();for (MSGQueue queue : queues) {queueMap.put(queue.getName(), queue);}// 3. 恢复所有的绑定数据ListBinding bindings diskDataCenter.selectAllBindings();for (Binding binding : bindings) {ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(),k - new ConcurrentHashMap());bindingMap.put(binding.getQueueName(), binding);}// 4. 恢复所有的消息数据// 遍历所有的队列根据每个队列的名字来获取所有的消息for (MSGQueue queue : queues) {LinkedListMessage messages diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(), messages);for (Message message : messages) {messageMap.put(message.getMessageId(), message);}}// 注意针对 “未确认的消息” 这部分内存中的数据不需要从硬盘恢复之前考虑存储的时候也没设定这一块// 一旦在等待 ack 的过程中服务器重启了此时这些 “未被确认的消息”就恢复成 “未被取走的消息”。// 这个消息在硬盘上存储的时候就是当做 “未被取走”
}9.8 测试 MemoryDataCenter
9.8.1 准备阶段
先做好 初始化 和 收尾工作。
SpringBootTest
public class MemoryDataCenterTests {private MemoryDataCenter memoryDataCenter null;BeforeEachpublic void setUp() {memoryDataCenter new MemoryDataCenter();}AfterEachpublic void tearDown() {// 因为数据是保存在内存上为了避免影响每次都要清空memoryDataCenter null;}
}这里每个单元测试前都要创建一个新的实例并且该方法结束后也要将该实例置为空这是因为 MemoryDataCenter 的数据都是保存在内存上因此需要这些操作来保证下一次运行数据都是新的不会受上次干扰。与 MessageFileManager 不同 MessageFileManager 的数据都是保存在文件上下图是 MessageFileManager 的准备、收尾 工作。因此每次只需要把文件上的内容销毁即可不需要每次都创建新的实例。准备好测试用的交换机和队列
// 创建一个测试交换机private Exchange createTestExchange(String exchangeName) {Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);return exchange;}// 创建一个测试队列private MSGQueue createTestQueue(String queueName) {MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);return queue;}9.8.2 测试交换机
这里使用一个测试方法直接测试交换机的 创建、查找以及删除。
// 针对交换机进行测试Testpublic void testExchange() {// 1. 先构造出一个交换机并插入Exchange expectExchange createTestExchange(testExchange);memoryDataCenter.insertExchange(expectExchange);// 2. 查询出这个交换机比较结果是否一致。此处直接比较这俩引用指向同一个对象// 因为是在内存上存储的保存在内存上的 Map 表中实体只有一个两个引用都是指向它Exchange actualExchange memoryDataCenter.getExchange(testExchange);Assertions.assertEquals(expectExchange, actualExchange);// 3. 删除这个交换机memoryDataCenter.deleteExchange(testExchange);// 4. 再查一次看是否就查不到了actualExchange memoryDataCenter.getExchange(testExchange);Assertions.assertNull(actualExchange);}9.8.3 测试队列
// 针对队列进行测试Testpublic void testQueue() {// 1. 构造一个队列并插入MSGQueue exceptQueue createTestQueue(testQueue);memoryDataCenter.insertQueue(exceptQueue);// 2. 查询这个队列并比较MSGQueue actualQueue memoryDataCenter.getQueue(testQueue);Assertions.assertEquals(exceptQueue, actualQueue);// 3. 删除这个交换机memoryDataCenter.deleteQueue(testQueue);// 4. 再次查询队列看是否能查到actualQueue memoryDataCenter.getQueue(testQueue);Assertions.assertNull(actualQueue);}9.8.4 测试绑定
// 针对绑定进行测试Testpublic void testBinding() throws MqException {// 1. 创建绑定Binding expectBinding new Binding();expectBinding.setExchangeName(testExchange);expectBinding.setQueueName(testQueue);expectBinding.setBindingKey(testBindingKey);memoryDataCenter.insertBinding(expectBinding);// 2. 查询 (我们有两个获取绑定的方法因此查询两次)// 根据交换机和队列名。查询指定的绑定Binding actualBinding memoryDataCenter.getBinding(testExchange, testQueue);Assertions.assertEquals(expectBinding, actualBinding);// 根据交换机查询所有的绑定ConcurrentHashMapString, Binding bindingMap memoryDataCenter.getBindings(testExchange);Assertions.assertEquals(1, bindingMap.size());Assertions.assertEquals(expectBinding, bindingMap.get(testQueue));// 3. 删除memoryDataCenter.deleteBinding(expectBinding);// 4. 查询看是否存在actualBinding memoryDataCenter.getBinding(testExchange, TestQueue);Assertions.assertNull(actualBinding);}9.8.5 测试消息
测试消息本身的 添加、获取、删除
private Message createTestMessage(String content) {Message message Message.createMessageWithId(testRoutingKey, null, content.getBytes());return message;
}Test
public void testMessage() {Message expectMessage createTestMessage(toMessage);memoryDataCenter.addMessage(expectMessage);Message actualMessage memoryDataCenter.getMessage(expectMessage.getMessageId());Assertions.assertEquals(expectMessage, actualMessage);memoryDataCenter.removeMessage(expectMessage.getMessageId());actualMessage memoryDataCenter.getMessage(expectMessage.getMessageId());Assertions.assertNull(actualMessage);
}测试发送消息
创建一个队列创建 10 条消息把这些消息都插入队列中从队列中取出这些消息比较取出的这些消息和之前的消息是否一致
Test
public void testSendMessage() {// 创建一个队列创建 10 条消息把这些消息都插入队列中MSGQueue queue createTestQueue(testQueue);ListMessage expectMessages new ArrayList();for (int i 0; i 10; i) {Message message createTestMessage(testMessage i);memoryDataCenter.sendMessage(queue, message);expectMessages.add(message);}// 2. 从队列中取出这些消息ListMessage actualMessage new ArrayList();while (true) {Message message memoryDataCenter.pollMessage(testQueue);if (message null) {break;}actualMessage.add(message);}// 3. 比较取出的这些消息和之前的消息是否一致Assertions.assertEquals(expectMessages.size(), actualMessage.size());for (int i 0; i actualMessage.size(); i) {Assertions.assertEquals(expectMessages.get(i), actualMessage.get(i));}
}测试消息应答
Testpublic void testMessageWaitAck() {Message expectMessage createTestMessage(expectMessage);memoryDataCenter.addMessageWaitAck(testQueue, expectMessage);Message actualMessage memoryDataCenter.getMessageWaitAck(testQueue, expectMessage.getMessageId());Assertions.assertEquals(expectMessage, actualMessage);memoryDataCenter.removeMessageWaitAck(testQueue, expectMessage.getMessageId());actualMessage memoryDataCenter.getMessageWaitAck(testQueue, expectMessage.getMessageId());Assertions.assertNull(actualMessage);}测试恢复信息Recovery
Test
public void testRecovery() throws IOException, MqException, ClassNotFoundException {// 由于后续需要进行数据库操作依赖 Mybatis就需要先启动 SpringApplication这样才能进行后续的操作MqApplication.context SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造数据DiskDataCenter diskDataCenter new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange expectedExchange createTestExchange(testExchange);diskDataCenter.insertExchange(expectedExchange);// 构造队列MSGQueue expectedQueue createTestQueue(testQueue);diskDataCenter.insertQueue(expectedQueue);// 构造绑定Binding expectedBinding new Binding();expectedBinding.setExchangeName(testExchange);expectedBinding.setQueueName(testQueue);expectedBinding.setBindingKey(testBindingKey);diskDataCenter.insertBinding(expectedBinding);// 构造消息Message expectedMessage createTestMessage(testMessage);diskDataCenter.sendMessage(expectedQueue, expectedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果// 这里就不能直接对比引用了因为他是从硬盘中反序列化之后得到的这个过程创建了新的对象Exchange actualExchange memoryDataCenter.getExchange(testExchange);Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());MSGQueue actualQueue memoryDataCenter.getQueue(testQueue);Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());Binding actualBinding memoryDataCenter.getBinding(testExchange, testQueue);Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());Message actualMessage memoryDataCenter.pollMessage(testQueue);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());// 4. 清理硬盘的数据把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录)MqApplication.context.close(); // 先关闭连接才能正常删除掉(释放使用)File dataDir new File(./data);FileUtils.deleteDirectory(dataDir); // 可以递归删除目录}运行之后发现报错了这个错误说我们的队列文件不存在也就是 queue_data.txt 和 queue_stat.txt 这两个文件不存在这里也体现了我们打印错误信息的好处不然有时候定位到代码我们也不能快速反应什么问题不如文字直入心灵。因此我们就根据这个点来排查问题。
既然不存在说明创建文件的代码 (创建文件的代码在 MessageFileManger 中) 没有正确运行所以我们先找到这个方法发现这个方法只在测试代码中调用了原因就是我们封装的时候上层代码没有调用这个方法因此找到他的上层先关代码我们统一硬盘操作的时候把 MessageFileManger 这个类封装在了 DiskDataCenter 这个类里所以应该由这个类调用 MessageFileManger 中创建数据文件的代码我们找到相应的代码这是与队列相关的操作可以发现我们只是创建了队列并没有创建队列文件因此添加上即可。同时销毁队列时也要删除队列文件。修改如下
在 DiskDataCenter 中添加相应的代码如图添加上述两行代码即可。
9.9 总结
上述过程主要是在内存中而这就需要我们设计合理的数据结构来存储数据。
我们广泛使用了 哈希表、链表、嵌套的数据结构 等来保存和管理 交换机、队列、绑定、消息上述都是基本的数据结构因此我们只有深刻理解这些数据结构才能更好地去使用。
其次我们要考虑线程安全的问题
要不要加锁锁加到哪里使用哪个对象作为锁对象
这个没有标准的答案要根据具体的情况来分析。
总的原则 分析如果不加锁这个代码会造成啥样的后果/问题这个后果你觉得是否严重?
十、 虚拟主机VirtualHost设计
前面我们已经描述了虚拟主机是什么此处为了简单只实现单个虚拟主机并不打算实现 添加/删除 虚拟主机。但是仍会在设计数据结构时留下这样一个扩展空间。虚拟主机不仅仅要管理数据还要提供一些核心 API供上层代码调用。10.1 创建 VirtualHost 类
这里做好准备工作把 MemoryDataCenter 以及 DiskDataCenter 引入过来并在构造方法中进行初始化构造初始数据。
/*** 通过这个类来表示 虚拟主机* 每个虚拟主机下面都管理着自己的 交换机队列绑定消息数据* 同时提供 api 供上层调用* 针对 VirtualHost 这个类作为业务逻辑的整合者就需要对于代码中抛出的异常进行处理了*/
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter new MemoryDataCenter();private DiskDataCenter diskDataCenter new DiskDataCenter();// 构造方法public VirtualHost(String name) {this.virtualHostName name;// 对于 MemoryDataCenter 来说不需要额外的初始化操作的// 但是针对 DiskDataCenter 来说则需要进行初始化操作建库建表和初始数据的设定diskDataCenter.init();// 另外还需要针对硬盘的数据进行恢复到内存中try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println([VirtualHost] 恢复内存数据失败);}}public String getVirtualHostName() {return virtualHostName;}public MemoryDataCenter getMemoryDataCenter() {return memoryDataCenter;}public DiskDataCenter getDiskDataCenter() {return diskDataCenter;}
}10.2 创建交换机exchangeDelcare
一个虚拟主机上肯能会存在多个交换机那么我们应该如何表示交换机和虚拟主机之间的从属关系呢
方案一参考数据库设计“一对多”方案比如给交换机表添加个属性用来表示虚拟主机的 id/name…方案二在 VirtualHost 这个类中重新约定交换机的名字。让 交换机的名字 虚拟主机的名字 交换机真实的名字。方案三更优雅的办法是给每个虚拟主机分配一组不同的数据库和文件… (比方案二更麻烦)虚拟主机存在的目的就是为了保证隔离让不同虚拟主机之间的内容不要有影响~~
比如
在虚拟主机1 中 搞了个 exchange叫做 testExchange
在虚拟主机2 中也搞一个 exchange也叫做 testExchange
上述这种情况是可以的虚机主机就是干这个的。此时我们就可以受用方法二加上前缀
virtualHost1testExchangevirtualHost2testExchange。按照上述这种方式也可以去区分不同的队列。进一步的由于绑定是和交换机和队列都相关此时绑定也就被隔离开了。
再进一步消息和队列是强相关的队列名区分开了消息自然也就区分开了~~这里我们就采用 方案二RabbitMQ也是这样实现的来编写我们的代码。
编写交换机的步骤
把交换机的名字机上虚拟主机作为前缀判断该交换机是否已经存在。直接通过内存查询存在直接返回。若不存在则创建并把交换机对象写入硬盘如果持久化的话随后再写入内存上述逻辑先写硬盘后写内存。目的就是因为硬盘更容易写失败如果硬盘写失败了内存就不写了。要是先写内存。内存写成功了硬盘写失败了还需要把内存的数据给再删掉就比较麻烦// 创建交换机
// 如果交换机不存在就创建如果存在直接返回
// 返回值是 boolean。 创建成功返回 true。失败返回 false
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString, Object arguments) {// 把交换机的名字加上虚拟主机名作为前缀exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {// 1. 判断该交换机是否已经存在这里直接在内存上判断因为硬盘上只是为了持久化存储是为了 recovery// 而内存上是所有的不管有没有持久化存储内存上都有Exchange existsExchange memoryDataCenter.getExchange(exchangeName);if (existsExchange ! null) {// 该交换机已经存在System.out.println([VirtualHost] 交换机已经存在exchangeName exchangeName);return true;}// 2. 不存在真正创建交换机。先构造 Exchange 对象Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3. 把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4. 把交换机写入内存memoryDataCenter.insertExchange(exchange);System.out.println([VirtualHost] 交换机创建完成exchangeName exchangeName);// 上述逻辑先写硬盘后写内存。目的就是因为硬盘更容易写失败如果硬盘写失败了内存就不写了// 要是先写内存。内存写成功了硬盘写失败了还需要把内存的数据给再删掉就比较麻烦}return true;} catch (Exception e) {System.out.println([VirtualHost] 交换机创建失败exchangeName exchangeName);e.printStackTrace();return false;}
}【注意1】
这里因为我们方法有个参数是 MapString, Object arguments但是在对应的 Exchange 类中却没有对应的 getter 方法因此我们还需要再补充一个 getter 方法。
在 Exchange 类中补充 getter 方法public class Exchange {......public void setArguments(MapString, Object arguments) {this.arguments arguments;}
}【注意2】
这里考虑到多线程的环境下若多个线程同时创建交换机如果要创建的交换机的名字相同但是类型不同那么最后创建的是哪个交换机呢为了避免这种情况我们就要考虑加锁。
添加一个锁对象作为 VirtualHost 的成员,这样后续与交换机相关的锁都可以用这个对象。private final Object exchangeLocker new Object();10.3 删除交换机exchangeDelete
根据交换机的名字找到对应的交换机删除硬盘上的数据删除内存上的数据
// 删除交换机
public boolean exchangeDelete(String exchangeName) {exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {// 1. 先找到对应的交换机Exchange toDelete memoryDataCenter.getExchange(exchangeName);if (toDelete null) {throw new MqException([VirtualHost] 交换机不存在无法删除);}// 2. 删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}// 3. 删除内存中的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println([VirtualHost] 交换机删除成功exchangeName exchangeName);}return true;} catch (Exception e) {System.out.println([VirtualHost] 交换机删除失败exchangeName exchangeName);e.printStackTrace();return false;}
}10.4 创建队列queueDelcare
过程与创建交换机类似
拼接名字判断是否存在存在直接返回不存在则创建并写入到硬盘如果持久化最后写入内存
// 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) {// 把队列的名字给拼接上虚拟主机的名字queueName virtualHostName queueName;try {synchronized (queueLocker) {// 1. 判断队列是否存在MSGQueue existsQueue memoryDataCenter.getQueue(queueName);if (existsQueue ! null) {System.out.println([VirtualHost] 队列已经存在queueName queueName);return true;}// 2. 创建队列对象MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 写硬盘if (durable) {diskDataCenter.insertQueue(queue);}// 4. 写内存memoryDataCenter.insertQueue(queue);System.out.println([VirtualHost] 队列创建成功queueName queueName);}return true;} catch (Exception e) {System.out.println([VirtualHost] 队列创建失败queueName queueName);e.printStackTrace();return false;}
}【注意】
对于操作队列和交换机一样也是需要加锁的并且操作交换机和操作队列是不同的因此不能用同一把锁。需要额外创建一个锁同交换机下面这个变量也是 VirtualHost 的成员专门用来给队列加锁。private final Object queueLocker new Object();10.5 删除队列queueDelete
根据交换机的名字找到对应的交换机删除硬盘数据删除内存中数据
// 删除队列
public boolean queueDelete(String queueName) {queueName virtualHostName queueName;try {synchronized (queueLocker) {// 1. 根据队列名字查询下当前的队列对象MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在无法删除queueName queueName);}// 2. 删除硬盘数据diskDataCenter.deleteQueue(queueName);// 3. 删除内存数据memoryDataCenter.deleteQueue(queueName);System.out.println([VirtualHost] 删除队列成功queueName queueName);}return true;} catch (Exception e) {System.out.println([VirtualHost] 删除队列失败queueName queueName);e.printStackTrace();return false;}
}10.6 创建绑定queueBind
bindingKey 是进⾏ topic 转发时的⼀个关键概念使⽤ router 类来检测是否是合法的 bindingKey。后续再介绍 router.checkBindingKey 的实现此处先留空。上述中涉及到的 router相关的操作我们后续会进行实现。router 是Router 类的一个实例 这里我们用这个类来实现交换机的转发规则同时也借助这个类验证 bindingKey/routingKey 是否合法等操作。
创建绑定的步骤
拼接交换机和队列的名字判断当前的绑定是否已经存在验证 bindingKey 是否合法创建binding对象获取一下对应的交换机和队列。如果交换机或者队列不存在这样的绑定也是无法创建的。创建成功后先写硬盘后写内存
// 交换机队列绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 判定当前的绑定是否已经存在了Binding existsBinding memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding ! null) {throw new MqException([VirtualHost] binding 已经存在queueName queueName ,exchangeName exchangeName);}// 2. 验证 bindingKey 是否合法if (!router.checkBindingKey(bindingKey)) {throw new MqException([VirtualHost] bindingKey 非法bindingKey bindingKey);}// TODO 3,4 这里的逻辑应该能变一下先判断在创建// 3. 创建 Binding 对象Binding binding new Binding();binding.setQueueName(queueName);binding.setExchangeName(exchangeName);binding.setBindingKey(bindingKey);// 4. 获取一下对应的交换机和队列。如果交换机或者队列不存在这样的绑定也是无法创建的。MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在queueName queueName);}Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在exchangeName exchangeName);}// 5. 先写硬盘if (queue.isDurable() exchange.isDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insertBinding(binding);System.out.println([VirtualHost] 绑定创建成功exchangeName exchangeName ,queueName queueName);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 绑定创建失败exchangeName exchangeName ,queueName queueName);e.printStackTrace();return false;}
}【注意】
对于绑定和 交换机 以及 队列 都相关因此加锁的时候就要加两把锁一把是对交换机的另一把是对 队列的缺一不可。并且再删除绑定的时候也是需要加两把锁的注意这两个方法的加锁顺序要相同不然一个先对交换机加锁另一个先对队列加锁容易造成死锁的问题。10.7 解除绑定queueUnBind
解除绑定刚开始的思路设计如下也就是先验证绑定是否存在然后再看该绑定对应的交换机和队列是否存在如果有不成立的那么该绑定就不能删除。
判断交换机和队列是否存在是因为我们的 删除交换机和队列的操作是直接删除的没有考虑和绑定之间的关系这样就会造成 如果我先删除了队列那么绑定就删不掉了。
针对绑定删除时涉及到的这个问题,可选的解决方案,主要有两种:
第一种方式参考类似于 mysq! 的外键一样.。删除队列/交换机的时候判定一下看当前队列/交换机是否存在对应的绑定。如果存在则禁止删除队列/交换机要求先解除绑定再尝试删除队列/交换机上面这种方法有点是更严谨但是太麻烦尤其是我们删除一个队列的时候由于我们的 绑定表是嵌套的HashMap 表需要一次遍历内外两层HashMap 去寻找 该队列对应的绑定太慢而且太复杂这里学习阶段就不使用这种方式了。第二种方式就是删除绑定的时候就直接删除不去验证 交换机/队列是否存在。 这种方式的有点就是简单缺点就是不怎么严谨。但在我们学习阶段基本不会这种出现问题
因此这里就直接删除 绑定。
// 交换机队列解除绑定
public boolean queueUnBind(String queueName, String exchangeName) {queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 获取 binding 看是否已经存在~Binding binding memoryDataCenter.getBinding(exchangeName, queueName);if (binding null) {throw new MqException([VirtualHost] 删除绑定失败绑定不存在exchangeName exchangeName queueName queueName);}// 无论绑定是否持久化了都尝试从硬盘上删一下就算不存在这个删除也无副作用diskDataCenter.deleteBinding(binding);// 4. 删除内存上的数据memoryDataCenter.deleteBinding(binding);System.out.println([VirtualHost] 绑定删除成功);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 删除绑定失败);e.printStackTrace();return false;}
}10.8 关于上述核心API线程安全的总结
一
上述的 API都加了很多锁并且锁对象还是 VirtualHost 这个类的成员变量这个锁的粒度是很大的。这样加锁可能会出现效率问题比如我们针对 A 交换机进行操作此时就会影响到 B 交换机的操作。正常情况下这两个交换机是不能相互影响的。
针对上述情况确实可以做出调整使用更加细粒的锁但是影响不大。
对于 Broker Server, 创建交换机, 创建绑定,创建队列,删除交换机, 删除绑定,删除队列… 都属于 低频操作 !!! 既然是低频操作所以遇到两个线程都去操作创建队列之类的情况本身就概率很低了。
因此绝大多数不会触发锁冲突并且 synchronized 首先是偏向锁状态这个状态下加锁成本也还好其实只有遇到竞争才真加锁因此这里就不调整更加细粒的锁了。
二
既然在这一层代码加锁了那么里面的 MemoryDatacenter 中的操作是否就不必加锁了。那么 MemoryDatacenter 是否之前的加锁就都没意文了吗
其实不是的咱们也不知道 MemoryDatacenter 这个类的方法会给哪个类调用的。当前 VirtualHost 自身是保证了线程安全的此时在 VirtualHos 内部调用MemoryDataCenter, MemoryDataCenter里面不加锁问题不大。但是如果是另一个别的类也多线程调用 MemoryDataCenter这种情况就不好说了因此 “先保证自身是安全的”。
10.9 发送消息到指定的交换机/队列中basicPublish
同上代码中牵扯到的 router.checkRoutingKey(routingKey)也是在后面实现。
// 发送消息到指定的交换机/队列中
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName virtualHostName exchangeName;// 2. 检查 routingKey 是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException([VirtualHost] routingKey 非法routingKey routingKey);}// 3. 查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在exchangeName exchangeName);}// 4. 判断交换机类型if (exchange.getType() ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 这里的规则就是让 routingKey 的值就等于没有拼接前队列的名字// 这样拼接后队列的名字还是相当于还是 virtualHostName queueName就是相当于用 routingKey 指定要转发的队列名字String queueName virtualHostName routingKey;// 5. 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在queueName queueName);}// 7. 队列存在直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发// 5. 找到该交换机关联的所有绑定并遍历这些绑定对象ConcurrentHashMapString, Binding bindingMap memoryDataCenter.getBindings(exchangeName);for (Map.EntryString, Binding entry : bindingMap.entrySet()) {// 1) 获取到绑定对象判定对应的队列是否存在Binding binding entry.getValue();MSGQueue queue memoryDataCenter.getQueue(binding.getQueueName());if (queue null) {// 此处咱们就不抛出异常了可能此处有多个这样的队列// 希望不要因为一个队列的失败影响到其他队列的消息传输System.out.println([VirtualHost] basisPublish 发布消息时发现队列不存在queueName binding.getQueueName());continue;}// 2) 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列// 如果是 fanout所有绑定的队列都要转发// 如果是 topic还需要判定下bindingKey 和 routingKey 是不是匹配if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 消息发送失败);e.printStackTrace();return false;}
}