表白网页在线生成网站源码,网站建设群号,平面设计与网页设计培训,手机网站 生成app#x1f52d; 嗨#xff0c;您好 #x1f44b; 我是 vnjohn#xff0c;在互联网企业担任 Java 开发#xff0c;CSDN 优质创作者 #x1f4d6; 推荐专栏#xff1a;Spring、MySQL、Nacos、Java#xff0c;后续其他专栏会持续优化更新迭代 #x1f332;文章所在专栏 嗨您好 我是 vnjohn在互联网企业担任 Java 开发CSDN 优质创作者 推荐专栏Spring、MySQL、Nacos、Java后续其他专栏会持续优化更新迭代 文章所在专栏网络 I/O 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识 向我询问任何您想要的东西IDvnjohn 觉得博主文章写的还 OK能够帮助到您的感谢三连支持博客 代词: vnjohn ⚡ 有趣的事实音乐、跑步、电影、游戏 目录 前言非线性 VS 线性单 Selector 非线性模型图解分析源码strace 追踪小结 单 Selector 线性模型SelectorThreadSelectorGroupMainThread测试单 selector 模型小结 总结 前言
在之前的文章中从阻塞 I/OBIO、非阻塞 I/ONIO、多路复用 select/poll、多路复用 epoll
重要的 I/O 模型也是现在市场上大部分中间件运用的模型也就是基于 I/O 多路复用epoll比如Redis、RocketMQ、Nginx 等这些地方都运用了 epoll只不过在 RocketMQ 的实现采用了 Netty而 Netty 也基于 epoll 这套多路复用模型进行实现的所以在后续的这些文章会围绕 Netty 的变种看它是如何一步步从单 Selector 非线性模型 — 单 Selector 线性模型 — 单 Selector Group 混杂模式 — 多 Selector Group 主从模式一步步演练过来的本篇博文主要围绕单 Selector 非线性模型 — 单 Selector 线性模型进行具体的展开.
非线性 VS 线性
非线性指的就是多个线程并行执行完这一段业务结果并不是按顺序执行的你以为的执行结果
线性指的就是由一个线程执行完这一段业务结果是按顺序执行完毕的
单 Selector 非线性模型
图解分析
假设说现在给 客户端1 分配到的是 socket fd 2在客户端读数据时为它分配一个读事件当它到达读的逻辑时再给它分配一个对应的写事件那么如果不对读事件或写事件做 cancel 的话那么读、写事件会一直存在也就是它会在被 epfd 所分配的链表结构中一直存放着其实这些读写事件走完它的流程时它相当于已完成本次的读写任务了它没有本质上存在的意义了如果一直存在它就会一直被调起重复的调用 在之前的 epoll 分析时并没有看到 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, events) 的函数调用说明在这里就是要分析使用它的地方它在 Java 代码中相当于就是 java.nio.channels.SelectionKey#cancel 的实现 SelectionKey#cancel请求取消此事件的通道与 selector 的注册调用该方法返回时该事件将无效并且将添加到 selector 取消事件集合中在下一个选择 select 操作期间该事件将从所有的 selector 事件集合中删除也就是不会再被调用 如上图
主线程 Main Thread 负责接收客户端连接由单个 selector 管理所有客户端 fds 连接并对所连接的客户端 fd 注册读 read 事件 也就是调用 epoll_ctl当 select 方法被调用时会监听到链表中有读状态的 fd 事件然后在 Java 程序中会调用 readHandler 方法去新开辟一个线程资源去处理由于此时新开辟的线程和主线程并不是线程执行的若此时不加 SelectionKey#cancel即使已经抛出了线程在线程执行前后这个时差上该客户端的 fd 读事件会被重复触发.当 readHandler 方法执行完会向 selector 注册一个客户端 fd 写事件也就是调用 epoll_ctl然后下一次循环走到 select 方法被调用时会监听到客户端写状态的 fd然后再调用 writeHandler 方法新开辟一个线程资源去处理由于此时新开辟的写线程也不是和主线程线性执行的若此时不加 SelectionKey#cancelselect 方法再次被调用时就会一直调用 writeHandler 方法去执行也就是会一直开辟新的线程去执行写的操作.造成客户端 fd R/W 事件重复调用的原因在主线程中不是通过线性的方式去执行读、写操作的所以读写事件会被重复调用解决方案调用 SelectionKey#cancel 方法在内核级别相当于 epoll_ctl(epfd, EPOLL_CTL_DEL, socketfd)
源码
以上图解分析的结果会通过以下源码的方式来演练并会去观察 strace 生成的内核源码在多线程非线性模型下加 cancel 与 不加 cancel 方法之间的区别
package org.vnjohn.select;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** author vnjohn* since 2023/12/7*/
public class SelectMultiplexingSocketMultiThread {private Selector selector null;int port 8090;public void initServer() {try {ServerSocketChannel server ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));// select、poll、*epoll 都是使用同样的方式打开selector Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println(Socket Server start...);try {while (true) {while (selector.select(50) 0) {SetSelectionKey selectionKeys selector.selectedKeys();IteratorSelectionKey iter selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {// 先在多路复用器里把 key-cancel 了System.out.println(in.....);readHandler(key);} else if (key.isWritable()) {// 1、你准备好要写什么了这是第一步// 2、第二步你才关心send-queue是否有空间// so读 read 一开始就要注册但是 write 依赖 1、2 关系什么时候用什么时候注册// 如果一开始就注册了write的事件进入死循环一直调起key.cancel();writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(() - {System.out.println(write handler...);SocketChannel client (SocketChannel) key.channel();ByteBuffer buffer (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {client.write(buffer);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc (ServerSocketChannel) key.channel();SocketChannel client ssc.accept();client.configureBlocking(false);ByteBuffer buffer ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(-------------------------------------------);System.out.println(new SocketClient client.getRemoteAddress());System.out.println(-------------------------------------------);} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {// 即便已经抛出了线程去读取但是在时差里这个 key-read 事件会被重复触发new Thread(() - {System.out.println(read handler.....);SocketChannel client (SocketChannel) key.channel();ByteBuffer buffer (ByteBuffer) key.attachment();buffer.clear();int read;try {while (true) {read client.read(buffer);System.out.println(Thread.currentThread().getName() read);if (read 0) {key.interestOps(SelectionKey.OP_READ);client.register(key.selector(), SelectionKey.OP_WRITE, buffer);} else if (read 0) {break;} else {client.close();break;}}} catch (IOException e) {e.printStackTrace();}}).start();}public static void main(String[] args) {SelectMultiplexingSocketMultiThread service new SelectMultiplexingSocketMultiThread();service.start();}
}strace 追踪
先将源码中 key.cannel 代码进行注释再观察命令窗口是否会重复调用 R/W 事件操作. 1、先将代码首行 package 移除 2、通过 javac 将源文件 .java 生成 .class 3、通过命令启动服务端strace -ff -o epoll java -Djava.nio.channels.spi.SelectorProvidersun.nio.ch.EPollSelectorProvider SelectMultiplexingSocketMultiThread 4、通过 nc localhost 8090 模拟客户端连接 若将代码中 key.cannel 移除在客户端命令窗口输入内容以后服务端会读取这份内容并会注册一个写事件EPOLL_OUT此时的效果就是在后台会一直触发 writeHandler 方法的调用 若将代码中 key.cannel 恢复在程序中每执行完一次读、写事件以后就会将事件注销掉也就是它会从链表中移除这两个对应的事件确保下一次 select 不会被再次触发调用. 在一定的时间差内read 事件会被重复触发当执行到了 writeHandler 以后该事件已经被 cannel 掉了此时已经不会再重复被调起了.
小结
非线性模型由单个线程负责 accept 接收客户端连接然后抛出不同的线程分别去处理读、写 考虑资源利用为了充分利用好 CPU 核数 若有一个 socket fd 执行特别耗时在一个单线性流程里会阻塞其他的 socket fd 处理 考虑如何处理当有 N 个 fd 同时有 R/W 处理的时候可以分为以下几步处理
将 N 个 FD 分组每一组对应一个 selector将每一个 selector 分别放到不同的线程上selector 与线程的关系是 11若是多个线程它们分别在不同的 CPU 上执行此时会存在多个 selector 并行此时线程内部是线性执行的方式最终是多个 FD 在并行的处理 accept、R/W 事件 不是说一个 selector 中的 FD 并行在多个线程里面处理而是每一个 selector 都会保证一个 FD 在执行且是线性处理的 以上的考虑都是基于分而治之思想假设程序里有 100W 个连接有四个线程selector此时可以拿出其中一个 selector 就单单关注 accept 事件然后把 accept 接收过后的客户端 FD R/W 事件分配给其他 selector 去进行处理.
单 Selector 线性模型
单个 selector 充当为一个线程 thread来接收处理客户端的 accept 以及接收客户端读写 R/W 事件
SelectorThread
package org.vnjohn.selector.singleton;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;/*** author vnjohn* since 2023/12/15*/
public class SelectorThread implements Runnable {Selector selector null;LinkedBlockingQueueChannel lbq new LinkedBlockingQueue();SelectorThreadGroup selectorThreadGroup null;public SelectorThread(SelectorThreadGroup selectorThreadGroup) {try {this.selectorThreadGroup selectorThreadGroup;selector Selector.open();} catch (IOException e) {e.printStackTrace();}}Overridepublic void run() {// loopwhile (true) {try {// 1.select:如果一直没有 fd该方法会阻塞一直没有返回通过调用 wakeup() 唤醒System.out.println(Thread.currentThread().getName() before select ...... selector.keys().size());int num selector.select();System.out.println(Thread.currentThread().getName() after select ...... selector.keys().size());// 2.处理 selectKeysif (num 0) {SetSelectionKey selectionKeys selector.selectedKeys();IteratorSelectionKey iterator selectionKeys.iterator();while (iterator.hasNext()) {// 每一个 fd 是线性处理的过程SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {// 接受客户端的过程acceptHandler(key);} else if (key.isReadable()) {readHandler(key);} else if (key.isWritable()) {}}}// 3.处理 queue runTask,队列是堆里的对象线程的栈是独立的堆是共享的只有方法的逻辑本地变量是线程隔离的if (!lbq.isEmpty()) {Channel channel lbq.take();// accept 使用的是 ServerSocketChannelif (channel instanceof ServerSocketChannel) {ServerSocketChannel server (ServerSocketChannel) channel;server.register(selector, SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName() register server);// read / write 使用的是 SocketChannel} else if (channel instanceof SocketChannel) {SocketChannel client (SocketChannel) channel;ByteBuffer buffer ByteBuffer.allocateDirect(4096);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName() register client: client.getRemoteAddress());}}} catch (IOException | InterruptedException e) {e.printStackTrace();}}}private void readHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName() readHandler.......);ByteBuffer buffer (ByteBuffer) key.attachment();SocketChannel client (SocketChannel) key.channel();buffer.clear();while (true) {try {int num client.read(buffer);if (num 0) {// 将读到的内容翻转,然后直接写出buffer.flip();while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (num 0) {break;} else {// 有可能客户端断开了-异常情况System.out.println(client: client.getRemoteAddress() closed....);key.cancel();client.close();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName() acceptHandler.......);ServerSocketChannel server (ServerSocketChannel) key.channel();try {SocketChannel client server.accept();client.configureBlocking(false);// choose a selector and register !!selectorThreadGroup.nextSelector(client);} catch (IOException e) {e.printStackTrace();}}
}第一个循环是死循环让当前的线程一直阻塞运行处理事件 第二个循环是调用 Selector#select 方法一直等待拿到事件在这个里面会判断到来的事件是属于 accept read write再执行对应的操作在这里会做一个事情当拿到 accept 新连接的客户端再将它的连接信息绑定到对应的 selector也就是将它添加到链表队列中 第三个非循环只是从队列中取出元素无元素的情况进行下一次的 select存在元素则判断这个元素是属于服务端的 channel 还是客户端的 channel若是服务端的 channel 则将它往 epfd 注册一个 accept 事件若是客户端的 channel 则将它往 epfd 注册一个 read 事件read 事件是一直存在的 而写事件是由服务端主动发起的在这里就是模拟业务的过程在 readHandler 处理过程中直接将源数据写回到客户端 SelectorGroup
使用 Selector Group 来用于分配线程执行以及 selector 调度执行目前在此都是采用的单线程 该 Group 用于承担以后 Boss、Worker 角色的核心分配类 package org.vnjohn.selector.singleton;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;/*** author vnjohn* since 2023/12/15*/
public class SelectorThreadGroup {SelectorThread[] selectorThreads;ServerSocketChannel server null;AtomicInteger xid new AtomicInteger(0);public SelectorThreadGroup(int num) {// num 是线程数selectorThreads new SelectorThread[num];// 启动多个线程一个线程对应一个 selectorfor (int i 0; i selectorThreads.length; i) {selectorThreads[i] new SelectorThread(this);new Thread(selectorThreads[i], SelectorThread- i).start();}}public void bind(int port) {try {server ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));// 选择一个 selector 来充当服务端的 accept 连接nextSelector(server);} catch (IOException e) {e.printStackTrace();}}/*** 无论是 ServerSocketChannel 还是 SocketChannel 都复用这个方法*/public void nextSelector(Channel channel) {// 在主线程中取到堆里的 selectorThread 对象SelectorThread selectorThread next();// 1.通过队列传递数据、消息selectorThread.lbq.add(channel);// 2.通过打断阻塞让对应的线程在打断后去自己完成注册 selector唤醒 select 阻塞的操作selectorThread.selector.wakeup();// 这个时候才有了队列多线程模型下才能进行相互之间的通信}private SelectorThread next() {// 单个 group 多线程时会进行轮询处理有可能也会导致资源倾斜int index xid.incrementAndGet() % selectorThreads.length;return selectorThreads[index];}
}重点channel 有可能是 server 的 ServerSocketChannel 也有可能是 client 的 SocketChannel在这里做强制转换会出现错误将 channel 分配的工作延迟给到队列进行 take由阻塞链表队列来进行区分是属于服务端的 channel 还是客户端的 channel再去执行对应的操作accept、read、write MainThread
以下主线程类不做任何业务 I/O 相关的工作只是为了创建一个带有指定数量的 SelectorGroup
package org.vnjohn.selector.singleton;/*** author vnjohn* since 2023/12/15*/
public class MainThread {public static void main(String[] args) {// 1、创建 IO Thread一个或多个SelectorThreadGroup selectorThreadGroup new SelectorThreadGroup(1);// 混杂模式:只有一个线程负责 accept,每个线程都会被分配 client,进行 R/W// SelectorThreadGroup selectorThreadGroup new SelectorThreadGroup(3);// 2、应该把监听8090的 server 注册到某一个 selector 上selectorThreadGroup.bind(8090);}
}测试单 selector 模型
1、启动主线程 main 方法控制台输出内容如下
SelectorThread-0 before select ......0
SelectorThread-0 after select ......0
SelectorThread-0 register server
SelectorThread-0 before select ......12、nc localhost 8090 模拟客户端来连接服务端进行读、写操作首先看到的是由当前 SelectorThread 进行 accept每次都是从队列中取出元素根据当前元素是属于服务端 channel 还是客户端 channel 进行区分服务端 channel 则 accept客户端 channel 则注册 read以便于客户端从网卡到来的数据在服务端能够进行响应
新客户端到来并且写入数据123控制台输出内容如下
SelectorThread-0 after select ......1
SelectorThread-0 acceptHandler.......
SelectorThread-0 register client:/0:0:0:0:0:0:0:1:60036
SelectorThread-0 before select ......2
SelectorThread-0 after select ......2
SelectorThread-0 before select ......2
SelectorThread-0 after select ......2
SelectorThread-0 readHandler.......
SelectorThread-0 before select ......2从返回的内容来看当前的 SelectorThread 先是进行 accept 然后执行唤醒 Selector 的操作此时 select 马上不会进行阻塞直接返回打印的日志内容before select、after select不管是不是有 accept、read、write 事件它都会先遍历一次进行处理在一定时间差内你可以看到它打印了两次 若不让它打印两次可以在 before select 打印以后进行 Thread.sleep(50);但是这种方式是不可取的它无法应用到高并发的场景下 在正常情况下客户端只是先进行连接而不做 R/W 操作它会一直阻塞在 Selector#select 这个操作下的只有当客户端从网卡发送了数据此时 Selector 马上就会通过中断的方式将有状态的事件存在到内核链表中此时就能获取到 selectKey而这个 selectKey 是作为读操作存在的所以会调用 readHandler 进行读和写的操作
小结
小结一下以上单 Selector 线性模型执行的过程
Selector#select若一直无 FD 事件存在该方法会一直阻塞一直不会返回结果只能通过 Selector#wakeup 方法将 Selector 唤醒accept通过当前线程所在的 SelectorGroup 将其分配到某个线程的 SelectorThread 下read、write在 readHandler 方法执行完读操作以后模拟业务代码将客户端写入的数据再由服务端写回到客户端
每一个线程都是一个单独的 Selector若在多线程的情况下以上的程序可能在并发场景下会被分配到多个 Selector 上 注意的要求是每个客户端只能绑定到一个 Selector 上不存在多线程顺序交互的问题简而言之就是说每个客户端连接进来它都要以线性的方式进行执行 再言之单线程模型不能充分的利用到多核多 CPU 资源同时在 100W 客户端进来时这种模型跑起来会非常的慢对于一个高并发系统设计而言是一定不能够被接受的 所以这种模型仍然会存在问题在下篇会介绍如何去解决这种应用在多线程场景下客户端不进行乱串执行以及资源有效利用的问题
总结
该篇博文主要介绍多路复用模型 Epoll 下单 Selector 多线程与单线程之间的区别先是说明了在单 Selector 非线性模型下-多线程会造成读、写事件重复触发的问题 通过图解和 strace 追踪日志的方式说明了它的缺点解决事件重复触发问题通过 SelectionKey#cannel 来进行解决莫须有这种方式不可取会造成假死线程资源停滞不释放问题后者介绍了单个 Selector 单 Group 解决这种假死资源的存在问题结合 Selector#wakeup 链接阻塞队列的方式来完成在单 Selector 线性模型下是可取的但是为了应用多核多 CPU 资源在多线程场景下这种模型会造成一个客户端在多个 Selector 中乱串执行的问题希望您能够喜欢感谢三连支持
参考文献
《UNIX网络编程 卷1套接字联网API第3版》— [美] W. Richard Stevens Bill Fenner Andrew M. Rudoff
学习帮助文档
man pagesyum install manpthread man pagesyum -y install man-pages 愿你我都能够在寒冬中相互取暖互相成长只有不断积累、沉淀自己后面有机会自然能破冰而行 博文放在 网络 I/O 专栏里欢迎订阅会持续更新
如果觉得博文不错关注我 vnjohn后续会有更多实战、源码、架构干货分享
推荐专栏Spring、MySQL订阅一波不再迷路
大家的「关注❤️ 点赞 收藏⭐」就是我创作的最大动力谢谢大家的支持我们下文见