彩票网站建设 极云,自己做的网站被举报违反广告法,wordpress 腾讯视频,wordpress最新编辑器目录
一、客户端代码实现
1.1、需求分析
1.2、具体实现
1#xff09;实现 ConnectionFactory
2#xff09;实现 Connection
3#xff09;实现 Channel
二、编写 Demo
2.1、实例
2.1、实例演示 一、客户端代码实现 1.1、需求分析
RabbitMQ 的客户端设定#xff…目录
一、客户端代码实现
1.1、需求分析
1.2、具体实现
1实现 ConnectionFactory
2实现 Connection
3实现 Channel
二、编写 Demo
2.1、实例
2.1、实例演示 一、客户端代码实现 1.1、需求分析
RabbitMQ 的客户端设定一个客户端可以有多个模块每个模块都可以和 broker server 之间建立 “逻辑上的连接” channel这几个模块的channel 彼此之间是互相不影响的同时这几个 channel 又复用的同一个 TCP 连接省去了频繁 建立/销毁 TCP 连接的开销三次握手、四次挥手.......
这里我们也按照这样的逻辑实现 消息队列 的客户端主要涉及到以下三个核心类
ConnectionFactory连接工厂这个类持有服务器的地址主要功能就是创建 Connection 对象.Connection表示一个 TCP连接持有 Socket 对象用来 写入请求/读取响应管理多个Channel 对象.Channel表示一个逻辑上的连接需要提供一系列的方法去和服务器提供的核心 API 对应客户端提供的这些方法的内部就是写入了一个特定的请求然后等待服务器响应. 1.2、具体实现
1实现 ConnectionFactory
主要用来创建 Connection 对象.
public class ConnectionFactory {//broker server 的 ip 地址private String host;//broker server 的端口号private int port;// //访问 broker server 的哪个虚拟主机
// //这里暂时先不涉及
// private String virtualHostName;
// private String username;
// private String password;public Connection newConnection() throws IOException {Connection connection new Connection(host, port);return connection;}public String getHost() {return host;}public void setHost(String host) {this.host host;}public int getPort() {return port;}public void setPort(int port) {this.port port;}
}2实现 Connection
属性如下 private Socket socket;//一个 socket 连接需要管理多个 channelprivate ConcurrentHashMapString, Channel channelMap new ConcurrentHashMap();private InputStream inputStream;private OutputStream outputStream;// DataXXX 主要用来 读取/写入 特定格式数据(例如 readInt())private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//用来处理 0xc 的回调这里开销可能会很大不希望把 Connection 阻塞住因此使用 线程池 来处理private ExecutorService callbackPool;构造如下
这里不光需要初始化属性还需要创建一个扫描线程由这个线程负责不停的从 socket 中读取响应数据把这个响应数据再交给对应的 channel 负责处理 public Connection(String host, int port) throws IOException {socket new Socket(host, port);inputStream socket.getInputStream();outputStream socket.getOutputStream();dataInputStream new DataInputStream(inputStream);dataOutputStream new DataOutputStream(outputStream);callbackPool Executors.newFixedThreadPool(4);//创建一个扫描线程由这个线程负责不停的从 socket 中读取响应数据把这个响应数据再交给对应的 channel 负责处理Thread t new Thread(() - {try {while(!socket.isClosed()) {Response response readResponse();dispatchResponse(response);}} catch (SocketException e) {//连接正常断开的此时这个异常可以忽略System.out.println([Connection] 连接正常断开);} catch(IOException | ClassNotFoundException | MqException e) {System.out.println([Connection] 连接异常断开);e.printStackTrace();}});t.start();}释放 Connection 相关资源 public void close() {try {callbackPool.shutdown();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}使用这个方法来区别当前的响应是一个针对控制请求的响应还是服务器推送过来的消息.
如果是服务器推送过来的消息就响应表明是 0xc也就是一个回调通过线程池来进行处理
如果只是一个普通的响应就把这个结果放到 channel 的 哈希表中随后 channel 会唤醒所有阻塞等待响应的线程去 map 中拿数据. public void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if(response.getType() 0xc) {//服务器推送过来的消息数据SubScribeReturns subScribeReturns (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根据 channelId 找到对应的 channel 对象Channel channel channelMap.get(subScribeReturns.getChannelId());if(channel null) {throw new MqException([Connection] 该消息对应的 channel 再客户端中不存在channelId channel.getChannelId());}//执行该 channel 对象内部的回调(这里的开销未知有可能很大同时不希望把这里阻塞住所以使用线程池来执行)callbackPool.submit(() - {try {channel.getConsumer().handlerDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch(MqException | IOException e) {e.printStackTrace();}});} else {//当前响应是针对刚才的控制请求的响应BasicReturns basicReturns (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把这个结果放到 channel 的 哈希表中Channel channel channelMap.get(basicReturns.getChannelId());if(channel null) {throw new MqException([Connection] 该消息对应的 channel 在客户端中不存在channelId channel.getChannelId());}channel.putReturns(basicReturns);}}发送请求和读取响应 /*** 发送请求* param request* throws IOException*/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println([Connection] 发送请求type request.getType() , length request.getLength());}/*** 读取响应*/public Response readResponse() throws IOException {Response response new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload new byte[response.getLength()];int n dataInputStream.read(payload);if(n ! response.getLength()) {throw new IOException(读取的响应格式不完整! n n , responseLen response.getLength());}response.setPayload(payload);System.out.println([Connection] 收到响应type response.getType() , length response.getLength());return response;}在 Connection 中提供创建 Channel 的方法 public Channel createChannel() throws IOException {String channelId C- UUID.randomUUID().toString();Channel channel new Channel(channelId, this);//放到 Connection 管理的 channel 的 Map 集合中channelMap.put(channelId, channel);//同时也需要把 “创建channel” 这个消息告诉服务器boolean ok channel.createChannel();if(!ok) {//如果创建失败就说明这次创建 channel 操作不顺利//把刚才加入 hash 表的键值对再删了channelMap.remove(channelId);return null;}return channel;}Ps代码中使用了很多次 UUID 这里我们和之前一样使用加前缀的方式来进行区分. 3实现 Channel
属性和构造如下 private String channelId;// 当前这个 channel 是属于哪一个连接private Connection connection;//用来存储后续客户端收到的服务器响应已经辨别是哪个响应(要对的上号) key 是 ridprivate ConcurrentHashMapString, BasicReturns basicReturnsMap new ConcurrentHashMap();//如果当前 Channel 订阅了某个队列就需要记录对应的回调是什么当该队列消息返回回来的时候调用回调//此处约定一个 Channel 只能有一个回调private Consumer consumer;public Channel(String channelId, Connection connection) {this.channelId channelId;this.connection connection;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId channelId;}public Connection getConnection() {return connection;}public void setConnection(Connection connection) {this.connection connection;}public ConcurrentHashMapString, BasicReturns getBasicReturnsMap() {return basicReturnsMap;}public void setBasicReturnsMap(ConcurrentHashMapString, BasicReturns basicReturnsMap) {this.basicReturnsMap basicReturnsMap;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer consumer;实现 0x1 创建 channel
主要就是构造构造出 request然后发送请求到 BrokerServer 服务器阻塞等待服务器响应. /*** 0x1* 和服务器进行交互告诉服务器此处客户端已经创建了新的 channel 了* return*/public boolean createChannel() throws IOException {//构造 payloadBasicArguments arguments new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload BinaryTool.toBytes(arguments);//发送请求Request request new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);//等待服务器响应BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();}/*** 生成 rid* return*/public String generateRid() {return R- UUID.randomUUID().toString();}/*** 阻塞等待服务器响应* param rid* return*/private BasicReturns waitResult(String rid) {BasicReturns basicReturns null;while((basicReturns basicReturnsMap.get(rid)) null) {//查询结果为空就说明咱们去菜鸟驿站要取的包裹还没到//此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}basicReturnsMap.remove(rid);return basicReturns;}/*** 由 Connection 中的方法调用区分为普通响应之后触发* 将响应放回到 channel 管理的 map 中并唤醒所有线程* param basicReturns*/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {//当前也不知道有多少线程再等待上述的这个响应//因此就把所有等待的线程唤醒notifyAll();}}Ps其他的 请求操作也和 0x1 的方式几乎一样这里不一一展示了主要说一下 0xa 0xa 消费者订阅队列消息这里要先设置好回调到属性中方便 Connection 通过这个属性来 处理回调
值得注意的一点 我们约定 channelId 就是 consumerTag public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws IOException, MqException {//先设置回调if(this.consumer ! null) {throw new MqException(该 channel 已经设置过消费消息回调了不能重复);}this.consumer consumer;BasicConsumeArguments basicConsumeArguments new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 注意此处的 consumerTag 使用 channelId 来表示basicConsumeArguments.setQueueName(queueName);basicConsumeArguments.setAutoAck(autoAck);byte[] payload BinaryTool.toBytes(basicConsumeArguments);Request request new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicConsumeArguments.getRid());return basicReturns.isOk();}二、编写 Demo 2.1、实例
到了这里基本就实现完成了一个 跨主机/服务器 之间的生产者消费者模型了功能上可以满足日常开发对消息队列的使用但是还具有很强的扩展性可以继续参考 RabbitMQ如果有想法的或者是遇到不会的问题可以私信我~
以下我来我来编写一个 demo模拟 跨主机/服务器 之间的生产者消费者模型这里为了方便就在本机演示.
首先再 spring boot 项目的启动类中 创建 BrokerServer 绑定端口号然后启动
SpringBootApplication
public class RabbitmqProjectApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context SpringApplication.run(RabbitmqProjectApplication.class, args);BrokerServer brokerServer new BrokerServer(9090);brokerServer.start();}}编写消费者
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);Connection connection factory.newConnection();Channel channel connection.createChannel();//创建交换机和队列(这里和生产者创建交换机和队列不冲突谁先启动就按照谁的创建即使已经存在交换机和队列再创建也不会有什么副作用)channel.exchangeDeclare(demoExchange, ExchangeType.DIRECT, true, false, null);channel.queueDeclare(demoQueue, true, false, false, null);//消费者消费消息channel.basicConsume(demoQueue, true, new Consumer() {Overridepublic void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println(开销消费);System.out.println(consumerTag consumerTag);System.out.println(body new String(body));System.out.println(消费完毕);}});//由于消费者不知道生产者要生产多少就在这里通过循环模拟一直等待while(true) {Thread.sleep(500);}}}编写生产者
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);Connection connection factory.newConnection();Channel channel connection.createChannel();//创建交换机和队列(这里和消费者创建交换机和队列不冲突谁先启动就按照谁的创建即使已经存在交换机和队列再创建也不会有什么副作用)channel.exchangeDeclare(demoExchange, ExchangeType.DIRECT, true, false, null);channel.queueDeclare(demoQueue, true, false, false, null);//生产消息byte[] body1 Im cyk1 !.getBytes();channel.basicPublish(demoExchange, demoQueue, null, body1);Thread.sleep(500);//关闭连接channel.close();connection.close();}}2.1、实例演示
启动 spring boot 项目启动 BrokerServer 运行消费者消费者和生产者谁先后运行都可以 运行生产者