当前位置: 首页 > news >正文

html教程 pdf网站建设优化兰州

html教程 pdf,网站建设优化兰州,专业网络推广,苏州产品推广公司从0实现一个消息队列中间件 什么是消息队列需求分析核心概念核心API交换机类型持久化网络通信网络通信API 消息应答 模块划分项目创建创建核心类创建Exchange创建MSGQueue创建Binding创建Message 数据库设计配置sqlite实现创建表和数据库基本操作 实现DataBaseManager创建DataB… 从0实现一个消息队列中间件 什么是消息队列需求分析核心概念核心API交换机类型持久化网络通信网络通信API 消息应答 模块划分项目创建创建核心类创建Exchange创建MSGQueue创建Binding创建Message 数据库设计配置sqlite实现创建表和数据库基本操作 实现DataBaseManager创建DataBaseManager类初始化数据库实现checkDBExists实现createTable实现createDefaultData封装其他数据库操作 消息存储设计文件格式queue_data.txt 文件格式queue_stat.txt文件格式 创建MessageFileManager类实现统计文件读写实现创建队列目录实现删除队列目录检查队列文件是否存在实现消息对象序列化/反序列化实现写入消息文件实现删除消息实现消息加载实现垃圾回收 整合数据库和文件创建DiskDataCenter 内存数据结构设计创建MemoryDataCenter 虚拟主机设计创建VirtualHost 订阅消息添加一个订阅者创建订阅者管理类消息确认 网络通信协议设计设计应用层协议定义Request/Response定义参数父类定义返回值父类定义其他参数类ExchangeDeclareArgumentsExchangeDeleteArgumentsQueueDeclareArgumentsQueueDeleteArgumentsQueueBindArgumentsQueueUnbindArgumentsBasicPublishArgumentsBasicConsumeArgumentsSubScribeReturns 实现BrokerServer启动/停止服务器实现处理连接实现readRequest实现writeResponse实现处理请求 实现clearClosedSessio 实现客户端创建 ConnectionFactoryConnection 和Channel的定义Connection的定义封装请求响应读写操作创建channel Channel的定义创建channel实现generateRid 实现waitResult关闭channel创建交换机删除交换机创建队列创建绑定删除绑定发送消息订阅消息确认消息 处理响应创建扫描线程实现响应的分发 关闭Connection 什么是消息队列 曾经我们学习过阻塞队列(BlockingQueue),我们说,阻塞队列最⼤的⽤途,就是⽤来实现⽣产者消费者模型. ⽣产者消费者模型,存在诸多好处,是后端开发的常⽤编程⽅式. 在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求.因此,我们通常会把阻塞队列,封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能.这样的程序我们就称为消息队列(MessageQueue,MQ)市⾯上成熟的消息队列⾮常多. RabbitMQ Kafka RocketMQ ActiveMQ… 其中,RabbitMQ是⼀个⾮常知名,功能强⼤,⼴泛使⽤的消息队列.咱们就仿照RabbitMQ,模拟实现⼀个简单的消息队列 需求分析 核心概念 ⽣产者(Producer) 消费者(Consumer) 中间⼈(Broker) 发布(Publish) 订阅(Subscribe 其中,Broker是最核⼼的部分.负责消息的存储和转发 在Broker中,⼜存在以下概念. 虚拟机(VirtualHost): 类似于MySQL的database,是⼀个逻辑上的集合.⼀个BrokerServer上可以存在多个VirtualHost. 交换机(Exchange):⽣产者把消息先发送到Broker的Exchange上.再根据不同的规则,把消息转发 给不同的Queue. 队列(Queue):真正⽤来存储消息的部分.每个消费者决定⾃⼰从哪个Queue上读取消息. 绑定(Binding):Exchange和Queue之间的关联关系.Exchange和Queue可以理解成多对多关系.使⽤⼀个关联表就可以把这两个概念联系起来. 消息(Message):传递的内容 这些概念,既需要在内存中存储,也需要在硬盘上存储. 内存存储:⽅便使⽤.硬盘存储:重启数据不丢失 核心API 对于Broker来说,要实现以下核⼼API.通过这些API来实现消息队列的基本功能. 创建队列(queueDeclare)销毁队列(queueDelete)创建交换机(exchangeDeclare)销毁交换机(exchangeDelete)创建绑定(queueBind)解除绑定(queueUnbind)发布消息(basicPublish)订阅消息(basicConsume)确认消息(basicAck) 另⼀⽅⾯,Producer和Consumer则通过⽹络的⽅式,远程调⽤这些API,实现⽣产者消费者模型. 交换机类型 对于RabbitMQ来说,主要⽀持四种交换机类型. Direct Fanout Topic Header 其中Header这种⽅式⽐较复杂,⽐较少⻅.常⽤的是前三种交换机类型.咱们此处也主要实现这三种. Direct: ⽣产者发送消息时,直接指定被该交换机绑定的队列名.Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.Topic: 绑定队列到交换机上时,指定⼀个字符串为bindingKey.发送消息指定⼀个字符串为routingKey. 当 routingKey 和bindingKey满⾜⼀定的匹配条件的时候,则把消息投递到指定队列 持久化 Exchange, Queue, Binding, Message 都有持久化需求.当程序重启/主机重启,保证上述内容不丢失 网络通信 ⽣产者和消费者都是客⼾端程序,broker则是作为服务器.通过⽹络进⾏通信.在⽹络通信的过程中,客⼾端部分要提供对应的api,来实现对服务器的操作 网络通信API 创建Connection关闭Connection创建Channel关闭Channel创建队列(queueDeclare)销毁队列(queueDelete)创建交换机(exchangeDeclare)销毁交换机(exchangeDelete)创建绑定(queueBind)解除绑定(queueUnbind)发布消息(basicPublish)订阅消息(basicConsume)确认消息(basicAck) 可以看到,在broker的基础上,客⼾端还要增加Connection操作和Channel操作. Connection 对应⼀个TCP连接. Channel 则是Connection中的逻辑通道.⼀个Connection中可以包含多个Channel. Channel 和Channel之间的数据是独⽴的.不会相互⼲扰. 这样的设定主要是为了能够更好的复⽤TCP连接,达到⻓连接的效果,避免频繁的创建关闭TCP连接. 消息应答 被消费的消息,需要进⾏应答 应答模式分成两种 ⾃动应答:消费者只要消费了消息,就算应答完毕了.Broker直接删除这个消息. ⼿动应答:消费者⼿动调⽤应答接⼝,Broker收到应答请求之后,才真正删除这个消息. ⼿动应答的⽬的,是为了保证消息确实被消费者处理成功了.在⼀些对于数据可靠性要求⾼的场景,⽐较常⻅ 模块划分 项目创建 创建SpringBoot项⽬. 使⽤SpringBoot2系列版本,Java8. 依赖引⼊SpringWeb和MyBatis 创建核心类 创建包mqserver.core 创建Exchange public class Exchange {//使用name作为交换机的唯一身份标识private String name;//交换机类型DIRECT FANOUT TOPICprivate ExchangeType type ExchangeType.DIRECT;//该交换机是否需要持久化存储private boolean durable false;//Todo: 如果当前交换机没人使用 就自动删除private boolean autoDelete false;//Todo: 额外参数选项//为了将这个数据存储到数据库中 要转换为json字符串存储private MapString , Object arguments new HashMap();public String getArguments(){//把当前arguments参数从map转化为json字符串ObjectMapper objectMapper new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}//如果异常 返回空json字符串return {};}public void setArguments(String argumentsJson){//吧argumentsJson转化为map对象ObjectMapper objectMapper new ObjectMapper();try {objectMapper.readValue(argumentsJson,new TypeReferenceHashMapString,Object(){});} catch (JsonProcessingException e) {e.printStackTrace();}}//针对arguments 再提供一组getter setter方法 在代码内部测试使用public Object getArguments(String key){return arguments.get(key);}public void setArguments(String key, Object value){arguments.put(key,value);}public void setArguments(MapString,Object arguments){this.arguments arguments;}}交换机类型作为枚举类 public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type type;}public int getType(){return type;}} name :交换机的名字.相当于交换机的⾝份标识. type :交换机的类型.三种取值,DIRECT,FANOUT,TOPIC. durable :交换机是否要持久化存储.true为持久化,false不持久化. autoDelete:使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤. arguments :交换机的其他参数属性.预留字段,暂时未使⽤ 创建MSGQueue Data public class MSGQueue {//队列的唯一身份标识private String name;//表示队列是否持久化private boolean durable;//Todo: 表示是否独占//true 表示只能为一个消费者使用 false 表示为大家都能使用private boolean exclusive;//todo:自动删除private boolean autoDelete false;//todo:参数列表private MapString, Object arguments new HashMap();//表示当前队列都有哪些消费者订阅了private ListConsumerEnv consumerEnvList new ArrayList();//记录当前取到了第几个消费者 方便实现轮询策略private AtomicInteger consumerSeq new AtomicInteger(0);//添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}//暂时不考虑订阅者删除//挑选一个订阅者 来处理当前的消息public ConsumerEnv chooseConsumerEnv() {//轮询的方式取if (consumerEnvList.size() 0) {//无人订阅return null;}//计算当前下标int index consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return {};}public void setArguments(String argumentsJson) {ObjectMapper objectMapper new ObjectMapper();try {objectMapper.readValue(argumentsJson, new TypeReferenceHashMapString, Object() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(MapString, Object arguments) {this.arguments arguments;}}name :队列的名字.相当于队列的⾝份标识. durable :交换机是否要持久化存储.true为持久化,false不持久化. exclusive :独占(排他),队列只能被⼀个消费者使⽤. autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤. arguments :交换机的其他参数属性.预留字段,暂时未使⽤ 创建Binding Data public class Binding {//交换机名private String exchangeName;//队列名private String queueName;//绑定关键字private String bindingKey; } exchangeName 交换机名字 queueName 队列名字 bindingKey 只在交换机类型为 TOPIC 时才有效.⽤于和消息中的 routingKey 进⾏匹配 创建Message {private BasicProperties basicProperties new BasicProperties();private byte[] body;//辅助属性//这两个属性不需要序列化//一个文件存储很多消息 找到某个消息使用下面两个偏移量来找到消息 前闭后开[)//文件开头到消息数据的位置偏移private transient long offsetBeg 0;//文件结尾到消息数据的位置偏移private transient long offsetEnd 0;//使用这个属性表示该消息在文件中是否为有效消息(逻辑删除)//0x1有效 0x0无效private byte isValid 0x1;//创建工厂方法 让工厂方法帮我们封装一下message对象的过程//万一两个参数冲突 一外面的为主public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){Message message new Message();if(basicProperties ! null){message.setBasicProperties(basicProperties);}//M-为前缀 和其他uuid做区分message.setMessageId(M- UUID.randomUUID().toString());message.basicProperties.setRoutingKey(routingKey);message.body body;//此处是吧body和basicPro设置出来 其他属性暂时不设置return message;}//获取messageIdpublic String getMessageId(){return basicProperties.getMessageId();}//设置messageIdpublic void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int mode){basicProperties.setDeliverMode(mode);} }BasicProperties参数类 Data public class BasicProperties implements Serializable {//消息的唯一身份标识 保证唯一性 使用UUID来创建private String messageId;//消息上带有的 和bindingKey做匹配private String routingKey;// 表示消息是否要持久化 1表示不持久化 2表示持久化private int deliverMode 1; } Message 需要实现 Serializable 接⼝.后续需要把Message写⼊⽂件以及进⾏⽹络传输. basicProperties 是消息的属性信息. body 是消息体. offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置.这⼀块具体的 设计后⾯再详细介绍.使⽤transient 关键字避免属性被序列化. isValid ⽤来表⽰消息在⽂件中是否有效.这⼀块具体的设计后⾯再详细介绍. createMessageWithId 相当于⼀个⼯⼚⽅法,⽤来创建⼀个Message实例.messageId通过 UUID的⽅式⽣成. 数据库设计 对于Exchange,MSGQueue,Binding,我们使⽤数据库进⾏持久化保存. 此处我们使⽤的数据库是SQLite,是⼀个更轻量的数据库. SQLite 只是⼀个动态库(当然,官⽅也提供了可执⾏程序exe),我们在Java中直接引⼊SQLite依赖,即可直接使⽤,不必安装其他的软件 配置sqlite 引入依赖 dependencygroupIdorg.xerial/groupIdartifactIdsqlite-jdbc/artifactIdversion3.41.0.1/version/dependency配置数据源 spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBCmybatis:mapper-locations: classpath:mapper/**Mapper.xml 此处我们约定,把数据库⽂件放到./data/meta.db 中. SQLite 只是把数据单纯的存储到⼀个⽂件中.⾮常简单⽅便 实现创建表和数据库基本操作 Mapper public interface MetaMapper {//提供三个核心建表方法void createExchangeTable();void createQueueTable();void createBindingTable();//针对上面三个基本概念进行插入删除void insertExchange(Exchange exchange);void deleteExchange(String exchange);void insertQueue(MSGQueue queue);void deleteQueue(String queueName);void insertBinding(Binding binding);void deleteBinding(Binding binding);ListExchange selectAllExchanges();ListMSGQueue selectAllQueues();ListBinding selectAllBindings();}?xml version1.0 encodingUTF-8? !DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtd mapper namespacecom.example.mq.mqserver.mapper.MetaMapperupdate idcreateExchangeTablecreate table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));/updateupdate idcreateQueueTablecreate table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));/updateupdate idcreateBindingTablecreate table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));/update实现DataBaseManager 管理数据库的 mqserver.datacenter.DataBaseManage 创建DataBaseManager类 通过这个类来封装针对数据库的操作 初始化数据库 public void init() {//手动获取到metaMappermetaMapper MqApplication.context.getBean(MetaMapper.class);//建库建表 插入一些默认数据//如果数据库已经存在 不做任何操作 , 如果数据库不存在 则创建库创建表 构造默认数据//根据meta.db文件是否存在来做判断if (!checkDBExists()) {//数据库不存在 就进行建库建表操作//先创建一个data目录File dataDir new File(./data);dataDir.mkdirs();//不存在 创建表createTable();createDefaultData();log.info([DataBaseManager] 数据库初始化完成);} else {//数据库已经存在log.info([DataBaseManager] 数据库已经存在);}}针对MqApplication,需要新增⼀个context属性.并初始化. SpringBootApplication public class MqApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context SpringApplication.run(MqApplication.class, args);BrokerServer brokerServer new BrokerServer(9090);brokerServer.start();}}实现checkDBExists private boolean checkDBExists() {//判断meta.db文件是否存在File file new File(./data/meta.db);return file.exists();}实现createTable //建表操作 不需要建库//首次执行数据库造作 就会自动创建meta.db文件(Mybatis执行)private void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();log.info([DataBaseManager] 创建表完成);}实现createDefaultData //创建默认数据//主要是添加默认交换机private void createDefaultData() {//构造默认交换机Exchange exchange new Exchange();exchange.setName();exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info([DataBaseManager] 创建默认数据完成);}封装其他数据库操作 //封装其他数据库的操作public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public ListExchange selectAllExchanges() {return metaMapper.selectAllExchanges();}public ListMSGQueue selectAllQueues(){return metaMapper.selectAllQueues();}public ListBinding selectAllBindings(){return metaMapper.selectAllBindings();}消息存储设计 设计思路: 消息需要在硬盘上存储.但是并不直接放到数据库中,⽽是直接使⽤⽂件存储 原因如下: 对于消息的操作并不需要复杂的增删改查.对于⽂件的操作效率⽐数据库会⾼很多 我们给每个队列分配⼀个⽬录.⽬录的名字为data队列名.形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件. queue_data.txt 消息数据文件 用来保存消息内容queue_stat.txt消息统计文件 用来保存消息统计信息 文件格式 queue_data.txt 文件格式 使用二进制方式存储 每个消息分为四个部分: 前四个字节表示Message对象的长度后面若干字节 表示Message 内容消息和消息之间首尾相连 每个Message基于Java标准库的 ObjectInputStream/ObjectOutputStream序列化 Message 对象中的offsetBeg和offsetEnd正是⽤来描述每个消息体所在的位置. queue_stat.txt文件格式 使用文本方式存储 文件中只包含一行 里面包含两列 使用\t分割 第一列表示当前的总消息数目 第二列表示有效消息数目 形如 2000\t1500 创建MessageFileManager类 创建 mqserver.database.MessageFileManager public class MessageFileManager {//定义内部类表示该队列的统计信息static public class Stat {public int totalCount;//总数量public int validCount;//有效消息数量}public void init(){//暂时不需要初始化}//约定消息文件所在的目录和文件名//获取指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return ./data/ queueName;}//这个方法用来获取该队列的消息数据文件路径private String getQueueDataPath(String queueName) {return getQueueDir(queueName) /queue_data.txt;}//这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) /queue_stat.txt;} }包含一个内部类stat 用来表示消息统计文件的内容 实现统计文件读写 private Stat readStat(String queueName) {Stat stat new Stat();try (InputStream inputStream new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner new Scanner(inputStream);stat.totalCount scanner.nextInt();stat.validCount scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}//写统计文件private void writeStat(String queueName, Stat stat) {//使用printWrite//outputstream打开文件默认情况下啊 会把源文件清空 此时相当于旧文件覆盖新文件try (OutputStream outputStream new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter new PrintWriter(outputStream);printWriter.write(stat.totalCount \t stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}实现创建队列目录 每个队列都有⾃⼰的⽬录和配套的⽂件.通过下列⽅法把⽬录和⽂件先准备好. public void createQueueFiles(String queueName) throws IOException {// 1.创建⽬录指定队列的⽬录File baseDir new File(getQueueDir(queueName));if (!baseDir.exists()) {boolean ok baseDir.mkdirs();if (!ok) {throw new IOException( 创建⽬录失败 ! baseDir baseDir.getAbsolutePath());}}// 2.创建队列数据⽂件File queueDataFile new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok queueDataFile.createNewFile();if (!ok) {throw new IOException( 创建⽂件失败 ! queueDataFile queueDataFile.getAbsolutePath());}}// 3.创建队列统计⽂件File queueStatFile new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok queueStatFile.createNewFile();if (!ok) {throw new IOException( 创建⽂件失败 ! queueStatFile queueStatFile.getAbsolutePath());}}// 4.给队列统计⽂件写⼊初始数据Stat stat new Stat();stat.totalCount 0;stat.validCount 0;writeStat(queueName, stat);}把上述约定的⽂件都创建出来,并对消息统计⽂件进⾏初始化 实现删除队列目录 如果队列需要删除,则队列对应的⽬录/⽂件也需要删除 //删除队列的目录和文件//队列也是可以删除的 当队列删除之后 对应的消息文件也会被删除public void destroyQueueFiles(String queueName) throws IOException {//先删除里面的文件 在删除目录File queueDataFile new File(getQueueDataPath(queueName));boolean ok1 queueDataFile.delete();File queueStatFile new File(getQueueStatPath(queueName));boolean ok2 queueStatFile.delete();File baseDir new File(getQueueDir(queueName));boolean ok3 baseDir.delete();if (!ok1 || !ok2 || !ok3) {//其中有任意一个失败 都算整体删除失败throw new IOException(删除队列目录和文件失败 baseDir baseDir.getAbsolutePath());}}注意:File类的delete⽅法只能删除空⽬录.因此需要先把内部的⽂件先删除掉. 检查队列文件是否存在 判定该队列的消息⽂件和统计⽂件是否存在.⼀旦出现缺失,则不能进⾏后续⼯作 //检查队列的目录和文件是否存在public boolean checkFilesExists(String queueName) {//判定队列的数据文件 和统计文件是否都存在File queueDataFile new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}实现消息对象序列化/反序列化 Message对象需要转成二进制写入文件,并且也需要吧文件中的二进制读出来解析成Message对象 此处针对这里的逻辑进行封装 创建common.BinaryTool包 public class BinaryTool {//将一个对象 序列化成一个数组public static byte[] toBytes(Object object) throws IOException {//这个流对象相当于一个变长的字节数组//就可以把object序列化的数据给逐渐的写入到byteArrayOutputStream中 在同一转化成byte[]try( ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream()){try(ObjectOutputStream objectOutputStream new ObjectOutputStream(byteArrayOutputStream)){//此处的writeObject就会把该对象进行序列化 生成的二进制数据就会写入到ObjectOutputStream中//由于objectOutputStream关联到byteArrayOutputStream 所以最终结果写入到byteArrayOutputStreamobjectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}//将一个字节数字 反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object null;try(ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream new ObjectInputStream(byteArrayInputStream)){//从data这个byte[]数组中获取数据并进行反序列化object objectInputStream.readObject();}}return object;} } 使⽤ByteArrayInputStream/ByteArrayOutputStream针对byte[]进⾏封装,⽅便后续操作.(这两个流对象是纯内存的,不需要进⾏close).使⽤ObjectInputStream/ObjectOutputStream进⾏序列化/反序列化操作.通过内部的readObject/writeObject即可完成对应操作.此处涉及到的序列化对象,需要实现Serializable接⼝.这⼀点咱们的Message对象已经实现过了. 实现写入消息文件 //把一个新的消息 放到队列对应的文件中public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {//检查档期那要写入的队列对应的文件是否存在if (!checkFilesExists(queue.getName())) {throw new MqException([MessageFileManager] 队列对应文件不存在 queueName queue.getName());}//把message对象进行序列化 转化成二进制字节数组byte[] messageBinary BinaryTool.toBytes(message);synchronized (queue) {//获取到当前队列数据文件的长度 用来计算message的 offsetBeg和offsetEnd//offsetEnd 就是当前文件长度 4 message自身长度File queueDataFile new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDataFile.length() 4);message.setOffsetEnd(queueDataFile.length() 4 messageBinary.length);//写消息到文件中 追加到数据文件末尾 而不是覆盖try (OutputStream outputStream new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {//接下来先写当前消息的长度//这个操作就是写入四个字节了dataOutputStream.writeInt(messageBinary.length);//写入消息本体dataOutputStream.write(messageBinary);}}//更新消息统计文件Stat stat readStat(queue.getName());stat.totalCount 1;stat.validCount 1;writeStat(queue.getName(), stat);}}考虑线程安全 按照队列维度进行加锁需要记录Message对象在⽂件中的偏移量.后续的删除操作依赖这个偏移量定位到消息.offsetBeg是原有⽂件⼤⼩的基础上,再4.4个字节是存放消息⼤⼩的空间.(参考上⾯的图)写完消息,要同时更新统计信息 创建common.MqException ,作为⾃定义异常类.后续业务上出现问题,都统⼀抛出这个异常.. /*** 自定义异常类*/ public class MqException extends Exception{public MqException(String reason) {super(reason);} } 实现删除消息 此处删除消息只是逻辑删除 ,即把Message类中的isValid字段设置为0 //删除消息//逻辑删除 将isValid属性设置成0//先把文件中的数据还原为message对象 isValid改成0 在重新写回去//此处message必须包含有效的offsetBeg 和OffsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {//随机访问 在文件的指定位置读写//seek方法移动光标synchronized (queue) {try (RandomAccessFile randomAccessFile new RandomAccessFile(getQueueDataPath(queue.getName()), rw)) {//先从文件中读取相应的message数据byte[] bufferSrc new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);//把二进制数据转化为Message对象Message diskMessage (Message) BinaryTool.fromBytes(bufferSrc);//设置isValiddiskMessage.setIsValid((byte) 0x0);//重新写入byte[] bufferDest BinaryTool.toBytes(diskMessage);//此时重新 seek 虽然上面已经seek过 但是进行了读操作 导致文件光标向后移动到下一个消息的位置//因此需要重新seek 调整光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}//更新统计文件Stat stat readStat(queue.getName());if ((stat.validCount 0)) {stat.validCount - 1;}writeStat(queue.getName(), stat);}实现消息加载 把消息内容从⽂件加载到内存中.这个功能在服务器重启,和垃圾回收的时候都很关键 public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedListMessage messages new LinkedList();try (InputStream inputStream new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream new DataInputStream(inputStream)) {//模拟光标位置long currentOffset 0;//循环读取消息while (true) {//读取当前消息//这里可能会读到文件末尾 readInt方法到达末尾会抛出EOFException异常int messageSize dataInputStream.readInt();//根据消息长度 读取消息byte[] buffer new byte[messageSize];int actualSize dataInputStream.read(buffer);if (messageSize ! actualSize) {throw new MqException([MessageFileManager] 文件格式错误 queueName queueName);}//把读到的二进制数据 反序列化为Message对象Message message (Message) BinaryTool.fromBytes(buffer);//判断这个小时是否为无效对象if (message.getIsValid() ! 0x1) {//无效数据直接跳过currentOffset (4 messageSize);continue;}//有效数据 则需要把这个数据加入到链表中 加入前要填写offsetBeg offsetEnd//进行计算时 需要知道当前光标的位置//此时手动计算下标message.setOffsetBeg(currentOffset 4);message.setOffsetEnd(currentOffset 4 messageSize);currentOffset (4 messageSize);//添加到链表中messages.add(message);}}catch (EOFException e){//此时不是真正处理异常 而是处理正常业务逻辑System.out.println([MessageFileManager] 恢复Message数据完成);}}return messages;}实现垃圾回收 上述删除操作,只是把消息在⽂件上标记成了⽆效.并没有腾出硬盘空间.最终⽂件⼤⼩可能会越积越多.因此需要定期的进⾏批量清除.此处使⽤类似于复制算法.当总消息数超过2000,并且有效消息数⽬少于50%的时候,就触发GC. GC的时候会把所有有效消息加载出来,写⼊到⼀个新的消息⽂件中,使⽤新⽂件,代替旧⽂件即可. //检查当前是否要针对该队列的消息数据文件进行GCpublic boolean checkGC(String queueName){//判定是否要GC 是根据总消息数和有效消息数 这两个值 是在消息统计文件中的Stat stat readStat(queueName);if(stat.totalCount 2000 (double)stat.validCount /(double) stat.totalCount 0.5){return true;}return false;}gc //垃圾回收操作//使用复制算法//创建新文件 名字为queue_data_new.txt//把之前的消息数据文件中有效的消息读取出来 写到新文件中//删除旧的消息 把文件名改回queue_data.txt//同时要记得更新消息统计文件public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {//进行gc时 其他线程不能对文件进行修改synchronized (queue){//由于gc操作比较耗时 此处统计消耗时间long gcBeg System.currentTimeMillis();//创建一个新文件File queueDataNewFile new File(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){//正常情况下 这个文件不应该存在 说明上一次gc一半 程序意外结束throw new MqException([MessageFileManager] gc 时发现该队列的queue_data_new 已经存在);}boolean ok queueDataNewFile.createNewFile();if(!ok){throw new MqException([MessageFileManager] 创建文件失败 queueDataNewFile queueDataNewFile.getAbsolutePath() );}//从旧文件中获取出所有有效消息LinkedListMessage messages loadAllMessageFromQueue(queue.getName());//把有效消息 写入到新文件try(OutputStream outputStream new FileOutputStream(queueDataNewFile,true)){try(DataOutputStream dataOutputStream new DataOutputStream(outputStream)){for(Message message : messages){byte[] buffer BinaryTool.toBytes(message);dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//删除旧文件 把新文件重命名File queueDataOldFile new File(getQueueDataPath(queue.getName()));ok queueDataOldFile.delete();if(!ok){throw new MqException([MessageFileManager] 删除旧数据失败 queueDataOldFile queueDataOldFile.getAbsolutePath());}//重命名文件ok queueDataNewFile.renameTo(queueDataOldFile);if(!ok){throw new MqException([MessageFileManager] 文件重命名失败 queueDataNewFile queueDataNewFile.getAbsolutePath() , queueDataOldFile queueDataOldFile.getAbsolutePath());}//更新统计文件Stat stat new Stat();stat.totalCount messages.size();stat.validCount messages.size();writeStat(queue.getName(),stat);long gcEnd System.currentTimeMillis();System.out.println([MessageFileManager] gc 执行完毕 queueName queue.getName() time: (gcEnd - gcBeg) ms);}} 整合数据库和文件 上述代码中,使⽤数据库存储了Exchange,Queue,Binding,使⽤⽂本⽂件存储了Message.接下来我们把两个部分整合起来,统⼀进⾏管理 创建DiskDataCenter 使⽤DiskDataCenter来综合管理数据库和⽂本⽂件的内容.DiskDataCenter 会持有DataBaseManager和MessageFileManager对象 public class DiskDataCenter {private DataBaseManager dataBaseManager new DataBaseManager();private MessageFileManager messageFileManager new MessageFileManager();public void init(){//对上面两个类进行初始化dataBaseManager.init();messageFileManager.init();}//封装交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public ListExchange selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//封装队列操作public void insertQueue(MSGQueue queue) throws IOException {//创建队列的同时 还要创建对应的目录dataBaseManager.insertQueue(queue);messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {//删除队列也要删除对应的目录dataBaseManager.deleteQueue(queueName);messageFileManager.destroyQueueFiles(queueName);}public ListMSGQueue selectAllQueues(){return dataBaseManager.selectAllQueues();}//封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public ListBinding selectAllBindings(){return dataBaseManager.selectAllBindings();}//封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if(messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);} }⼩结 通过上述封装,把数据库和硬盘⽂件两部分合并成⼀个整体.上层代码在调⽤的时候则不再关⼼该数据 是存储在哪个部分的. 这个类的整体实现并不复杂,关键逻辑在之前都已经准备好了 内存数据结构设计 硬盘上存储数据,只是为了实现持久化这样的效果.但是实际的消息存储/转发,还是主要靠内存的结构. 对于MQ来说,内存部分是更关键的,内存速度更快,可以达成更⾼的并发 创建MemoryDataCenter 创建mqserver.datacenter.MemoryDataCenter 使用这个类来管理所有内存中的数据 使⽤四个哈希表,管理Exchange,Queue,Binding,Message.使⽤⼀个哈希表链表管理队列-消息之间的关系.使⽤⼀个哈希表哈希表管理所有的未被确认的消息. 为了保证消息被正确消费了,会使⽤两种⽅式进⾏确认.⾃动ACK和⼿动ACK. 其中⾃动ACK是指当消息被消费之后,就会⽴即被销毁释放. 其中⼿动ACK是指当消息被消费之后,由消费者主动调⽤⼀个basicAck⽅法,进⾏主动确认.服务器 收到这个确认之后,才能真正销毁消息. 此处的未确认消息就是指在⼿动ACK模式下,该消息还没有被调⽤basicAck.此时消息不能删除, 但是要和其他未消费的消息区分开.于是另搞了个结构. 当后续basicAck到了,就可以删除消息了 /*** 使用这个类来统一管理内存中的所有数据*/ public class MemoryDataCenter {//key是exchangeName, value是exchange对象private ConcurrentHashMapString, Exchange exchangeMap new ConcurrentHashMap();//key是queueName value是MSGQueue对象private ConcurrentHashMapString, MSGQueue queueMap new ConcurrentHashMap();//key是exchangeName 第二个key是queueNameprivate ConcurrentHashMapString, ConcurrentHashMapString, Binding bindingsMap new ConcurrentHashMap();//key是messageId value是一个Message对象private ConcurrentHashMapString, Message messageMap new ConcurrentHashMap();//key是queueName value是一个Message的链表private ConcurrentHashMapString, LinkedListMessage queueMessageMap new ConcurrentHashMap();private ConcurrentHashMapString, ConcurrentHashMapString, Message queueMessageWaitAckMap new ConcurrentHashMap();public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println([MemoryDataCenter] 新交换机添加成功 exchangeName exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println([MemoryDataCenter] 交换机删除成功 exchangeName exchangeName);}public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(), queue);System.out.println([MemoryDataCenter] 新队列添加成功 queueName queue.getName());}public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println([MemoryDataCenter] 队列删除成功 queueName queueName);}public void insertBinding(Binding binding) throws MqException { // //先使用exchangeName来查找哈希表是否有存在 // ConcurrentHashMapString, Binding bindingMap this.bindingsMap.get(bingding.getExchangeName()); // if (bindingMap null){ // bindingsMap.put(binding.getExchangeName(),bindingMap); // }//现根据exchangeName查一下 对应的哈希表是否存在 不存在就创建一个ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(),k - new ConcurrentHashMap());synchronized (bindingMap) {//再根据queueName查找一下 如果已经存在 就抛出异常 不存在才能插入if (bindingsMap.get(binding.getQueueName()) ! null) {throw new MqException([MemoryDataCenter] 绑定已经存在 exchangeName binding.getExchangeName() , queueName binding.getQueueName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println([MemoryDataCenter] 新绑定添加成功 exchangeName binding.getExchangeName() ,queueName binding.getQueueName());}//获取绑定//1. 根据exchangeName和queueName确定唯一一个Binding//2. 根据exchangeName获取到所有的bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMapString, Binding bindingMap bindingsMap.get(exchangeName);if (bindingMap null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMapString, Binding getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}//删除绑定public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMapString, Binding bindingMap bindingsMap.get(binding.getExchangeName());if (bindingMap null) {//该交换机没有绑定任何队列throw new MqException([MemoryDataCenter] 绑定不存在 exchangeName binding.getExchangeName() , queueName binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println([MemoryDataCenter] 删除绑定成功 exchangeName binding.getExchangeName() ,queueName binding.getQueueName());}//添加消息public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println([MemoryDataCenter] 新消息添加成功 messageId message.getMessageId());}//根据id查消息public Message getMessage(String messageId) {return messageMap.get(messageId);}//根据id删消息public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println([MemoryDataCenter] 消息被移除 messageId messageId);}//发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {//先把消息放在指定数据结构中//现根据队列的名称 找到该队列对应的消息链表 // LinkedListMessage messages queueMessageMap.get(queue.getName()); // if (messages null) { // messages new LinkedList(); // queueMessageMap.put(queue.getName(), messages); // }LinkedListMessage messages queueMessageMap.computeIfAbsent(queue.getName(), k - new LinkedList());//把新消息添加到message中synchronized (messages){messages.add(message);}//在这里把该消息向消息中心插入addMessage(message);System.out.println([MemoryDataCenter] 消息被投递到队列中 messageId message.getMessageId());}//从队列中取消息public Message pollMessage(String queueName){//根据队列名 查找对应的消息链表LinkedListMessage messages queueMessageMap.get(queueName);//如果没找到 则说明队列中没有任何消息if(messages null){return null;}synchronized (messages){if(messages.size() 0){return null;}//链表中有元素 就进行头删 取出该元素Message currentMessage messages.remove(0);System.out.println([MemoryDataCenter] 消息从队列中取出 messageId currentMessage.getMessageId());return currentMessage;}}//获取指定队列中消息的个数public int getMessageCount(String queueName){LinkedListMessage messages queueMessageMap.get(queueName);if(messages null){//队列中无消息return 0;}synchronized (messages){return messages.size();}}//未确认消息的操作//添加未确认的消息public void addMessageWaitAck(String queueName, Message message){ConcurrentMapString,Message messageMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());messageMap.put(message.getMessageId(),message);System.out.println([MemoryDataCenter] 消息进入到待确认队列 messageId message.getMessageId());}//删除未确认的消息public void removeMessageWaitAck(String queueName, String messageId){ConcurrentHashMapString, Message messageMap queueMessageWaitAckMap.get(queueName);if(messageMap null){return;}messageMap.remove(messageId);System.out.println([MemoryDataCenter] 消息从待确认队列删除 messageId messageId);}//获取指定未确认的消息public Message getMessageWaitAck(String queueName, String messageId){ConcurrentHashMapString, Message messageMap queueMessageWaitAckMap.get(queueName);if(messageMap null){return null;}return messageMap.get(messageId);}//当服务器重启后 要从硬盘上读取数据 回复到内存中public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {//清空所有旧数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();//恢复所有交换机ListExchange exchanges diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges){exchangeMap.put(exchange.getName(),exchange);}//恢复所有队列ListMSGQueue queueList diskDataCenter.selectAllQueues();for(MSGQueue queue : queueList){queueMap.put(queue.getName(),queue);}//恢复所有绑定ListBinding bindingList diskDataCenter.selectAllBindings();for(Binding binding : bindingList){ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(), k - new ConcurrentHashMap());bindingMap.put(binding.getQueueName(),binding);}//恢复所有消息//遍历所有的队列 根据每个队列的名字 获取到所有的信息for(MSGQueue queue : queueList){LinkedListMessage messages diskDataCenter.loadAllMessageFromQueue(queue.getName());//将消息加载到内存queueMessageMap.put(queue.getName(),messages);for(Message message : messages){messageMap.put(message.getMessageId(),message);}}//未确认的消息 不需要从硬盘获取//在等待ack的过程中 未被确认的消息就转变为未被取走的消息} } 虚拟主机设计 ⾄此,内存和硬盘的数据都已经组织完成.接下来使⽤虚拟主机这个概念,把这两部分的数据也串起来.并且实现⼀些MQ的关键API 注意:在RabbitMQ中,虚拟主机是可以随意创建/删除的.咱们此处为了实现简单,并没有实现虚拟主机的管理.因此我们默认就只有⼀个虚拟主机的存在.但是在数据结构的设计上我们预留了对于多虚拟主机的管理.保证不同虚拟主机中的Exchange,Queue,Binding,Message都是相互隔离的 创建VirtualHost 创建mqserver.VirtualHost Slf4j Getter //通过这个类表示虚拟主机 //作为业务逻辑的整合 就需要对代码中抛出的异常进行处理了 public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter new MemoryDataCenter();private DiskDataCenter diskDataCenter new DiskDataCenter();private Router router new Router();private ConsumerManager consumerManager new ConsumerManager(this);private final Object exchangeLocker new Object();private final Object queueLocker new Object();public VirtualHost(String virtualHostName) {this.virtualHostName virtualHostName;//对于memoryDataCenter不需要初始化 在类内部已经初始化过了//对于diskDataCenter来说 要进行初始化操作diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();log.info(恢复内存数据失败);}}//创建交换机//如果交换机不存在 则创建 如果存在 直接返回//返回值是boolean 创建成功true 失败 返回falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString, Object arguments) {//把交换机的名字 加上虚拟主机作为前缀exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {Exchange existsExchange memoryDataCenter.getExchange(exchangeName);if (existsExchange ! null) {//该交换机已经存在log.info(交换机已经存在 exchangeName exchangeName);return true;}//真正创建交换机Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);//把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}//4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);log.info(交换机创建完成 exchangeName exchangeName);//先硬盘 后内存 因为内存容易失败 如果硬盘失败 就不向内存中存储}return true;} catch (Exception e) {log.info(交换机创建失败 exchangeName exchangeName);e.printStackTrace();return false;}}public boolean exchangeDelete(String exchangeName) {exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {//先找到对应的交换机Exchange toDelete memoryDataCenter.getExchange(exchangeName);if (toDelete null) {throw new MqException(交换机不存在 无法删除);}//删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}//删除内存中的交换机数据memoryDataCenter.deleteExchange(exchangeName);log.info(交换机删除成功 exchangeName exchangeName);}return true;} catch (Exception e) {log.info(交换机删除失败 exchangeName exchangeName);e.printStackTrace();return false;}}//创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) {//拼接队列名字queueName virtualHostName queueName;try {synchronized (queueLocker) {MSGQueue existsQueue memoryDataCenter.getQueue(queueName);if (existsQueue ! null) {log.info(队列已经存在 queueName queueName);return true;}//创建队列对象MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);//写硬盘if (durable) {diskDataCenter.insertQueue(queue);}//写内存memoryDataCenter.insertQueue(queue);log.info(队列创建成功 queueName queueName);}return true;} catch (Exception e) {log.info(队列创建失败 queueName queueName);e.printStackTrace();return false;}}//删除队列public boolean queueDelete(String queueName) {queueName virtualHostName queueName;try {MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException(队列不存在 无法删除 queueName queueName);}//删除硬盘数据if (queue.isDurable()) {diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);log.info(交换机删除成功 exchangeName queueName);return true;} catch (Exception e) {log.info(队列创建失败 queueName queueName);e.printStackTrace();return false;}}//绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {Binding existsBinding memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding ! null) {throw new MqException(binding 已经存在 queueName queueName);}//验证bindingkey是否合法if (!router.checkBindingKey(bindingKey)) {throw new MqException(bindingKey非法 bindingKey bindingKey);}//创建Binding对象Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//获取对应的交换机和队列 如果交换机或者队列不存在 也无法创建MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException(队列不存在 queueName queueName);}Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException(交换机不存在 exchangeName exchangeName);}//先写硬盘if (queue.isDurable() exchange.isDurable()) {diskDataCenter.insertBinding(binding);}//在写内存memoryDataCenter.insertBinding(binding);log.info(绑定成功 exchangeName exchangeName queueName queueName);}}return true;} catch (Exception e) {log.info(绑定失败 exchangeName exchangeName queueName queueName);e.printStackTrace();return false;}}public boolean queueUnbind(String queueName, String exchangeName) {queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {//获取binding看是否已经存在Binding binding memoryDataCenter.getBinding(exchangeName, queueName);if (binding null) {throw new MqException(删除绑定失败 绑定不存在 exchangeName exchangeName , queueName queueName);} // //获取一下对应的队列和交换机 看是否存在 // MSGQueue queue memoryDataCenter.getQueue(queueName); // if(queue null){ // throw new MqException(对应的队列不存在 queueName queueName); // } // Exchange exchange memoryDataCenter.getExchange(exchangeName); // if(exchange null){ // throw new MqException(对应的交换机不存在 exchangeName exchangeName); // }//删除硬盘上的数据 // if(queue.isDurable() exchange.isDurable()){ // diskDataCenter.deleteBinding(binding); // }//无论绑定是否持久化 都进行删除diskDataCenter.deleteBinding(binding);//删除内存的数据memoryDataCenter.deleteBinding(binding);}}return true;} catch (Exception e) {log.info(删除绑定失败);e.printStackTrace();return false;}}//发送交换机到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {//转换交换机的名字exchangeName virtualHostName exchangeName;//检查routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException([VirtualHost] routingKey 非法 routingKey routingKey);}//查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在 exchangeName exchangeName);}//判定交换机的类型if (exchange.getType() ExchangeType.DIRECT) {//直接交换机的转发规则//以routingKey作为队列的名字 直接把消息写入指定的队列中//此时可以无视绑定关系String queueName virtualHostName routingKey;//构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);//查找该队列名对应的对象MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new Exception([VirualHost] 队列不存在 queueName queueName);}//队列存在 直接给队列中写入消息sendMessage(queue, message);} else {//以fanout和topic的方式转发//找到该交换机关联的所有绑定 并遍历这些绑定对象ConcurrentHashMapString, Binding bindingsMap memoryDataCenter.getBindings(exchangeName);for (Map.EntryString, Binding entry : bindingsMap.entrySet()) {//获取到绑定对象Binding binding entry.getValue();MSGQueue queue memoryDataCenter.getQueue(binding.getQueueName());if (queue null) {//此处不跑出异常 可能有多个这样的队列//希望不要因为一个队列的失败 影响其他队列消息的传输log.info(basicPublish 发送消息时 发现消息不存在 queueName binding.getQueueName());continue;}//构造消息对象Message message Message.createMessageWithId(exchangeName, basicProperties, body);//判断这个消息是否能转发给该队列//如果是fanout 则所有绑定的队列都要转发//如果是topic 还要判定下 bindkey和routingKey是否匹配if (!router.route(exchange.getType(), binding, message)) {continue;}//真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {log.info(消息发送失败);e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息 就是把消息写入硬盘和内存中int deliverMode message.getDeliverMode();//deliverMode 为1 不持久化 为2 不持久化if (deliverMode 2) {diskDataCenter.sendMessage(queue, message);}memoryDataCenter.sendMessage(queue, message);//通知消费者可以消费消息了consumerManager.notifyConsumer(queue.getName());}//添加一个队列的订阅者 但队列收到消息之后 就要把消息推送给对应的订阅者public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个consumerEnv对象 把这个对应的队列找到 再把这个Consumer对象添加到队列中queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info(basicConsumer成功 queueName :{}, queueName);return true;} catch (Exception e) {log.info(basicConsumer 失败 queueName :{}, queueName);e.printStackTrace();return false;}}public boolean basicAck(String queueName, String messageId){queueName virtualHostName queueName;try {//获取到消息和队列Message message memoryDataCenter.getMessage(messageId);if (message null){throw new MqException([VirtualHost] 要确认的消息不存在 messageId messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if(queue null){throw new MqException([virtualHost] 要确认的队列不存在 queueName queueName);}//删除硬盘上的数据if(message.getDeliverMode() 2){diskDataCenter.deleteMessage(queue,message);}//删除消息中心的数据memoryDataCenter.removeMessage(messageId);//删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info(basicAck成功 消息被成功确认 queueName:{},messageId:{},queueName,messageId);return true;} catch (MqException | IOException | ClassNotFoundException e) {log.info(basicAck失败 消息确认失败 queueName:{},messageId:{},queueName,messageId);return false;}} } 路由规则 //实现交换机的转发规则 和验证routingKey是否合法 public class Router {//bindingKey构造规则//数字字母下划线//使用.分割//允许*和#public boolean checkBindingKey(String bindingKey) {//todoif (bindingKey.length() 0) {//空字符串 也是合法情况return true;}//检查字符串中不能存在非法字符for (int i 0; i bindingKey.length(); i) {char ch bindingKey.charAt(i);if (ch A ch Z) {continue;}if (ch a ch z) {continue;}if (ch 0 ch 9) {continue;}if (ch _ || ch . || ch * || ch #) {continue;}return false;}//检查*或者#是否是独立的部分String[] words bindingKey.split(\\.);for (String word : words) {//检查word长度 1 且包含#或*就不合法if (word.length() 1 word.contains(*) || word.contains(#)) {return false;}}//约定通配符之间的相邻关系//形如这种 aaa.#.#.bbb aaa.*.#.bbb aaa.#.*.bbb非法//aaa.*.*.bbb合法for (int i 0; i words.length - 1; i) {//是否为连续两个#if (words[i].equals(#) words[i 1].equals(#)) {return false;}//#连着*if (words[i].equals(#) words[i 1].equals(*)) {return false;}//*连着#if (words[i].equals(*) words[i 1].equals(#)) {return false;}}return true;}//数字字母下划线//使用.分割public boolean checkRoutingKey(String routingKey) {if (routingKey.length() 0) {//空字符串 合法 在使用fanout交换机时 routingKey用不上return true;}for (int i 0; i routingKey.length(); i) {char ch routingKey.charAt(i);//判断该字符是否是大写字母if (ch A ch Z) {continue;}//判断该字母是否是小写字母if (ch a ch z) {continue;}//判断该字母是否是阿拉伯数字if (ch 0 ch 9) {continue;}//判定是否是_或者.if (ch _ || ch .) {continue;}//该字符不满足任何一种情况 就返回falsereturn false;}return true;}//这个方法用来判定该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {//TODO//根据不同的exchangeType使用不同的转发规则if (exchangeType ExchangeType.FANOUT) {//所有都转发 直接返回true;return true;} else if (exchangeType ExchangeType.TOPIC) {return routeTopic(binding, message);} else {//不应该存在这种情况throw new MqException([Router] 交换机类型非法 exchangeType exchangeType);}}private boolean routeTopic(Binding binding, Message message) {//先把这两个key进行切分String[] bindingTokens binding.getBindingKey().split(\\.);String[] routingTokens message.getRoutingKey().split(\\.);int bindingIndex 0;int routingIndex 0;//此处使用whilewhile (bindingIndex bindingTokens.length routingIndex routingTokens.length) {//如果遇到* 直接进入下一轮 *可以匹配到任意一个部分if (bindingTokens[bindingIndex].equals(*)) {bindingIndex;routingIndex;continue;} else if(bindingTokens[bindingIndex].equals(#)){//如果遇到#号 要先看看有没有下一个位置bindingIndex;if(bindingIndex bindingTokens.length){//该#后面没有东西 是最后一个字符return true;}//后面还有东西//拿着这个内容 在routingKey中往后找 找到对应的位置//findNextMatch 这个方法用来查找该部分 在routingKey中的位置 返回该下标 没找到返回-1routingIndex findNextMatch(routingTokens,routingIndex,bindingTokens[bindingIndex]);if(routingIndex -1){return false;}//找到的匹配的情况 就继续往后匹配bindingIndex;routingIndex;}else {//如果遇到普通字符串if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex;routingIndex;}}//判定是否是双方到达末尾//比如 aaa.bbb.ccc和aaa.bbb是要匹配失败的if (bindingIndex bindingTokens.length routingIndex routingTokens.length){return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i routingIndex; i routingTokens.length; i) {if(routingTokens[i].equals(bindingToken)){return i;}}return -1;}} 订阅消息 添加一个订阅者 /添加一个队列的订阅者 但队列收到消息之后 就要把消息推送给对应的订阅者public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个consumerEnv对象 把这个对应的队列找到 再把这个Consumer对象添加到队列中queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info(basicConsumer成功 queueName :{}, queueName);return true;} catch (Exception e) {log.info(basicConsumer 失败 queueName :{}, queueName);e.printStackTrace();return false;}}Consumer相当于一个回调函数 放在common.Consumer中 FunctionalInterface public interface Consumer {//每次服务器收到消息 调用 把消息推送给消费者//此处参考rabbitMqvoid handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException; } 创建订阅者管理类 创建mqserver.core.ConsumerManager //通过这个类来实现消费者消费消息的逻辑 public class ConsumerManager {//持有virualhost来操作数据private VirtualHost parent;//指定一个线程池 执行具体的回调函数private ExecutorService workPool Executors.newFixedThreadPool(4);//存放令牌的队列 实际上就是队列名 为了让线程池知道是哪一个队列的消息private BlockingQueueString tokenQueue new LinkedBlockingQueue();//扫描线程private Thread scannerThread null;public ConsumerManager(VirtualHost parent) {this.parent parent;scannerThread new Thread(()-{while (true){try {//1.拿到令牌String queueName tokenQueue.take();//2.根据令牌 找到队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if(queue null){throw new MqException([ConsumerManager] 取令牌后发现 该队列名不存在 queueName queueName);}//3.从这个队列中消费一个消息synchronized (queue){consumerMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});//设置为后台线程scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsumer(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//找到对应的队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if(queue null){throw new MqException([ConsumerManager] 队列不存在 queueName queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);//如果此时队列中已经有一些消息了 就需要立即消费int n parent.getMemoryDataCenter().getMessageCount(queueName);for (int i 0; i n; i) {//这个方法调用一次就消费一条消息consumerMessage(queue);}}}private void consumerMessage(MSGQueue queue) {//消费消息//1.按照轮询的方式 找到消费者ConsumerEnv luckyDog queue.chooseConsumerEnv();if(luckyDog null){//当前对象没有消费者 暂时不需要 等有消费者再说return;}//从队列中取出一条消息Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if(message null){//当队列中还没有消息,也不需要消费return;}/*** 为了达成消息不丢失这样的效果* 1. 在真正执行回调之前 先把这个消息放在待确认集合中 避免因为回调失败 导致的消息丢失* 2. 真正执行回调* 3. 如消费者采用的是autoAck true 默认回调函数执行结束之后不抛出异常 就算消费成功 然后就可以删除消息* 硬盘 内存消息中心的哈希表 上面的待确认消息集合* 4. 当前消费者采取的是autoAck false 手动应答 需要消费者自己在自己的回调函数内部 调用basicAck这个API**///把消息带入到消费者的回调方法中 去给线程池执行parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);//真正执行回调workPool.submit(()-{try{luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());log.info(消息成功消费 queueName queue.getName());//如果是自动应答 就可以直接把消息删除了if(luckyDog.isAutoAck()){//删硬盘if(message.getDeliverMode() 2){parent.getDiskDataCenter().deleteMessage(queue,message);}//删待确认集合parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());//删除内存中的消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());log.info(消息成功消费 queueName :{} ,queue.getName());}//如果是手动应答 先不处理 等消费者调用basicAck处理}catch (Exception e){e.printStackTrace();}});} } parent ⽤来记录虚拟主机.使⽤⼀个阻塞队列⽤来触发消息消费.称为令牌队列.每次有消息过来了,都往队列中放⼀个令牌(也就是队列名),然后消费者再去消费对应队列的消息.使⽤⼀个线程池⽤来执⾏消息回调 这样令牌队列的设定避免搞出来太多线程.否则就需要给每个队列都安排⼀个单独的线程了,如果队列很多则开销就⽐较⼤了 消息确认 //只有在手动应答时 才调用 应答成功 删除这条消息public boolean basicAck(String queueName, String messageId){queueName virtualHostName queueName;try {//获取到消息和队列Message message memoryDataCenter.getMessage(messageId);if (message null){throw new MqException([VirtualHost] 要确认的消息不存在 messageId messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if(queue null){throw new MqException([virtualHost] 要确认的队列不存在 queueName queueName);}//删除硬盘上的数据if(message.getDeliverMode() 2){diskDataCenter.deleteMessage(queue,message);}//删除消息中心的数据memoryDataCenter.removeMessage(messageId);//删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info(basicAck成功 消息被成功确认 queueName:{},messageId:{},queueName,messageId);return true;} catch (MqException | IOException | ClassNotFoundException e) {log.info(basicAck失败 消息确认失败 queueName:{},messageId:{},queueName,messageId);return false;}}网络通信协议设计 ⽣产者和消费者都是客⼾端,都需要通过⽹络和BrokerServer进⾏通信. 此处我们使⽤TCP协议,来作为通信的底层协议.同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服务器这边功能的远程调⽤ 要调⽤的功能有: 创建channel关闭channel创建exchange删除exchange创建queue删除queue创建binding删除binding发送message订阅message发送ack返回message(服务器-客⼾端) 设计应用层协议 因为Message的消息体本⾝就是⼆进制的.因此不太⽅便使⽤json等⽂本格式的协议 其中type表⽰请求响应不同的功能.取值如下: 0x1 创建channel0x2 关闭channel0x3 创建exchange0x4 销毁exchange0x5 创建queue0x6 销毁queue0x7 创建binding0x8 销毁binding0x9 发送message0xa 订阅message0xb 返回ack0xc 服务器给客⼾端推送的消息.(被订阅的消息)响应独有的. 其中payload部分,会根据不同的type,存在不同的格式. 对于请求来说,payload表⽰这次⽅法调⽤的各种参数信息. 对于响应来说,payload表⽰这次⽅法调⽤的返回值 定义Request/Response 创建common.Request common.Response Data //表示一个网络通信中的请求对象 按照自定义协议的格式来展开的 public class Request {private int type;private int length;private byte[] payload; } //这个对象表示一个响应 Data public class Response {private int type;private int length;private byte[] payload; } 定义参数父类 构造⼀个类表⽰⽅法的参数,作为Request的payload. 不同的⽅法中,参数形态各异,但是有些信息是通⽤的,使⽤⼀个⽗类表⽰出来.具体每个⽅法的参数再通过继承的⽅式体现 common.BasicArguments Data //使用这个类表示方法的公共参数 //后续每个方法都会有一些不同的参数 不同的参数再分别使用不同的子类来表示 public class BasicArguments implements Serializable {//表示一次请求/响应的身份标识 可以把请求和响应对应protected String rid;//这次通信使用的channel的身份标识protected String channelId; } 定义返回值父类 和参数同理,也需要构造⼀个类表⽰返回值,作为Response的payload common.BasicReturns Data //表示返回值的公共信息 public class BasicReturns implements Serializable {//表示一次请求/响应的身份标识 可以把请求和响应对应protected String rid;//这次通信使用的channel的身份标识protected String channelId;//表示方法的返回值protected boolean ok; } 定义其他参数类 针对每个VirtualHost提供的⽅法,都需要有⼀个类表⽰对应的参数 ExchangeDeclareArguments Data public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private MapString, Object arguments; } ⼀个创建交换机的请求,形如: 可以把ExchangeDeclareArguments转成byte[],就得到了下列图⽚的结构.按照length⻓度读取出payload,就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象 ExchangeDeleteArguments Data public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName; } QueueDeclareArguments Data public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private MapString,Object arguments;} QueueDeleteArguments Data public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;} QueueBindArguments Data public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey; } QueueUnbindArguments Data public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;} BasicPublishArguments Data public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body; }BasicConsumeArguments Data public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;//这个类对应的方法中 还有一个参数 是回调函数 这个回调函数是不能网络传输的} SubScribeReturns 这个不是参数,是返回值.是服务器给消费者推送的订阅消息.consumerTag其实是channelId.basicProperties 和 body 共同构成了Message Data public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body; }实现BrokerServer /*** 本质上就是一个TCP服务器*/ Slf4j public class BrokerServer {private ServerSocket serverSocket null;//当前考虑一个BrokerServer上只有一个虚拟主机private VirtualHost virtualHost new VirtualHost(default);//使用这个哈希表 表示当前所有的会话//此处的key是channelId value为对应的socket对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMap();private ExecutorService executorService null;//引入一个boolean变量 控制服务器是否继续运行private volatile boolean runnable true;public BrokerServer(int port) throws IOException {serverSocket new ServerSocket(port);}public void start() throws IOException {log.info(brokerServer 启动!);executorService Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket serverSocket.accept();//把处理连接的逻辑丢给这个线程池executorService.submit(() - {processConnection(clientSocket);});}}catch (SocketException e){log.info(服务器停止运行);}}//停止服务器public void stop() throws IOException {runnable false;//把线程池中的任务都放弃了 让线程都销毁executorService.shutdown();serverSocket.close();}//通过这个方法 来处理一个客户端的连接//在这一个方法中 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()) {//需要按照特定格式解析 使用DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {while (true) {//1.读取请求并解析Request request readRequest(dataInputStream);//2.根据请求计算响应Response response process(request, clientSocket);//3.把响应写回客户端writeResponse(dataOutputStream, response);}} catch (EOFException e) {//对于这个代码 如果DataInputStream 如果读到EOF 就会抛出一个eofException异常//需要借助这个异常来结束循环log.info(connection关闭! 客户端的地址: {} : {}, clientSocket.getInetAddress().toString(), clientSocket.getPort());}} catch (ClassNotFoundException | MqException | IOException e) {log.info(connection 出现异常);e.printStackTrace();} finally {try {clientSocket.close();//一个tcp连接中 可能包含多个channel 需要把这个socket对应的所有channel也顺便清理掉clearCloseSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}//遍历上述sessions哈希表 把被关闭的socket对应的键值对删除private void clearCloseSession(Socket clientSocket) {ListString toDeleteChannelId new ArrayList();for(Map.EntryString,Socket entry : sessions.entrySet()){if(entry.getValue() clientSocket){//不能一边遍历一边删除 此时影响结构影响遍历toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}log.info(清理session完成 被清理的sessionId :{},toDeleteChannelId);}//处理一次请求 返回一次响应private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//先对request中的payload做一个初步的解析BasicArguments basicArguments (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info(rid:{} , channelId:{} , type:{}, length:{}, basicArguments.getRid(), basicArguments.getChannelId(),request.getType(), request.getLength());//根据type的值 来进一步区分接下来这次请求要干什么boolean ok true;if (request.getType() 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info(创建channel完成 channelId:{}, basicArguments.getChannelId());} else if (request.getType() 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info(销毁channel完成 channelId:{}, basicArguments.getChannelId());} else if (request.getType() 0x3) {//创建交换机 此时payload是ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() 0x4) {//删除交换机ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() 0x5) {//创建队列QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() 0x6) {//删除队列QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() 0x7) {//创建绑定QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() 0x8) {//删除绑定QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() 0x9) {//发布消息BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() 0xa) {//订阅消息BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//这个回调函数要做的工作 就是把服务器收到的消息可以直接推送到对应的消费者客户端Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//先知道当前收到的消息要发给那个客户端//此处consumerTag 其实是channelId 根据channelId去sessions中查询 就可以得到相应的socket\对象 就可以发送数据//1.根据channelId找到 socket对象Socket clientSocket sessions.get(consumerTag);if (clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokeServer] 订阅消息的客户端已经关闭);}//构造响应数据SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid();//这里只有响应 没有请求 不需要ridsubScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload BinaryTool.toBytes(subScribeReturns);Response response new Response();//0xc表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//把数据写回 给客户端DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() 0xb) {//调用basicAck确认消息BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {//当前type是非法的throw new MqException([BrokerServer] 未知的 type type: request.getType());}//构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info(rid:{} , channelId:{}, type:{}, length:{}, basicReturns.getRid(), basicReturns.getChannelId(),response.getType(), response.getLength());return response;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//刷新缓冲区dataOutputStream.flush();}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if (n ! request.getLength()) {throw new IOException(读取请求格式出错);}request.setPayload(payload);return request;} } virtualHost 表⽰服务器持有的虚拟主机.队列,交换机,绑定,消息都是通过虚拟主机管理.sessions ⽤来管理所有的客⼾端的连接.记录每个客⼾端的socket.serverSocket 是服务器⾃⾝的socketexecutorService 这个线程池⽤来处理响应.runnable 这个标志位⽤来控制服务器的运⾏停⽌ 启动/停止服务器 public void start() throws IOException {log.info(brokerServer 启动!);executorService Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket serverSocket.accept();//把处理连接的逻辑丢给这个线程池executorService.submit(() - {processConnection(clientSocket);});}}catch (SocketException e){log.info(服务器停止运行);}}//停止服务器public void stop() throws IOException {runnable false;//把线程池中的任务都放弃了 让线程都销毁executorService.shutdown();serverSocket.close();}实现处理连接 对于EOFException和SocketException,我们视为客⼾端正常断开连接. ◦ 如果是客⼾端先close,后调⽤DataInputStream的read,则抛出EOFException ◦ 如果是先调⽤DataInputStream的read,后客⼾端调⽤close,则抛出SocketException //通过这个方法 来处理一个客户端的连接//在这一个方法中 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()) {//需要按照特定格式解析 使用DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {while (true) {//1.读取请求并解析Request request readRequest(dataInputStream);//2.根据请求计算响应Response response process(request, clientSocket);//3.把响应写回客户端writeResponse(dataOutputStream, response);}} catch (EOFException e) {//对于这个代码 如果DataInputStream 如果读到EOF 就会抛出一个eofException异常//需要借助这个异常来结束循环log.info(connection关闭! 客户端的地址: {} : {}, clientSocket.getInetAddress().toString(), clientSocket.getPort());}} catch (ClassNotFoundException | MqException | IOException e) {log.info(connection 出现异常);e.printStackTrace();} finally {try {clientSocket.close();//一个tcp连接中 可能包含多个channel 需要把这个socket对应的所有channel也顺便清理掉clearCloseSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}实现readRequest private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if (n ! request.getLength()) {throw new IOException(读取请求格式出错);}request.setPayload(payload);return request;}实现writeResponse private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//刷新缓冲区dataOutputStream.flush();}实现处理请求 先把请求转换成BaseArguments,获取到其中的channelId和rid再根据不同的type,分别处理不同的逻辑.(主要是调⽤virtualHost中不同的⽅法).针对消息订阅操作,则需要在存在消息的时候通过回调,把响应结果写回给对应的客⼾端.最后构造成统⼀的响应 private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//先对request中的payload做一个初步的解析BasicArguments basicArguments (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info(rid:{} , channelId:{} , type:{}, length:{}, basicArguments.getRid(), basicArguments.getChannelId(),request.getType(), request.getLength());//根据type的值 来进一步区分接下来这次请求要干什么boolean ok true;if (request.getType() 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info(创建channel完成 channelId:{}, basicArguments.getChannelId());} else if (request.getType() 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info(销毁channel完成 channelId:{}, basicArguments.getChannelId());} else if (request.getType() 0x3) {//创建交换机 此时payload是ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() 0x4) {//删除交换机ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() 0x5) {//创建队列QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() 0x6) {//删除队列QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() 0x7) {//创建绑定QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() 0x8) {//删除绑定QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() 0x9) {//发布消息BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() 0xa) {//订阅消息BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//这个回调函数要做的工作 就是把服务器收到的消息可以直接推送到对应的消费者客户端Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//先知道当前收到的消息要发给那个客户端//此处consumerTag 其实是channelId 根据channelId去sessions中查询 就可以得到相应的socket\对象 就可以发送数据//1.根据channelId找到 socket对象Socket clientSocket sessions.get(consumerTag);if (clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokeServer] 订阅消息的客户端已经关闭);}//构造响应数据SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid();//这里只有响应 没有请求 不需要ridsubScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload BinaryTool.toBytes(subScribeReturns);Response response new Response();//0xc表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//把数据写回 给客户端DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() 0xb) {//调用basicAck确认消息BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {//当前type是非法的throw new MqException([BrokerServer] 未知的 type type: request.getType());}//构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info(rid:{} , channelId:{}, type:{}, length:{}, basicReturns.getRid(), basicReturns.getChannelId(),response.getType(), response.getLength());return response;}实现clearClosedSessio 如果客⼾端只关闭了Connection,没关闭Connection中包含的Channel,也没关系,在这⾥统⼀进⾏清理.注意迭代器失效问题 //遍历上述sessions哈希表 把被关闭的socket对应的键值对删除private void clearCloseSession(Socket clientSocket) {ListString toDeleteChannelId new ArrayList();for(Map.EntryString,Socket entry : sessions.entrySet()){if(entry.getValue() clientSocket){//不能一边遍历一边删除 此时影响结构影响遍历toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}log.info(清理session完成 被清理的sessionId :{},toDeleteChannelId);}实现客户端 创建包mqclient 创建 ConnectionFactory ⽤来创建连接的⼯⼚类. Data public class ConnectionFactory {//broker server的ip地址private String host;//broker server的端口号private int port;public Connection newConnection() throws IOException {Connection connection new Connection(host,port);return connection;} }Connection 和Channel的定义 ⼀个客⼾端可以创建多个Connection. ⼀个Connection对应⼀个socket,⼀个TCP连接. ⼀个Connection可以包含多个Channel Connection的定义 public class Connection {private Socket socket null;//使用哈希表 把若干个channel对象组织起来private ConcurrentHashMapString, Channel channelMap new ConcurrentHashMap();private InputStream inputStream;private OutputStream outputStream;private DataOutputStream dataOutputStream;private DataInputStream dataInputStream;private ExecutorService callbackPool null; }封装请求响应读写操作 public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info(发送请求 type:{} ,length:{}, request.getType(), request.getLength());}//读取响应public Response readResponse() throws IOException {Response response new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload new byte[response.getLength()];int n dataInputStream.read(payload);if (n ! response.getLength()) {throw new IOException(读取的响应数据不完整);}response.setPayload(payload);log.info(收到响应 type:{}, length:{}, response.getType(), response.getLength());return response;}创建channel //创建Channelpublic Channel createChannel() throws IOException {String channelId C- UUID.randomUUID();Channel channel new Channel(channelId, this);//把这个 Channel对象 放到Connection 管理channel 的哈希表中channelMap.put(channelId, channel);//同时也需要把创建channel这个消息告诉服务器boolean ok channel.createChannel();if (!ok) {//整个这次创建channel操作不顺利//把哈希表中的键值对删除channelMap.remove(channelId);return null;}return channel;}Channel的定义 public class Channel {private String channelId;//当前channel属于哪个连接private Connection connection;//记录后续客户端收到的服务器的响应private ConcurrentHashMapString, BasicReturns basicReturnsMap new ConcurrentHashMap();//如果当前Channel订阅了某个队列 就需要在此处记录下对应回调是啥,当该队列的消息返回来的时候,调用回调//此处约定一个Channel中只能有一个回调private Consumer consumer null;public Channel(String channelId, Connection connection) {this.channelId channelId;this.connection connection;} }channelId 为channel的⾝份标识,使⽤UUID标识.Connection 为channel对应的连接.baseReturnsMap ⽤来保存响应的返回值.放到这个哈希表中⽅便和请求匹配.consumer为消费者的回调(⽤⼾注册的).对于消息响应,应该调⽤这个回调处理消息. 创建channel //在这个方法中 和服务器进行交互 告诉服务器 此时客户端创建了新的channelpublic boolean createChannel() throws IOException {//对于创建Channel操作来说 payload就是一个basicArguments对象BasicArguments arguments new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload BinaryTool.toBytes(arguments);//构造type 0x1的对象Request request new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//等待服务器的响应BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}实现generateRid private String generateRid() {return R- UUID.randomUUID();}实现waitResult 由于服务器的响应是异步的.此处通过waitResult实现同步等待的效果 private BasicReturns waitResult(String rid) {BasicReturns basicReturns null;while ((basicReturns basicReturnsMap.get(rid)) null) {//如果查询结果为null 说明包裹还没有回来//此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}//读取成功后 把消息从哈希表中删除掉basicReturnsMap.remove(rid);return basicReturns;}关闭channel //关闭channel 发送type 0x2public boolean close() throws IOException {BasicArguments basicArguments new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload BinaryTool.toBytes(basicArguments);Request request new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicArguments.getRid());return basicReturns.isOk();}创建交换机 //创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString, Object arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload BinaryTool.toBytes(exchangeDeclareArguments);Request request new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}删除交换机 //删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments new ExchangeDeleteArguments();exchangeDeleteArguments.setRid(generateRid());exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setExchangeName(exchangeName);byte[] payload BinaryTool.toBytes(exchangeDeleteArguments);Request request new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(exchangeDeleteArguments.getRid());return basicReturns.isOk();}创建队列 //创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) throws IOException {QueueDeclareArguments queueDeclareArguments new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload BinaryTool.toBytes(queueDeclareArguments);Request request new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}#### 删除队列java //删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}创建绑定 //创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}删除绑定 //解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}发送消息 //发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setBasicProperties(basicProperties);arguments.setRoutingKey(routingKey);arguments.setBody(body);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}订阅消息 //订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {//先设置回调if (this.consumer ! null) {throw new MqException(该channel已经设置过消费信息的回调了 不能重复设置);}this.consumer consumer;BasicConsumeArguments arguments new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);//consumerTag 也是用channelId来表示了arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}确认消息 //确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}处理响应 创建扫描线程 创建⼀个扫描线程,⽤来不停的读取socket中的响应数据 注意:⼀个Connection中可能包含多个channel,需要把响应分别放到对应的channel中 public Connection(String host, int port) throws IOException {socket new Socket(host, port);inputStream socket.getInputStream();outputStream socket.getOutputStream();dataInputStream new DataInputStream(inputStream);dataOutputStream new DataOutputStream(outputStream);callbackPool Executors.newFixedThreadPool(4);//创建一个扫描线程 由这个线程负责不停的对socket中读取响应数据 把这个响应数据再交给对应的channel负责处理Thread t new Thread(() - {try {while (!socket.isClosed()) {Response response readResponse();dispatchResponse(response);}} catch (SocketException e) {//连接正常断开 忽略这个异常log.info(连接正常断开);} catch (IOException | ClassNotFoundException | MqException e) {log.info(连接异常断开);e.printStackTrace();}});t.start();}实现响应的分发 给Connection创建dispatchResponse⽅法 //使用这个方法 分别来处理//这个消息是 针对控制请求的响应 还是服务器推送的消息private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() 0xc) {//服务器推送来的消息数据SubScribeReturns subScribeReturns (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根据channelId 找到对应的channel对象Channel channel channelMap.get(subScribeReturns.getChannelId());if (channel null) {throw new MqException([Connect] 该消息对应的channel 在客户端中不存在 channelId channel.getChannelId());}//执行该channel对象内部的回调callbackPool.submit(() - {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {//当前响应是针对刚才的控制请求的响应BasicReturns basicReturns (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把这个结果放在对应的channel的hash表中Channel channel channelMap.get(basicReturns.getChannelId());if (channel null) {throw new MqException([Connection] 该队列对应的channel在客户端中不存在);}channel.putReturns(basicReturns);}}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){//当前也不知道多少线程在等待上述的这个响应//把所有的等待线程都唤醒notifyAll();}}关闭Connection //关闭channel 发送type 0x2public boolean close() throws IOException {BasicArguments basicArguments new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload BinaryTool.toBytes(basicArguments);Request request new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicArguments.getRid());return basicReturns.isOk();}
http://www.zqtcl.cn/news/654377/

相关文章:

  • 招聘网站可以同时做两份简历吗外贸网站示例
  • 黑链 对网站的影响企业融资计划书范本
  • 自己的简历怎么制作网站学院网站建设成效
  • 周口seo 网站郑州建站网站的公司
  • 网站布局模板北京装修大概多少钱一平方
  • 德阳网站建设ghxhwl风景网站模板
  • 昌邑网站建设拓者设计吧现代效果图
  • 学校网站建设成功案例网站开发需要学习哪些内容
  • 怎么让公司建设网站seo于刷网站点击
  • 网站建设合同严瑾建设网站宣传
  • 哪个网站做餐饮推广最好深圳市信任网站
  • 网站模板 整站源码广州网站vi设计报价
  • 百度速页建站wordpress审核插件
  • 怎么给网站wordpress专业的vi设计公司
  • 百度关键词在线优化寻找郑州网站优化公司
  • 网站建设适合什么单位网络推广员工作内容
  • 漂亮的网站维护页面wordpress加个微信登录
  • 网站设计是什么意思创建地址怎么弄
  • nas上建设网站文章网站哪里建设好
  • 消防网站模板广告设计专业需要学什么
  • 建设银行网站首页wordpress 登录函数
  • 做网站多长时间广州营销网站制作
  • 美团外卖网站开发建设网站如何写文案
  • 专门做画册封面的网站开发工程师网站开发工程师招聘
  • 广州市建设局网站自己做电影网站违法
  • 网站建设首选公司大丰专业做网站
  • 用dw怎么做网站辽宁省住房和城乡建设厅网站首页
  • 如何用微信小程序做网站2个网站做的链接怎么用一个域名
  • 大理网站建设滇icp备凡科网站代码如何修改
  • 做电商网站的公司简介网站制作多久