网站微信分享怎么做,设计软件培训学校,金峰辉网站建设,闵行网页设计目录
引言
约定应用层的通信协议
自定义应用层协议
Type
Length
PayLod 实现 Broker Server 类
属性 与 构造
启动 Broker Server
停止 Broker Server
处理客户端连接
读取请求 与 写回响应
根据请求计算响应
清除 channel 引言 生产者 和 消费者 都是客户端均通过 网络 和 Broker Server 进行通信 注意点一 此处我们将使用 TCP 协议来作为通信的底层协议 注意点二 TCP 是有连接的Connection由于 创建/断开 TCP 连接的成本还挺高需要三次握手啥的所以为了能够让 TCP 连接得到复用我们还将创建一个 Channel 类作为 Connection 内部的 逻辑上 的连接即一个 Connection 中可能有多个 Channel一个管道多个网线传输的效果 约定应用层的通信协议 此处要交互的 Message 为 二进制数据HTTP 为文本协议JSON 为文本格式不适用此处场景所以我们自定义一个应用层协议使用二进制的方式来传输数据 自定义应用层协议 Type type 描述当前这个请求或响应是干啥的 具体理解 在我们的 MQ 中客户端生产者 消费者和 服务器Broker Server之间要进行的操作就是 VirtualHost 中的那些核心 API我们希望客户端通过网络能够远程调用 VirtualHost 中的核心 API此处 type 就是在描述当前这个请求/响应是在调用哪个 API取值如下0x1 创建 channel0x2 关闭 channel0x3 创建 exchange0x4 销毁 exchange0x5 创建 queue0x6 销毁 queue0x7 创建 binding0x8 销毁 binding0x9 发送 message0xa 订阅 message0xb 返回 ack0xc 服务器给客户端推送消息被订阅的消息响应独有的 Length length 用来描述 payload 长度防止粘包问题 PayLod payload 会根据当前是请求还是响应以及当前的 type 有不同的取值 实例理解 实例一 比如 type 是 0x3创建交换机同时当前是一个请求此时 payload 里的内容就相当于是 exchangeDeclare 的参数序列化的结果 具体代码实现 按照上述自定义应用层协议 创建 Request 类 import lombok.Data;/*
* 表示一个网络通信中的请求对象按照自定义协议的格式来展开的
* */
Data
public class Request {private int type;private int length;private byte[] payload;
} 按照上述自定义应用层协议 创建 BasicArguments 类用于表示各方法的公共参数 import lombok.Data;import java.io.Serializable;/*
* 使用这个类表示方法的公共参数/辅助的字段
* 后续使用每个方法又会有一些不同的参数不同的参数再分别使用不同的子类来表示
* */
Data
public class BasicArguments implements Serializable {
// 表示一次请求/响应 的身份标识可以把请求和响应对上protected String rid;
// 这个通信使用的 channel 的身份标识protected String channelId;
} 每个方法有不同的参数此处实例 type 0x3 即 创建交换机exchangeDeclare所以我们根据 VirtualHost 中的 exchangeDeclare 方法中的参数单独创建一个类出来该类还需 继承用于表示公共参数的 BasicArguments 类 import com.example.demo.mqserver.core.ExchangeType;
import lombok.Getter;
import lombok.Setter;import java.io.Serializable;
import java.util.Map;Getter
Setter
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private MapString,Object arguments;
} 注意 其他 type 类型除 0x1、0x2 、0xa 外也均根据 其在 VirtualHost 中对应的参数单独创建一个类即可0x1 和 0x2 分别为 创建 channel 和 关闭 channel二者 API 所需参数就是公共参数使用 BasicArguments 类即可无需单独创建类type 0xa即 订阅消息basicConsume后文详细讲解 实例二 比如 type 0x3创建交换机同时当前是一个响应此时 payload 里的内容就是 exchangeDeclare 的返回结果的序列化内容 具体代码实现 按照上述自定义应用层协议 创建 Response 类 import lombok.Data;/*
* 这个对象表示一个响应也是根据自定义应用层协议来的
* */
Data
public class Response {private int type;private int length;private byte[] payload;
}按照上述自定义应用层协议 创建 BasicReturns 类用于表示远程调用方法的返回值 import lombok.Data;import java.io.Serializable;/*
* 这个类表示各个远程调用的方法的返回值和公共信息
* */
Data
public class BasicReturns implements Serializable {
// 用来标识唯一的请求和响应protected String rid;
// 用来标识一个 channelIdprotected String channelId;
// 表示当前这个远程调用方法的返回值protected boolean ok;
} 注意 其他 type 类型除 0xc 外均使用 BasicReturns 类中的成员变量 作为返回参数type 0xc该 type 类型为响应独占表示 服务器给客户端推送消息被订阅的消息后文详解讲解 特例一 比如 type 0xa订阅消息同时当前是一个请求这个核心 API 比较特殊其参数中包含有 回调函数 具体代码编写 我们根据 VirtualHost 中的 BasicConsume 方法中的参数单独创建一个类出来并且该类也要 继承用于表示公共参数的 BasicArguments 类唯一不同的是其中用于表示 回调函数的参数 consumer 我们不写入该类中也就代表着在客户端发送请求时不再携带 consumer 参数因为在 broker server 这边我们规定 BasicConsume 的回调方法统一为 将收到的消息返回给消费者消费者仅需收到消息后再在客户端自己这边执行一个用户自定义的回调就行了 import lombok.Getter;
import lombok.Setter;import java.io.Serializable;Getter
Setter
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;
// 这个类对应的 basicConsume 方法中还有一个参数是回调函数(如何来有效处理消息)
// 这个回调函数是不能通过网络传输的
// 站在 broker server 这边针对消息的处理问题其实是统一的(把消息返回给客户端)
// 客户端这边收到消息之后再在客户端自己这边执行一个用户自定义的回调就行了
// 此时客户端就不需要把自身的回调告诉服务器了
// 这个类就不需要 consumer 成员了
} 特列二 type 0xc即 服务器给客户端推送消息被订阅的消息该类型一定是一个响应 如上图所示的蓝色部分此处我们定义一个 SubScribeReturns 类用于表示在消费者订阅队列之后服务器给消费推送消息的响应参数此处仍需继承一下 代表响应公共参数的 BasicReturns 类 import com.example.demo.mqserver.core.BasicProperties;
import lombok.Getter;
import lombok.Setter;import java.io.Serializable;Getter
Setter
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
} 注意 SubScribeReturns 类虽然继承了 BasicReturns 类但是在返回时无需填写 BasicReturns 类中的成员变量 rid因为该响应无相对应的请求故该响应无 rid即将 rid 设为空字符串即可 小结 上述内容属于服务器程序的关键环节自定义应用层协议 实现 Broker Server 类 属性 与 构造 /*
* 这个 BrokerServer 就是咱们 消息队列 本体服务器
* 本质上就是一个 TCP 的服务器
* */
public class BrokerServer {private ServerSocket serverSocket null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtualHost virtualHost new VirtualHost(default);
// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)
// 此处的 key 是 channelIdvalue 为对应的 Socket 对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMapString,Socket();
// 引入线程池来处理多个客户端的请求private ExecutorService executorService null;
// 引入一个 Boolean 变量控制服务器是否继续运行private volatile boolean runnable true;public BrokerServer(int port) throws IOException {serverSocket new ServerSocket(port);}
} 启动 Broker Server public void start() throws IOException {System.out.println([BrokerServer] 启动);executorService Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket serverSocket.accept();
// 把处理连接的逻辑丢给这个线程池executorService.submit(() -{processConnection(clientSocket);});}}catch (SocketException e){System.out.println([BrokerServer] 服务器停止运行);}} 停止 Broker Server // 一般来说停止服务器就是直接 kill 掉对应进程就行了
// 此处还是搞一个单独的停止方法主要是用于后续的单元测试public void stop() throws IOException {runnable false;
// 把线程池中的任务都放弃了让线程都销毁executorService.shutdownNow();serverSocket.close();} 处理客户端连接 // 通过这个方法来处理一个客户端的连接
// 在这一个连接中可能会涉及到多个请求和响应private void processConnection(Socket clientSocket){try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()){
// 这里需要按照特定格式来读取并解析此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)){while (true) {
// 1、读取请求并解析Request request readRequest(dataInputStream);
// 2、根据请求计算响应Response response process(request,clientSocket);
// 3、把响应写回给客户端writeResponse(dataOutputStream,response);}}} catch (EOFException | SocketException e) {
// 对于这个代码DataInputStream 如果读到 EOF就会抛出一个 EOFException 异常
// 需要借助这个异常来结束循环System.out.println([BrokerServer] connection 关闭客户端的地址 clientSocket.getInetAddress().toString() : clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println([BrokerServer] connection 出现异常);e.printStackTrace();}finally {try {
// 当连接处理完了就需要记得关闭 socketclientSocket.close();
// 一个 TCP 连接中可能包含多个 channel 需要把当前这个 socket 对应的所有 channel 也顺便清理掉clearClosedSession(clientSocket);}catch (IOException e) {e.printStackTrace();}}} 读取请求 与 写回响应 private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if(n ! request.getLength()) {throw new IOException(读取请求格式出错);}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());
// 这个刷新缓冲区也是重要的操作保证当前写的这些数据能够快速进入到网卡里而不至于在内存中呆着dataOutputStream.flush();} 根据请求计算响应 根据不同的 type 类型来远程调用 VirtualHost 中不同的核心 API 具体代码编写 private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1、把 request 中的 payload 做一个初步的解析BasicArguments basicArguments (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println([Request] rid basicArguments.getRid() , channelId basicArguments.getChannelId() , type request.getType() , length request.getLength());
// 2、根据 type 的值来进一步区分接下来这次请求要干啥boolean ok true;if(request.getType() 0x1) {
// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println([BrokerServer] 创建 channel 完成 channelId basicArguments.getChannelId());}else if(request.getType() 0x2) {
// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println([BrokerServer] 销毁 channel 完成 channelId basicArguments.getChannelId());} else if(request.getType() 0x3) {
// 创建交换机此时 payload 就是 ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());} else if(request.getType() 0x4) {
// 删除交换机此时 payload 就是 ExchangeDeleteArguments 对象了ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());} else if(request.getType() 0x5) {
// 创建队列此时 payload 就是 QueueDeclareArguments 对象了QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(),arguments.isExclusive(),arguments.isAutoDelete(),arguments.getArguments());} else if(request.getType() 0x6){
// 销毁队列此时 payload 就是 QueueDeleteArguments 对象了QueueDeleteArguments arguments (QueueDeleteArguments) basicArguments;ok virtualHost.queueDelete(arguments.getQueueName());} else if(request.getType() 0x7){
// 创建绑定此时 payload 就是 QueueBindArguments 对象了QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if(request.getType() 0x8){
// 删除绑定此时 payload 就是 QueueUnbindArguments 对象了QueueUnbindArguments arguments (QueueUnbindArguments) basicArguments;ok virtualHost.queueUnbind(arguments.getQueueName(),arguments.getExchangeName());} else if(request.getType() 0x9){
// 发送消息此时 payload 就是 BasicPublishArguments 对象了BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey(),arguments.getBasicProperties(),arguments.getBody());} else if(request.getType() 0xa){
// 订阅消息此时 payload 就是 BasicConsumeArguments 对象了BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
// 这个回调函数要做的工作就是把服务器收到的消息可以直接推送回对应的消费者客户端Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
// 先知道当前这个收到的消息要发给哪个客户端此处 consumerTag 其实是 channelId
// 根据 channelId 去 sessions 中查询就可以得到对应的 socket 对象了从而可以往里面发送数据了
// 1、根据 channelId 找到 socket 对象Socket clientSocket sessions.get(consumerTag);if(clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokerServer] 订阅消息的客户端已经关闭);}
// 2、构造响应数据SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(); // 由于这里只有响应没有请求不需要去对应 rid 暂时不需要subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBody(body);subScribeReturns.setBasicProperties(basicProperties);byte[] paylaod BinaryTool.toBytes(subScribeReturns);Response response new Response();
// 0xc 表示服务器给消费者客户端推送的消息数据response.setType(0xc);
// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(paylaod.length);response.setPayload(paylaod);
// 3、把数据写回给客户端
// 注意此处的 dataOutputStream 这个对象不能 close
// 如果把 dataOutputStream 关闭就会直接把 clientSocket 里的 outputStream 也给关了
// 此时就无法继续往 socket 中写入后续数据了DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream,response);}});} else if(request.getType() 0xb){
// 确认消息此时 payload 就是 BasicAckArguments 对象了BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());}else {
// 当前的 type 是非法的throw new MqException([BrokerServer] 未知的 typetype request.getType());}
// 3、构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println([Response] rid basicReturns.getRid() ,channelId basicReturns.getChannelId() , type response.getType() ,length response.getLength());return response;} 注意点一 当前请求中的 payload 里面放的内容 是根据 type 的类型来走的比如 type 是 0x3payload 就是 ExchangeDeclareArguments比如 type 是 0x4payload 就是 ExchangeDeleteArguments... 注意点二 此处设定的不同的方法的参数虽然都有不同的类但是它们均继承自同一个 BasicArguments 类因此先将 payload 转成 BasicArguments 清除 channel 清理 sessions 这个 哈希表 中的 session 信息 具体代码编写 private void clearClosedSession(Socket clientSocket) {
// 这里要做的事情主要就是遍历上述 session hash 表把该关闭的 socket 对应的键值对统统删掉ListString toDeleteChannelId new ArrayList();for(Map.EntryString,Socket entry : sessions.entrySet()) {if(entry.getValue() clientSocket) {
// 不能在这里直接删除
// 这属于集合类的一个大忌一边遍历一边删除
// session.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println([BrokerServer] 清理 session 完成被清理的 channelId toDeleteChannelId);}