网站建设手机版,中美最新军事新闻最新消息,提供广东中山网站建设,项目策划书八篇案例目录 1. 高级IO1.1 IO的基本概念1.2 OS如何得知外设当中有数据可读取1.3 OS如何处理从网卡中读取到的数据包1.4 IO的步骤 2. 五种IO模型2.1 利用钓鱼来理解2.2 阻塞IO2.3 非阻塞IO2.4 信号驱动IO2.5 IO多路转接2.6 异步IO 3. 高级IO的概念3.1 同步通信 VS 异步通信3.2 阻塞 VS … 目录 1. 高级IO1.1 IO的基本概念1.2 OS如何得知外设当中有数据可读取1.3 OS如何处理从网卡中读取到的数据包1.4 IO的步骤 2. 五种IO模型2.1 利用钓鱼来理解2.2 阻塞IO2.3 非阻塞IO2.4 信号驱动IO2.5 IO多路转接2.6 异步IO 3. 高级IO的概念3.1 同步通信 VS 异步通信3.2 阻塞 VS 非阻塞 4. 阻塞IO5. 非阻塞IO5.1 fcntl函数 6. IO事件就绪7. I/O多路转接之Select7.1 初识select7.2 Select函数原型7.3 理解Select的执行过程7.4 socket就绪条件7.4.1 读就绪7.4.2 写就绪7.4.3 异常就绪 7.5 Select的特点7.6 Select缺点7.7 Select使用示例 8. I/O多路转接之Poll8.1 Poll函数接口8.2 Poll的优点8.3 Poll的缺点8.4 Poll使用示例 9. I/O多路转接之epoll9.1 epoll初识9.2 epoll的相关系统调用9.2.1 epoll_create9.2.2 epoll_ctl9.2.3 epoll_wait 9.3 epoll底层原理9.4 epoll的优点9.5 epoll工作方式9.5.1 水平触发Level Triggered 工作模式9.5.2 边缘触发Edge Triggered工作模式9.5.3 epoll的使用场景 9.6 对比LT和ET9.7 理解ET模式和非阻塞文件描述符9.8 epoll使用示例LT模式9.9 epoll使用示例ET模式 1. 高级IO
1.1 IO的基本概念
概念内存和外设进行沟通的动作叫做IOIO也就是输入和输出.在冯诺依曼体系当中将数据从输入设备拷贝到内存就叫做输入将数据从内存拷贝到输出设备就叫做输出。 对文件进行的读写操作本质就是一种IO文件IO对应的外设就是磁盘。对网络进行的读写操作本质也是一种IO网络IO对应的外设就是网卡。在网络层面数据往网络里写的本质是将数据从内存拷贝到网卡上从网络里读的本质是将数据从网卡拷贝到内存。
我们之前学到的所有关于IO的接口都可以称作拷贝接口以send为例子
send把数据拷贝到TCP的发送缓冲区里读数据的时候是从缓冲区里面去拿有数据才能拿没数据只能等。
以接收为例子如果缓冲区满了就通告对方然后对方就不发数据了我的缓冲区为什么会满呢无非就是上层来不及取或者取的太慢了数据来的太快导致缓冲区被打满了。
当缓冲区满了如何让发送方尽快发呢肯定就是通知上层尽快把数据取走。我们可以认为是一种生产消费者模型其中缓冲区就是交易场所
1.2 OS如何得知外设当中有数据可读取
输入就是操作系统将数据从外设拷贝到内存的过程操作系统一定要通过某种方法得知特定外设上是否有数据就绪 首先需要明确的是并不是操作系统想要从外设读取数据时外设上就一定有数据。 比如用户正在访问某台服务器当用户的请求报文发出后就需要等待从网卡当中读取服务器发来的响应数据。但此时对方服务器可能还没有收到我们发出的请求报文或是正在对我们的请求报文进行数据分析也有可能服务器发来的响应数据还在网络中路由。 但操作系统不会主动去检测外设上是否有数据就绪这种做法一定会降低操作系统的工作效率。 因为大部分情况下外设当中都是没有数据的因此操作系统所做的大部分检测工作其实都是徒劳的。 操作系统实际采用的是中断的方式来得知外设上是否有数据就绪的当某个外设上面有数据就绪时该外设就会向CPU当中的中断控制器发送中断信号中断控制器再根据产生的中断信号的优先级按顺序发送给CPU。 每一个中断信号都有一个对应的中断处理程序存储中断信号和中断处理程序映射关系的表叫做中断向量表当CPU收到某个中断信号时就会自动停止正在运行的程序然后根据该中断向量表执行该中断信号对应的中断处理程序处理完毕后再返回原被暂停的程序继续运行。
需要注意的是CPU不直接和外设打交道指的是在数据层面上而外设其实是可以直接将某些控制信号发送给CPU当中的某些控制器的。
1.3 OS如何处理从网卡中读取到的数据包
操作系统任何时刻都可能会收到大量的数据包因此操作系统必须将这些数据包管理起来所谓的管理就是“先描述再组织”。在内核当中有一个结构叫做sk_buff该结构就是用来管理和控制接收或发送数据包的信息的。
1如下是一个简化版的sk_buff结构
struct sk_buff
{char* transport_header;char* network_header;char* mac_header;char* data;struct sk_buff* next;struct sk_buff* prev;
};2当操作系统从网卡当中读取到一个数据包后会将该数据依次交给链路层、网络层、传输层、应用层进行解包和分用最终将数据包中的数据交给了上层用户如下是sk_buff结构是如何进行数据包的解包和分用的过程
当操作系统从网卡中读取到一个数据包后就会定义出一个sk_buff结构然后用sk_buff结构当中的data指针指向这个读取到的数据包并将定义出来的这个sk_buff结构与其他sk_buff结构以双链表的形式组织起来此时操作系统对各个数据包的管理就变成了对双链表的增删查改等操作。接下来我们需要将读取上来的数据包交给最底层的链路层处理进行链路层的解包和分用此时就是让sk_buff结构当中的mac_header指针指向最初的数据包然后向后读取链路层的报头剩下的就是需要交给网络层处理的有效载荷了此时便完成了链路层的解包。这时链路层就需要将有效载荷向上交付给网络层进行解包和分用了这里所说的向上交付只是形象的说法实际向上交付并不是要将数据从链路层的缓冲区拷贝到网络层的缓冲区我们只需要让sk_buff结构当中的network_header指针指向数据包中链路层报头之后的数据即可然后继续向后读取网络层的报头便完成了网络层的解包。紧接着就是传输层对数据进行处理了同样的道理让sk_buff结构当中的transport_header指针指向数据包中网络层报头之后的数据然后继续向后读取传输层的报头便完成了传输层的解包。传输层解包后就可以根据具体使用的传输层协议对应将剩下的数据拷贝到TCP或UDP的接收缓冲区供用户读取即可。 3发送数据时对数据进行封装也是同样的道理就是依次在数据前面拷贝上对应的报头最后再将数据发送出去UDP或拷贝到发送缓冲区TCP即可。也就是说数据包在进行封装和解包的过程中本质数据的存储位置是没有发生变化的我们实际只是在用不同的指针对数据进行操作而已。
但内核中的sk_buff并不像不是那样简单
一方面为了保证高效的网络报文处理效率这就要求sk_buff的结构也必须是高效的。另一方面sk_buff结构需要被内核协议中的各个协议共同使用因此sk_buff必须能够兼容所有的网络协议。
1.4 IO的步骤
当程序运行到IO函数时一般都在阻塞式等待这也算作IO过程中的一个环节但真正意义上的IO就是讲数据拷贝到缓冲区中任何IO过程都要包含两个步骤等待和拷贝 即: IO 等待数据拷贝。
要提高IO的效率主要分为两步
第一步是等即等待IO条件就绪。第二步是拷贝也就是当IO条件就绪后将数据拷贝到内存或外设。
任何IO的过程都包含“等”和“拷贝”这两个步骤在实际的应用场景中等待消耗的时间往往都远远高于拷贝的时间所以提高IO效率的本质是尽可能地减少单位时间内“等待”的比重。
提高IO效率的方式有两种改变等待的方式和减少等待的比重高效的IO本质就是减少单位时间内“等待”的比重
2. 五种IO模型
2.1 利用钓鱼来理解
1我们以钓鱼为例子IO的过程其实和钓鱼是非常类似的。
钓鱼的过程同样分为“等”和“拷贝”两个步骤只不过这里的“等”指的是等鱼上钩“拷贝”指的是当鱼上钩后将鱼从河里“拷贝”到我们的鱼桶当中。IO时“等”消耗的时间往往比“拷贝”消耗的时间多钓鱼也恰好符合这个特点。钓鱼时我们大部分时间都在等鱼上钩而当鱼上钩后只需要一瞬间就能将鱼“拷贝”上来。
2在谈论高效的IO之前我们先来看看什么样的钓鱼方式才是高效的下面给出五个人的钓鱼方式
张三拿了1个鱼竿将鱼钩抛入水中后就死死的盯着浮漂什么也不做当有鱼上钩后就挥动鱼竿将鱼钓上来。李四拿了1个鱼竿将鱼钩抛入水中后就去做其他事情然后定期观察浮漂如果有鱼上钩则挥动鱼竿将鱼钓上来否则继续去做其他事情。王五拿了1个鱼竿将鱼钩抛入水中后在鱼竿顶部绑一个铃铛然后就去做其他事情如果铃铛响了就挥动鱼竿将鱼钓上来否则就根本不管鱼竿。赵六拿了100个鱼竿将100个鱼竿抛入水中后就定期观察这100个鱼竿的浮漂如果某个鱼竿有鱼上钩则挥动对应的鱼竿将鱼钓上来。田七田七是一个有钱的老板他给了自己的司机一个桶、一个电话、一个鱼竿让司机去钓鱼当鱼桶装满的时候再打电话告诉田七来拿鱼而田七自己则开车去做其他事情去了。
3那么张三、李四、王五的钓鱼效率是否一样呢为什么
其实张三、李四、王五的钓鱼效率本质上是一样的。
首先他们的钓鱼方式都是一样的都是先等鱼上钩,然后再将鱼钓上来。其次,因为他们每个人都是拿的一根鱼竿当河里有鱼来咬鱼钩时这条鱼咬哪一个鱼钩的概率都是相等的。
因此张三、李四、王五他们三个人的钓鱼的效率是一样的他们只是等鱼上钩的方式不同而已张三是死等李四是定期检测浮漂而王五是通过铃铛来判断是否有鱼上钩。
注意这里问的是他们的钓鱼效率是否是一样的而不是问他们整体谁做的事最多如果说整体做事情的量的话那一定是王五做得最多李四次之张三最少。
4张三、李四、王五它们三个人分别和赵六比较谁的钓鱼效率更高
赵六毫无疑问是这四个人当中钓鱼效率最高的因为赵六同时在等多个鱼竿上有鱼上钩因此在单位时间内赵六的鱼竿有鱼上钩的概率是最大的。
为了方便计算我们假设赵六拿了97个鱼竿加上张三、李四、王五的鱼竿一共就有100个鱼竿。当河里有鱼来咬鱼钩时这条鱼咬张三、李四、王五的鱼钩的概率都是百分之一而咬赵六的鱼钩的概率就是百分之九十七。因此在单位时间内赵六的鱼竿上有鱼的概率是张三、李四、王五的97倍
而高效的钓鱼就是要减少单位时间内“等”的时间增加“拷贝”的时间所以说赵六的钓鱼效率是这四个人当中最高的。赵六的钓鱼效率之所以高是因为赵六一次等待多个鱼竿上的鱼上钩此时就可以将“等”的时间进行重叠。
5如何看待田七的这种钓鱼方式
田七让自己的司机帮自己钓鱼自己开车去做其他事情去了此时这个司机具体怎么钓鱼已经不重要了他可以模仿张三、李四、王五、赵六任何一个人的钓鱼方式进行钓鱼最重要的是田七本人并没有参与整个钓鱼的过程他只是发起了钓鱼的任务而真正钓鱼的是司机田七在司机钓鱼期间可能在做任何其他事情如果将钓鱼看作是一种IO的话那田七的这种钓鱼方式就叫做异步IO它本身是没有参与IO的等待和拷贝的过程。而对于张三、李四、王五、赵六来说他们都需要自己等鱼上钩当鱼上钩后又需要自己把鱼从河里钓上来对应到IO当中就是需要自己进行数据的拷贝也需要自己进行等待因此他们四个人的钓鱼方式都叫做同步IO。
6这五个人的钓鱼方式分别对应的就是五种IO模型
张三这种死等的钓鱼方式对应就是阻塞IO。李四这种定时检测是否有鱼上钩的方式就是非阻塞IO。王五这种通过设置铃铛得知事件是否就绪的方式就是信号驱动IO。王五这种一次等待多个鱼竿上有鱼的钓鱼方式就是IO多路转接。田七这种让别人帮自己钓鱼的钓鱼方式就是异步IO。
IO模型简单对比解释阻塞IO一心一意地等待数据到来非阻塞IO三心二意地轮询式等待信号驱动信号递达时再来读取或写入数据多路转接让大批线程等待,自身读取数据异步通信IO让其他进程或线程进行等待和读取,自身获取结果
阻塞IO非阻塞IO和信号驱动IO本质上是不能提高IO的效率的但非阻塞IO和信号驱动IO能提高整体做事的效率。
上述钓鱼的例子中钓鱼的河对应就是内核这里的每一个人都是进程或线程鱼竿对应的就是文件描述符fd或套接字装鱼的桶对应的就是用户缓冲区。
7如何区分同步IO和异步IO
判断一个IO过程是同步的还是异步的本质就是看当前进程或线程是否需要参与IO过程如果要参与等待或者拷贝那就是同步IO否则就是异步IO。
2.2 阻塞IO
阻塞IO就是在内核还没将数据准备好系统调用会一直等待阻塞IO是最常见的IO模型所有的套接字默认就是阻塞等待 比如当调用recvfrom函数从某个套接字上读取数据时可能底层数据还没有准备好此时就需要等待数据就绪当数据就绪后再将数据从内核拷贝到用户空间最后recvfrom函数才会返回。在recvfrom函数等待数据就绪期间在用户看来该进程或线程就阻塞住了(卡住了)本质就是操作系统将该进程或线程的状态设置为S状态然后将其进程PCB放入等待队列当中当数据就绪后操作系统再将该进程PCB其从等待队列当中唤醒将该进程或线程的状态设置为R状态把进程PCB放到运行队列然后该进程或线程再将数据从内核拷贝到用户空间。
以阻塞方式进行IO操作的进程或线程在“等”和“拷贝”没有结束期间都不会返回在用户看来就像是阻塞住了因此我们称之为阻塞IO。
2.3 非阻塞IO
1非阻塞IO如果内核还未将数据准备好系统调用仍然会直接返回并且返回EWOULDBLOCK错误码。 非阻塞IO往往需要程序员以循环的方式反复尝试读写文件描述符这个过程称为轮询这对CPU来说是较大的浪费一般只有特定场景下才使用
比如当调用recvfrom函数以非阻塞方式从某个套接字上读取数据时如果底层数据还没有准备好那么recvfrom函数会立马返回而不会让该进程或线程进行阻塞等待。因为没有读取的数据因此该进程或线程后续还需要继续调用recvfrom函数检测底层数据是否就绪如果没有就绪则继续返回直到某次检测到底层数据就绪后再将数据从内核拷贝到用户空间然后进行成功返回。每次调用recvfrom函数读取数据时就算底层数据没有就绪recvfrom函数也会立马返回在用户看来该进程或线程就没有被阻塞住因此我们称之为非阻塞IO。
2阻塞IO和非阻塞IO的区别
阻塞和非阻塞的共同点都是同步IO。唯一的区别等待的方式不同 拷贝数据还是得他们去拷一个是阻塞等一个是没有就绪就在应用层立马返回。阻塞IO当数据没有就绪时后续检测数据是否就绪的工作是由操作系统发起的。非阻塞IO当数据没有就绪时后续检测数据是否就绪的工作是由用户发起的。
2.4 信号驱动IO
1信号驱动IO就是当内核将数据准备好的时候使用SIGIO信号通知应用程序进行IO操作 当底层数据就绪的时候会向当前进程或线程递交SIGIO信号(29号信号----可以通告kill -l进行查看所有的信号)因此可以通过signal或sigaction函数进行自定义捕捉将SIGIO的信号处理程序自定义为需要进行的IO操作当底层数据就绪时就会自动执行对应的IO操作。
例如我们需要调用recvfrom函数从某个套接字上读取数据那么就可以将该操作定义为SIGIO的信号处理程序当底层数据就绪时操作系统就会递交SIGIO信号此时就会自动执行我们定义的信号处理程序进程将数据从内核拷贝到用户空间。
2信号的产生是异步的但信号驱动IO是同步IO的一种。
信号的产生异步的是因为信号在任何时刻都可能产生。但信号驱动IO是同步IO的一种因为当底层数据就绪时当前进程或线程需要停下正在做的事情转而进行数据的拷贝操作因此当前进程或线程仍然需要参与IO过程。
信号驱动只使用了recv的拷贝功能等待数据的过程是异步的(信号的产生是异步的)但拷贝数据是同步的。
2.5 IO多路转接
IO多路转接也叫做IO多路复用能够同时等待多个文件描述符的就绪状态 IO多路转接的思想
因为IO过程分为“等”和“拷贝”两个步骤因此我们使用的recvfrom等接口的底层实际上都做了两件事第一件事就是当数据不就绪时需要进行等待第二件事就是当数据就绪后需要进行拷贝。虽然recvfrom等接口也有“等”的能力但这些接口一次只能“等”一个文件描述符上的数据或空间就绪这样IO效率太低了。因此系统为我们提供了三组接口分别叫做select、poll和epoll他们一次可以等待一批文件描述符就绪这些接口的核心工作就是“等”我们可以将所有“等”的工作都交给这些多路转接接口。因为这些多路转接接口是一次“等”多个文件描述符的因此能够将“等”的时间进行重叠当数据就绪后再调用对应的recvfrom等函数进行数据的拷贝此时这些函数就能够直接进行拷贝而不需要进行“等”操作了。
IO多路转接就像现实生活中的黄牛一样只不过IO多路转接更像是帮人排队的黄牛因为多路转接接口实际并没有帮我们进行数据拷贝的操作这些排队黄牛可以一次帮多个人排队此时就将多个人排队的时间进行了重叠。
2.6 异步IO
异步IO就是由内核在数据拷贝完成时通知应用程序 (异步IO不参与等待和拷贝的过程) 进行异步IO需要调用一些异步IO的接口异步IO接口调用后会立马返回因为异步IO不需要你进行“等”和“拷贝”的操作这两个动作都由操作系统来完,你要做的只是发起IO调用。当异步IO完成后操作系统会通知应用程序因此进行异步IO的进程或线程并不参与IO的所有细节。
我们将缓冲区变量提供给异步接口接口会等待并将数据放到缓冲区中并通知进程进程可以直接处理数据并不会参与任何IO过程所以是异步的。
异步IO系统提供有一些对应的系统接口但大多使用复杂也不建议使用异步IO也有更好的替代方案。
3. 高级IO的概念
3.1 同步通信 VS 异步通信
1同步和异步关注的是消息通信机制
所谓同步就是在发出一个调用时,在没有得到结果之前,该调用就不返回但是一旦调用返回就得到返回值了换句话说就是由调用者主动等待这个调用的结果。异步则是相反调用在发出之后这个调用就直接返回了所有没有返回结果换句话说就是当一个异步过程调用发出后调用者不会立刻得到结果而是在调用发出后被调用者通过状态来通知来通知调用或通过回调函数处理这个调用。
2为什么非阻塞IO在没有得到结果之前就返回了
IO是分为“等待”和“拷贝”两步的当调用recvfrom进行非阻塞IO时如果数据没有就绪那么调用会直接返回此时这个调用返回时并没有完成一个完整的IO过程即便调用返回了那也是属于错误的返回。因此该进程或线程后续还需要继续调用recvfrom轮询检测数据是否就绪当数据就绪后最后再把数据从内核拷贝到用户空间这才是一次完整的IO过程。
因此在进行非阻塞IO时在没有得到结果之前虽然这个调用会返回但后续还需要继续进行轮询检测因此可以理解成调用还没有返回而只有当某次轮询检测到数据就绪并且完成数据拷贝后才认为该调用返回了。
3同步IO通信 VS 同步与互斥
在多进程和多线程当中有同步与互斥的概念但是这里的同步通信和进程或线程之间的同步是完全不相干的概念。
进程/线程同步指的是在保证数据安全的前提下让进程/线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题谈论的是进程/线程间的一种工作关系。同步IO指的是进程/线程与操作系统之间的关系谈论的是进程/线程是否需要主动参与IO过程。
因此当看到“同步”这个词的时候一定要先明确这个同步是同步通信的同步还是同步与互斥的同步。
3.2 阻塞 VS 非阻塞
1阻塞和非阻塞关注的是程序在等待调用结果消息、返回值时的状态
阻塞调用是指调用结果返回之前当前线程会被挂起调用线程只有在得到结果之后才会返回。非阻塞调用指的是在不能立刻得到结果之前该调用不会阻塞当前线程。
注意如果非阻塞调用没有获取到数据时是以出错的形式返回的但并不算真正的错误通过错误码errno区分出错和条件未就绪。
4. 阻塞IO
1系统中大部分的接口都是阻塞式接口。比如我们可以用read函数从标准输入(键盘)当中读取数据然后使用write函数输出到标准输出(屏幕)当中
#includeiostream
#includeunistd.h
using namespace std;int main()
{while(1){char buffer[1024] {0};size_t s read(0,buffer,sizeof(buffer)-1);if(s 0){buffer[s] 0;write(1,buffer,sizeof(buffer));}else {cout read errnoendl;break;}}return 0;
}2程序运行后如果我们不进行输入操作此时该进程就会阻塞住根本原因就是因为此时底层数据不就绪因此read函数需要进行阻塞等待 3我们也可以发现当前进程的状态是S状态也就是放到了等待队列当中。 4一旦我们进行了输入操作此时read函数就会检测到底层数据就绪然后立马将数据读取到从内核拷贝到我们传入的buffer数组当中并且调用write函数将读取到的数据输出到显示器上面最后我们就看到了我们输入的字符串。 5. 非阻塞IO
打开文件时默认都是以阻塞的方式打开的如果要以非阻塞的方式打开某个文件需要在使用open函数打开文件时携带O_NONBLOCK或O_NDELAY选项此时就能够以非阻塞的方式打开文件。 如果要将已经打开的某个文件或套接字设置为非阻塞此时就需要用到fcntl函数。
5.1 fcntl函数
1函数原型
#include unistd.h
#include fcntl.h
int fcntl(int fildes, int cmd, ...); //可变参数列表2参数说明
fd已经打开的文件描述符。cmd需要进行的操作。…可变参数传入的cmd值不同后面追加的参数也不同。
3fcntl函数常用的5种功能与其对应的cmd取值如下
复制一个现有的描述符cmdF_DUPFD。获得/设置文件描述符标记cmdF_GETFD或F_SETFD。获得/设置文件状态标记cmdF_GETFL或F_SETFL。获得/设置异步I/O所有权cmdF_GETOWN或F_SETOWN。获得/设置记录锁cmdF_GETLK, F_SETLK或F_SETLKW。
4返回值
如果函数调用成功则返回值取决于具体进行的操作。如果函数调用失败则返回-1同时错误码会被设置。
5使用例子我们可以定义一个函数该函数就用于将指定的文件描述符设置为非阻塞状态
void SetNonBlock(int fd)
{int f1 fcntl(fd,F_GETFL);//获取文件描述符对应的文件状态标记if(f1 0){perror(fcntl);return ;}fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//添加非阻塞标记 O_NONBLOCK,对文件状态标记进行设置
}此时就将该文件描述符设置为了非阻塞状态。
6以非阻塞轮询方式读取标准输入err版本
此时在调用read函数读取标准输入之前调用SetNonBlock函数将0号文件描述符设置为非阻塞就行了。
int main()
{SetNonBlock(0);//将0号描述符设置为非阻塞状态while(1){char buffer[1024] {0};size_t s read(0,buffer,sizeof(buffer)-1);//少读取一个,是为了在最后的位置放\0if(s 0){buffer[s] 0;write(1,buffer,sizeof(buffer));printf(read success: s:%d errno:%d \n,s,errno);}else {printf(read failed: s:%d errno:%d \n,s,errno); }sleep(1);}return 0;
}注意上面的是错误的如果非阻塞调用没有获取到数据时是以出错的形式返回的但并不算真正的错误通过错误码errno区分出错和条件未就绪。
我们读取数据如果数据没有就绪和真正的发生错误都是使用同样的方式进行标识如何进一步区分呢 -使用错误码errno来区别真正的出错和数据没有就绪的出错。
7区别真正的出错和数据没有就绪的出错
①当read函数以非阻塞方式读取标准输入时如果底层数据不就绪那么read函数就会立即返回但当底层数据不就绪时read函数是以出错的形式返回的此时的错误码会被设置为EAGAIN或EWOULDBLOCK。 ②因此在以非阻塞方式读取数据时如果调用read函数时得到的返回值是-1此时还需要通过错误码进一步进行判断如果错误码的值是EAGAIN或EWOULDBLOCK说明本次调用read函数出错是因为底层数据还没有就绪因此后续还应该继续调用read函数进行轮询检测数据是否就绪当数据继续时再进行数据的读取。 ③调用read函数在读取到数据之前可能会被其他信号中断此时read函数也会以出错的形式返回此时的错误码会被设置为EINTR此时应该重新执行read函数进行数据的读取。 ④因此在以非阻塞的方式读取数据时如果调用read函数读取到的返回值为-1此时并不应该直接认为read函数在底层读取数据时出错了而应该继续判断错误码如果错误码的值为EAGAIN、EWOULDBLOCK或EINTR则应该继续调用read函数再次进行读取。
8正确的非阻塞轮询的代码
int main()
{SetNonBlock(0);//将0号描述符设置为非阻塞状态while (true){char buffer[1024] {0};ssize_t s read(0, buffer, sizeof(buffer)-1);//少读取一个,是为了在最后的位置放\0if(s 0){buffer[s] 0;write(1,buffer,sizeof(buffer));printf(read success: s:%d errno:%d \n,s,errno);}else{if (errno EAGAIN || errno EWOULDBLOCK)//底层数据没有就绪{ printf(数据没有准备好,再试一下吧\n);printf(read failed: s:%d errno:%d \n,s,errno);sleep(1);continue;}else if (errno EINTR)//在读取数据之前被信号中断{ printf(数据被信号中断了,再试一下吧\n);sleep(1);continue;}else{printf(读取错误\n);break;}}}return 0;
}运行代码后当我们没有输入数据时程序就会不断调用read函数检测底层数据是否就绪。一旦我们进行了输入操作此时read函数就会在轮询检测时检测到紧接着立马将数据读取到从内核拷贝到我们传入的buffer数组当中并且调用write函数读取到的数据输出到显示器上面。
采用0号描述符进行操作是因为0号描述符必须要用户参与必须得有数据输入条件才能就绪。
6. IO事件就绪 IO事件就绪可以理解为两方面一是读事件就绪一是写事件就绪可以认为接收缓冲区有数据就是读事件就绪发送缓冲区里无数据就是写事件就绪。一旦等的事件就绪我们就可以进行拷贝/读取了。 但事实上频繁读写系统内核中的发送接收缓冲区会进行状态切换期间会进行一系列的处理工作会带来效率的下降所以一般接收缓冲区中设有高水位发送缓冲区中设有低水位的概念。 当超过水位才进行发送/通知上层读取。
7. I/O多路转接之Select
7.1 初识select
系统提供select函数来实现多路复用输入/输出模型
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的。程序会停在select这里等待直到被监视的文件描述符有一个或多个发生了状态改变。 7.2 Select函数原型
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);1参数解释
参数nfds是需要监视的最大的文件描述符值1rdset、wrset、exset分别对应于需要检测的可读文件描述符的集合可写文件描述符的集合及异常文件描述符的集合参数timeout为结构timeval用来设置select()的等待时间
2参数timeout的设置
NULL则表示select没有timeoutselect将一直被阻塞直到某个文件描述符上发生了事件;0仅检测描述符集合的状态然后立即返回并不等待外部事件的发生。特定的时间值如果在指定的时间段里没有事件发生select将超时返回。 struct timeval timeout {0, 0} //表示非阻塞
struct timeval timeout {5, 0} //5秒以内阻塞式超过5秒非阻塞返回一次 4fd_set类型解析
①在使用select函数时就免不了用到fd_set结构体。那fd_set就是怎么样的下面我们先看在man手册中关于select ②那么fd_set究竟是什么
typedef long int __fd_mask;/* Its easier to assume 8-bit bytes than to get CHAR_BIT. */
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
#define __FDELT(d) ((d) / __NFDBITS)
#define __FDMASK(d) ((__fd_mask) 1 ((d) % __NFDBITS))/* fd_set for select and pselect. */
typedef struct{/* XPG4.2 requires this member name. Otherwise avoid the namefrom the global namespace. */
#ifdef __USE_XOPEN__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)-fds_bits)
#else__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)-__fds_bits)
#endif} fd_set;/* Maximum number of file descriptors in fd_set. */
#define FD_SETSIZE __FD_SETSIZE //__FD_SETSIZE等于1024/* Access macros for fd_set. */
#define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp) __FD_ZERO (fdsetp)③根据分析我么可以把这个结构理解为一个整数数组更严格的说是一个 “位图”。使用位图中对应的位来表示要监视的文件描述符。并且还提供了一组操作fd_set的接口函数来方便的操作该位图
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位5函数的返回值
执行成功则返回文件描述词状态已改变的个数。如果返回0代表在描述词状态改变前已超过timeout时间没有返回。当有错误发生时则返回-1错误原因存于errno此时参数readfdswritefds, exceptfds和timeout的值变成不可预测。
6其中错误值可能为
EBADF 文件描述词为无效的或该文件已关闭。EINTR 此调用被信号所中断。EINVAL 参数n 为负值。ENOMEM 核心内存不足。
7.3 理解Select的执行过程
理解select模型的关键在于理解fd_set,为说明方便取fd_set长度为1字节fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。
执行fd_set set; FD_ZERO(set);则set用位表示是0000,0000。若fd5,执行FD_SET(fd,set);后set变为0001,0000(第5位置为1) 。若再加入fd2fd1,则set变为0001,0011 。执行select(6,set,0,0,0)阻塞等待 。若fd1,fd2上都发生可读事件则select返回此时set变为0000,0011。注意没有事件发生的fd5被清空。
7.4 socket就绪条件
7.4.1 读就绪
socket内核中接收缓冲区中的字节数大于等于低水位标记SO_RCVLOWAT。此时可以无阻塞的读该文件描述符并且返回值大于0。socket TCP通信中对端关闭连接此时对该socket读则返回0。监听的socket上有新的连接请求。socket上有未处理的错误。
7.4.2 写就绪
socket内核中发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小)大于等于低水位标记SO_SNDLOWAT此时可以无阻塞的写并且返回值大于0。socket的写操作被关闭(close或者shutdown)对一个写操作被关闭的socket进行写操作会触发SIGPIPE信号。socket使用非阻塞connect连接成功或失败之后。socket上有未读取的错误。
7.4.3 异常就绪
socket上收到带外数据。关于带外数据和TCP紧急模式相关(TCP协议头中有一个紧急指针的字段)。
7.5 Select的特点
可监控的文件描述符个数取决与sizeof(fd_set)的值。我这边服务器上sizeof(fd_set)512每bit表示一个文件描述符则我服务器上支持的最大文件描述符是512*84096。将fd加入select监控集的同时还要再使用一个数据结构array保存放到select监控集中的fd。
这里我们思考为什么需要额外的数据结构array保存放到select监控集中的fd
一方面是用于再select 返回后array作为源数据和fd_set进行FD_ISSET判断。另一方面是select返回后会把以前加入的但并无事件发生的fd清空则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先)扫描array的同时取得fd最大值maxfd用于select的第一个参数。
也正是因为以上两点Select也有不得不说的缺点
7.6 Select缺点
每次调用select都需要手动设置fd集合从接口使用角度来说也非常不便。每次调用select都需要把fd集合从用户态拷贝到内核态这个开销在fd很多时会很大。同时每次调用select都需要在内核遍历传递进来的所有fd这个开销在fd很多时也很大。select支持的文件描述符数量太小。
7.7 Select使用示例
1SelectServer.hpp
#pragma once
#include iostream
#include sys/select.h
#include sys/time.h
#include Socket.hppstatic const uint16_t defaultport 8080;
static const int fd_num_max (sizeof(fd_set) * 8);
int defaultfd -1;class SelectServer
{
public:SelectServer(uint16_t port defaultport):_port(port){for(int i 0; i fd_num_max; i){fd_array[i] defaultfd;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void Start(){int listensock _listensock.Getsock();fd_array[0] listensock;while(1){fd_set rfds;FD_ZERO(rfds); // 用来清除描述词组set的全部位int maxfd fd_array[0];for(int i 0; i fd_num_max; i){if(fd_array[i] defaultfd){continue;}FD_SET(fd_array[i], rfds); // 用来设置描述词组set中相关fd的位if(fd_array[i] maxfd){maxfd fd_array[i];LOG(NORMAL, max fd update);}}timeval timeout {3, 0};int n select(maxfd 1, rfds, nullptr, nullptr, timeout);switch (n){case 0:std::cout time out, timeout: timeout.tv_sec . timeout.tv_usec std::endl;break;case -1:std::cerr select error std::endl;break;default:// 有事件就绪了TODOstd::cout get a new link!!!!! std::endl;Dispatcher(rfds); // 就绪的事件和fd你怎么知道只有一个呢break;}}}void Dispatcher(fd_set rfds){for(int i 0; i fd_num_max; i){int fd fd_array[i];if (fd defaultfd){continue;}if(FD_ISSET(fd, rfds)) // 用来测试描述词组set中相关fd 的位是否为真{if (fd _listensock.Getsock()) //说明是监听套接字就绪{Accepter(); // 连接管理器}else // non listenfd{Recver(fd, i);}}}}void Accepter(){// 连接事件就绪了int sockfd _listensock.Accept();if(sockfd 0){return;}LOG(NORMAL, accept success);int pos 0;for(pos 1; pos fd_num_max; pos){if(fd_array[pos] ! defaultfd){continue;}else{break;}}if(pos fd_num_max){LOG(WARNING, server is full);close(sockfd);}else{fd_array[pos] sockfd;PrintFd();}}void Recver(int fd, int pos){char buffer[1024];int n read(fd, buffer, sizeof(buffer) - 1);if(n 0){buffer[n] 0;std::cout get a messge: buffer std::endl;}else if(n 0){LOG(NORMAL, client quit);close(fd);fd_array[pos] defaultfd; // 这里本质是从select中移除}else{LOG(WARNING, client quit);close(fd);fd_array[pos] defaultfd; // 这里本质是从select中移除}}void PrintFd(){std::cout online fd list: ;for (int i 0; i fd_num_max; i){if (fd_array[i] defaultfd){continue;}std::cout fd_array[i] ;}std::cout std::endl;}~SelectServer(){_listensock.Close();}private:Sock _listensock;uint16_t _port;int fd_array[fd_num_max]; // 数组, 用户维护的// int wfd_array[fd_num_max];
};2test.cpp
#include SelectServer.hpp
#include memoryint main()
{std::unique_ptrSelectServer svr(new SelectServer());svr-Init();svr-Start();return 0;
}8. I/O多路转接之Poll
8.1 Poll函数接口
#include poll.h
int poll(struct pollfd *fds, nfds_t nfds, int timeout);1参数说明
fds是一个poll函数监听的结构列表每一个元素中包含了三部分内容文件描述符、监听的事件集合、返回的事件集合。nfds表示fds数组的长度。timeout表示poll函数的超时时间单位是毫秒(ms)。
2pollfd结构体的定义
// pollfd结构
struct pollfd
{int fd; /* file descriptor */short events; /* requested events */short revents; /* returned events */
};fd特定的文件描述符若设置为负值则忽略events字段并且revents字段返回。events需要监视的文件描述符上的哪些事件。reventspoll 函数返回时告知用户该文件描述符上的哪些事件已经就绪。
3events 和 revents 的取值
事件描述是否可作为输入是否可作为输出POLLIN数据包括普通数据和优先数据可读是是POLLRDNORM普通数据可读是是POLLRDBAND优先级带数据可读Linux不支持是是POLLPRI高优先级数据可读比如TCP带外数据是是POLLOUT数据包括普通数据和优先数据可读是是POLLWRNORM普通数据可写是是POLLWRBAND优先级带数据可写是是POLLRDHUPTCP连接被对方关闭或者对方关闭了写操作它由GNU引入是是POLLERR错误否是POLLHUP挂起比如管道的写端被关闭后读端描述符上将收到POLLHUP事件否是POLLNVAL文件描述符没有打开否是
这些取值实际上都是以宏的方式进行定义的二进制序列当中只要一个比特位是1且1的位置各不相同 4返回结果
返回值小于0表示出错。返回值等于0表示poll函数等待超时。返回值大于0表示poll由于监听的文件描述符就绪而返回。
8.2 Poll的优点
struct pollfd 结构当中包含了 events 和 revents相当于select 函数的输入输出型参数进行分离不再适用 select 参数 - 值 传递的方式接口适用更简单poll 并没有最大数量限制但是数量过大后性能也会下降
8.3 Poll的缺点
poll 中监听的文件描述符数量太多时和 select函数一样poll 返回后需要轮询 pollfd 来获取就绪的描述符。每次调用 poll 都需要把大量的pollfd 结构从用户态拷贝到内核中。同时连接的大量客户端在一起时刻可能只有很少的处于就绪状态因此随着监视的描述符数量的增长其效率也会线性下降。
8.4 Poll使用示例
1PollServer.hpp
#pragma once
#include iostream
#include poll.h
#include sys/time.h
#include Socket.hppstatic const uint16_t defaultport 8080;
static const int fd_num_max (sizeof(fd_set) * 8);
int defaultfd -1;
int non_event 0;class PollServer
{
public:PollServer(uint16_t port defaultport):_port(port){for(int i 0; i fd_num_max; i){_event_fds[i].fd defaultfd;_event_fds[i].events non_event;_event_fds[i].revents non_event;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void Start(){int listensock _listensock.Getsock();_event_fds[0].fd listensock;_event_fds[0].events POLLIN;int timeout 3000; // 3swhile(1){int n poll(_event_fds, fd_num_max, timeout);switch (n){case 0:std::cout time out, timeout: std::endl;break;case -1:std::cerr select error std::endl;break;default:// 有事件就绪了TODOstd::cout get a new link!!!!! std::endl;Dispatcher(); // 就绪的事件和fd你怎么知道只有一个呢break;}}}void Dispatcher(){for(int i 0; i fd_num_max; i){int fd _event_fds[i].fd;if (fd defaultfd){continue;}if(_event_fds[i].events POLLIN) // 用来测试描述词组set中相关fd 的位是否为真{if (fd _listensock.Getsock()) //说明是监听套接字就绪{Accepter(); // 连接管理器}else // non listenfd{Recver(fd, i);}}}}void Accepter(){// 连接事件就绪了int sockfd _listensock.Accept();if(sockfd 0){return;}LOG(NORMAL, accept success);int pos 0;for(pos 1; pos fd_num_max; pos){if(_event_fds[pos].fd ! defaultfd){continue;}else{break;}}if(pos fd_num_max){LOG(WARNING, server is full);close(sockfd);}else{_event_fds[pos].fd sockfd;_event_fds[pos].events POLLIN;_event_fds[pos].revents non_event;PrintFd();}}void Recver(int fd, int pos){char buffer[1024];int n read(fd, buffer, sizeof(buffer) - 1);if(n 0){buffer[n] 0;std::cout get a messge: buffer std::endl;}else if(n 0){LOG(NORMAL, client quit);close(fd);_event_fds[pos].fd defaultfd; // 这里本质是从select中移除}else{LOG(WARNING, client quit);close(fd);_event_fds[pos].fd defaultfd; // 这里本质是从select中移除}}void PrintFd(){std::cout online fd list: ;for (int i 0; i fd_num_max; i){if (_event_fds[i].fd defaultfd){continue;}std::cout _event_fds[i].fd ;}std::cout std::endl;}~PollServer(){_listensock.Close();}private:Sock _listensock;uint16_t _port;pollfd _event_fds[fd_num_max]; // 数组, 用户维护的// int wfd_array[fd_num_max];
};2test.cpp
#include PollServer.hpp
#include memoryint main()
{std::unique_ptrPollServer svr(new PollServer());svr-Init();svr-Start();return 0;
}9. I/O多路转接之epoll
9.1 epoll初识
按照man手册的说法是为处理大批量句柄而作了改进的poll。 它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
9.2 epoll的相关系统调用
epoll 有3个相关的系统调用这一点和之前的poll是完全不一样的还有一点是epoll还是一样只负责等待。
9.2.1 epoll_create
#include sys/epoll.hint epoll_create(int size);创建一个epoll的句柄
自从linux2.6.8之后 size参数是被忽略的。用完之后必须调用close()关闭。
9.2.2 epoll_ctl
#include sys/epoll.hint epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);1epoll的事件注册函数
它不同于select()是在监听事件时告诉内核要监听什么类型的事件而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值(epoll的句柄)。第二个参数表示动作用三个宏来表示。第三个参数是需要监听的fd。第四个参数是告诉内核需要监听什么事。
2第二个参数的取值
EPOLL_CTL_ADD注册新的fd到epfd中EPOLL_CTL_MOD修改已经注册的fd的监听事件EPOLL_CTL_DEL从epfd中删除一个fd
3struct epoll_event 类型数据定义如下该类型包含两个成员其中一个为uint32_t类型成员events用于控制该事件的属性是可读、可写还是异常等。events可以为表示4.2中宏的集合还有一个为联合自定义类型数据其中该联合类型可以传四种不同的数据表达不同的意义但一般使用fd来指定文件描述符。
typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event
{uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
};4events可以是以下几个宏的集合
EPOLLIN表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)。EPOLLOUT表示对应的文件描述符可以写。EPOLLPRI表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)。EPOLLERR表示对应的文件描述符发生错误。EPOLLHUP表示对应的文件描述符被挂断。EPOLLET将EPOLL设为边缘触发(Edge Triggered)模式这是相对于水平触发(Level Triggered)来说的。EPOLLONESHOT只监听一次事件当监听完这次事件之后如果还需要继续监听这个socket的话需要再次把这个socket加入到EPOLL队列里。 9.2.3 epoll_wait
#include sys/epoll.hint epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);函数功能是收集在epoll监控的事件中已经发送的事件
参数events是分配好的epoll_event结构体数组。epoll将会把发生的事件赋值到events数组中 (events不可以是空指针内核只负责把数据复制到这个events数组中不会去帮助我们在用户态中分配内存)。maxevents告之内核这个events有多大这个 maxevents的值不能大于创建epoll_create()时的size。参数timeout是超时时间 (毫秒 0会立即返回 -1是永久阻塞)。如果函数调用成功返回对应I/O上已准备好的文件描述符数目如返回0表示已超时, 返回小于0表示函数失败。
9.3 epoll底层原理 1当某一进程调用epoll_create方法时 Linux内核会创建一个eventpoll结构体这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll
{..../*红黑树的根节点这颗树中存储着所有添加到epoll中的需要监控的事件*/struct rb_root rbr;/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/struct list_head rdlist;....
};每一个epoll对象都有一个独立的eventpoll结构体用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;这些事件都会挂载在红黑树中如此重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn其中n为树的高度);而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系也就是说当响应的事件发生时会调用这个回调方法;这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中;在epoll中对于每一个事件都会建立一个epitem结构体;
struct epitem
{struct rb_node rbn;//红黑树节点struct list_head rdllink;//双向链表节点struct epoll_filefd ffd; //事件句柄信息struct eventpoll *ep; //指向其所属的eventpoll对象struct epoll_event event; //期待发生的事件类型
}当调用epoll_wait检查是否有事件发生时只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空则把发生的事件复制到用户态同时将事件数量返回给用户这个操作的时间复杂度是O(1)。
2总结一下epoll的使用过程就是三部曲
调用epoll_create创建一个epoll句柄。调用epoll_ctl将要监控的文件描述符进行注册。调用epoll_wait等待文件描述符就绪。
9.4 epoll的优点
接口使用方便。虽然拆分成了三个函数但是反而使用起来更方便高效不需要每次循环都设置关注的文件描述符也做到了输入输出参数分离开。数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)。事件回调机制避免使用遍历而是使用回调函数的方式将就绪的文件描述符结构加入到就绪队列中。 epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪这个操作时间复杂度O(1)。即使文件描述符数目很多效率也不会受到影响。没有数量限制文件描述符数目无上限。
9.5 epoll工作方式
epoll有2种工作方式水平触发(LT)和边缘触发(ET)
你妈喊你吃饭的例子当你正在玩吃鸡的时候眼看进入了决赛圈你妈饭做好了喊你吃饭的时候有两种方式
如果你妈喊你一次你没动那么你妈会继续喊你第二次第三次…(亲妈水平触发)。如果你妈喊你一次你没动你妈就不管你了(后妈边缘触发)。
9.5.1 水平触发Level Triggered 工作模式
当epoll检测到socket上事件就绪的时候可以不立刻进行处理或者只处理一部分。由于只读了1K数据缓冲区中还剩1K数据在第二次调用 epoll_wait 时epoll_wait仍然会立刻返回并通知socket读事件就绪。直到缓冲区上所有的数据都被处理完epoll_wait 才不会立刻返回。支持阻塞读写和非阻塞读写。
9.5.2 边缘触发Edge Triggered工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。
当epoll检测到socket上事件就绪时必须立刻处理。虽然只读了1K的数据缓冲区还剩1K的数据在第二次调用 epoll_wait 的时候epoll_wait 不会再返回了。也就是说ET模式下文件描述符上的事件就绪后只有一次处理机会。ET的性能比LT性能更高( epoll_wait 返回的次数少了很多)。Nginx默认采用ET模式使用epoll。只支持非阻塞的读写。
9.5.3 epoll的使用场景
epoll的高性能是有一定的特定场景的如果场景选择的不适宜epoll的性能可能适得其反。
对于多连接且多连接中只有一部分连接比较活跃时比较适合使用epoll。例如典型的一个需要处理上万个客户端的服务器例如各种互联网APP的入口服务器这样的服务器就很适合epoll。如果只是系统内部服务器和服务器之间进行通信只有少数的几个连接这种情况下用epoll就并不合适具体要根据需求和场景特点来决定使用哪种IO模型。
9.6 对比LT和ET
LT是 epoll 的默认行为。使用 ET 能够减少 epoll 触发的次数但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。相当于一个文件描述符就绪之后不会反复被提示就绪看起来就比 LT 更高效一些但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理不让这个就绪被重复提示的话其实性能也是一样的。另一方面ET 的代码复杂程度更高了。
9.7 理解ET模式和非阻塞文件描述符
使用 ET 模式的 epoll需要将文件描述设置为非阻塞。这个不是接口上的要求 而是 “工程实践” 上的要求。
1假设这样的场景服务器接受到一个10k的请求会向客户端返回一个应答数据如果客户端收不到应答不会发送第二个10k请求。 2如果服务端写的代码是阻塞式的read并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来参考 man 手册的说明。可能被信号打断)剩下的9k数据就会待在缓冲区中。 此时由于 epoll 是ET模式并不会认为文件描述符读就绪epoll_wait 就不会再次返回剩下的 9k 数据会一直在缓冲区中直到下一次客户端再给服务器写数据epoll_wait 才能返回。但是问题来了
服务器只读到1k个数据要10k读完才会给客户端返回响应数据。客户端要读到服务器的响应。客户端发送了下一个请求epoll_wait 才会返回才能去读缓冲区中剩余的数据。 所以为了解决上述问题(阻塞read不一定能一下把完整的请求读完)于是就可以使用非阻塞轮训的方式来读缓冲区保证一定能把完整的请求都读出来。
而如果是LT没这个问题。只要缓冲区中的数据没读完epoll_wait 返回文件描述符读就绪。
9.8 epoll使用示例LT模式
1Poller.hpp
#pragma once
#include cerrno
#include cstring
#include sys/epoll.h
#include Log.hpp
#include unistd.hclass Poller
{
public:Poller(){_epfd epoll_create(128);if(_epfd -1){LOG(FATAL, epoll_create error);}else{LOG(NORMAL, epoll_create success);}}int EpllerUpdate(int oper, int sock, int event){int n 0;if(oper EPOLL_CTL_DEL){n epoll_ctl(_epfd, oper, sock, nullptr);if(n ! 0){LOG(FATAL, epoll_ctl delete error!);}}else{epoll_event ev;ev.events event;ev.data.fd sock;n epoll_ctl(_epfd, oper, sock, ev);if(n ! 0){LOG(FATAL, epoll_ctl error!);}}return n;}int EpollerWait(epoll_event revents[], int num){int n epoll_wait(_epfd, revents, num, _timeout);return n;}~Poller(){close(_epfd);}private:int _epfd;int _timeout{3000};
};2EpollServer.hpp
#pragma once
#include iostream
#include memory
#include sys/epoll.h
#include Socket.hpp
#include Poller.hpp
#include Log.hppclass EpollServer
{static const int num 64;public:EpollServer(uint16_t port): _port(port),_listsocket_ptr(new Sock()),_epoller_ptr(new Poller()){}void Init(){_listsocket_ptr-Socket();_listsocket_ptr-Bind(_port);_listsocket_ptr-Listen();LOG(NORMAL, create listen socket success);}void Start(){int listsockfd _listsocket_ptr-Getsock();// 将listsockfd添加到epoll中 - listensock和他关心的事件添加到内核epoll模型中rb_tree._epoller_ptr-EpllerUpdate(EPOLL_CTL_ADD, listsockfd, EPOLLIN);epoll_event revs[num];while(1){int n _epoller_ptr-EpollerWait(revs, num);if(n 0){// 有事件就绪LOG(DEBUG, event happened);Dispatcher(revs, n);}else if (n 0){LOG(NORMAL, time out ...);}else{LOG(FATAL, epll wait error);}}}void Dispatcher(epoll_event revs[], int num){for(int i 0; i num; i){uint32_t events revs[i].events;int fd revs[i].data.fd;if(events EPOLLIN){if(fd _listsocket_ptr-Getsock()){Accepter();}else{// 其他fd上面的普通读取事件就绪Recver(fd);}}else if(events EPOLLOUT){;}else{;}} }void Accepter(){ int sockfd _listsocket_ptr-Accept();if(sockfd 0){// 我们不能直接读取吗_epoller_ptr-EpllerUpdate(EPOLL_CTL_ADD, sockfd, EPOLLIN);LOG(NORMAL, get a new link);}}void Recver(int fd){char buffer[1024];ssize_t n read(fd, buffer, sizeof(buffer) - 1); if(n 0){buffer[n] 0;std::cout get a messge: buffer std::endl;// wrirtestd::string echo_str server echo $ ;echo_str buffer;write(fd, echo_str.c_str(), echo_str.size());}else if(n 0){LOG(NORMAL, client quit);//细节_epoller_ptr-EpllerUpdate(EPOLL_CTL_DEL, fd, 0);close(fd);}else{LOG(FATAL, recv error);_epoller_ptr-EpllerUpdate(EPOLL_CTL_DEL, fd, 0);close(fd);}}private:std::shared_ptrSock _listsocket_ptr;std::shared_ptrPoller _epoller_ptr;uint16_t _port;
};3test.cpp
#include iostream
#include memory
#include EpollServer.hppint main()
{ std::unique_ptrEpollServer svr(new EpollServer(8080));svr-Init();svr-Start();return 0;
}9.9 epoll使用示例ET模式
1基于 LT 版本稍加修改即可
修改TcpServer.hpp新增非阻塞读和非阻塞写接口。对于 accept 返回的sockfd加上 EPOLLET 这样的选项。
注意此代码考虑 listen_sock ET 的情况如果将 listen_sock 设为 ET则需要非阻塞轮询的方式 accept否则会导致同一时刻大量的客户端同时连接的时候只能 accept 一次的问题。
2由于ET模式的特性我们需要每次把数据一次性读完避免丢失这就需要每个描述符sock都有一个收发、错误、缓冲区并能执行相应的回调方法。 已经描述了就需要我们用unordered_map统一组织起来。(便于我们再回调函数中实现)
3对于读取操作
当buffer由不可读状态变为可读的时候即由空变为不空的时候。当有新数据到达时即buffer中的待读内容变多的时候。当buffer中有数据可读即buffer不空且用户对相应fd进行epoll_mod修改为epol_IN事件时。
4对于写操作
当buffer由不可写变为可写的时候即由满状态变为不满状态的时候。当有旧数据被发送走时即buffer中待写的内容变少得时候。当buffer中有可写空间即buffer不满且用户对相应fd进行epoll_mod修改为epol_OUT事件时
5TcpServer.hpp
#pragma once
#include iostream
#include string
#include memory
#include cerrno
#include functional
#include unordered_map
#include unistd.h
#include fcntl.h
#include Log.hpp
#include Poller.hpp
#include Socket.hppvoid SetNonBlockOrDie(int sock)
{int fl fcntl(sock, F_GETFL);if(fl 0){exit(-1);}fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}class Connection;
class TcpServer;uint32_t EVENT_IN (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT (EPOLLOUT | EPOLLET);
const static int g_buffer_size 128;using func_t std::functionvoid(std::weak_ptrConnection);class Connection
{
public:Connection(int sockfd):_sockfd(sockfd){}int Getsock(){return _sockfd;}void SetHander(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb recv_cb;_send_cb send_cb;_except_cb except_cb;}void AppendInBuffer(const std::string in){_in_buffer in;}void AppendOutBuffer(const std::string out){_out_buffer out;}std::string Inbuffer(){return _in_buffer;}std::string Outbuffer(){return _out_buffer;}void SetWeakPtr(TcpServer* tcp_server_ptr){_tcp_server_ptr tcp_server_ptr;}~Connection(){}public:func_t _recv_cb;func_t _send_cb;func_t _except_cb;// 添加一个回指指针TcpServer* _tcp_server_ptr;std::string _ip;uint16_t _port;private:int _sockfd;std::string _in_buffer;std::string _out_buffer;
};// enable_shared_from_this:可以提供返回当前对象的this对应的shared_ptr
class TcpServer : public std::enable_shared_from_thisTcpServer
{static const int num 64;public:TcpServer(uint16_t port, func_t OnMessage):_port(port),_OnMessage(OnMessage),_quit(true),_epoller_ptr(new (Poller)),_listensock_ptr(new (Sock)){}void Init(){_listensock_ptr-Socket();SetNonBlockOrDie(_listensock_ptr-Getsock());_listensock_ptr-Bind(_port);_listensock_ptr-Listen();LOG(NORMAL, create listen socket success\n);AddConnection(_listensock_ptr-Getsock(), EVENT_IN, std::bind(TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb, const std::string ip 0.0.0.0, uint16_t port 0){// 1. 给sock也建立一个connection对象将listensock添加到Connection中同时listensock和Connecion放入_connectionsstd::shared_ptrConnection new_connection(new Connection(sockfd));new_connection-SetWeakPtr(this); // shared_from_this(): 返回当前对象的shared_ptr//std::shared_ptrConnection new_connection std::make_sharedConnection(sockfd, std::shared_ptrTcpServer(this));new_connection-SetHander(recv_cb, send_cb, except_cb);new_connection-_ip ip;new_connection-_port port;//2. 添加到unordered_map_connections.insert(std::make_pair(sockfd, new_connection));// 3. 我们添加对应的事件除了要加到内核中fd event_epoller_ptr-EpllerUpdate(EPOLL_CTL_ADD, sockfd, event);}void Accepter(std::weak_ptrConnection conn){auto connection conn.lock();while(1){struct sockaddr_in peer;socklen_t len sizeof(peer);int sockfd accept(connection-Getsock(), (struct sockaddr*)peer, len);if(sockfd 0){uint16_t peerport ntohs(peer.sin_port);char ipbuf[128];inet_ntop(AF_INET, peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));printf(get a new client, get info- [%s:%d], sockfd : %d\n, ipbuf, peerport, sockfd);SetNonBlockOrDie(sockfd);// listensock只需要设置_recv_cb, 而其他sock读写异常AddConnection(sockfd, EVENT_IN,std::bind(TcpServer::Recver, this, std::placeholders::_1),std::bind(TcpServer::Sender, this, std::placeholders::_1),std::bind(TcpServer::Excepter, this, std::placeholders::_1),ipbuf, peerport);}else{if (errno EWOULDBLOCK){break;}else if(errno EINTR){continue;}else{break;}}}}void Recver(std::weak_ptrConnection conn){if(conn.expired()){return;}auto connection conn.lock();int sockfd connection-Getsock();while(1){char buffer[g_buffer_size];memset(buffer, 0, sizeof(buffer));ssize_t n recv(sockfd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取if(n 0){connection-AppendInBuffer(buffer);}else if(n 0){printf(sockfd: %d, client info %s:%d quit...\n, sockfd, connection-_ip.c_str(), connection-_port);connection-_except_cb(connection);return;}else{if(errno EWOULDBLOCK){break;}else if(errno EINTR){continue;}else{printf(sockfd: %d, client info %s:%d recv error...\n, sockfd, connection-_ip.c_str(), connection-_port);connection-_except_cb(connection);return;}}}// 数据有了但是不一定全1. 检测 2. 如果有完整报文就处理_OnMessage(connection); // 你读到的sock所有的数据connection}void Sender(std::weak_ptrConnection conn){if(conn.expired()){return;}auto connection conn.lock();auto outbuffer connection-Outbuffer();while(1){ssize_t n send(connection-Getsock(), outbuffer.c_str(), outbuffer.size(), 0);if(n 0){outbuffer.erase(0, n);if(outbuffer.empty()){break;}}else if(n 0){return;}else{if(errno EWOULDBLOCK){break;}else if(errno EINTR){continue;}else{printf(sockfd: %d, client info %s:%d send error...\n, connection-Getsock(), connection-_ip.c_str(), connection-_port);connection-_except_cb(connection);return;}}}if(!outbuffer.empty()){// 开启对写事件的关心EnableEvent(connection-Getsock(), true, true);}else{// 关闭对写事件的关心EnableEvent(connection-Getsock(), true, false);}}void Excepter(std::weak_ptrConnection conn){if(conn.expired()){return;}auto connection conn.lock();int fd connection-Getsock();printf(Excepter hander sockfd: %d, client info %s:%d excepter handler\n,connection-Getsock(), connection-_ip.c_str(), connection-_port);// 1. 移除对特定fd的关心_epoller_ptr-EpllerUpdate(EPOLL_CTL_DEL, fd, 0);// 2. 关闭异常的文件描述符printf(close %d done...\n, fd);close(fd);// 3. 从unordered_map中移除printf(remove %d from _connections...\n, fd);_connections.erase(fd);}void EnableEvent(int sockfd, bool readable, bool writeable){uint32_t events 0;events | ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);_epoller_ptr-EpllerUpdate(EPOLL_CTL_MOD, sockfd, events);}bool IsConnectionSafe(int fd){auto iter _connections.find(fd);if(iter _connections.end()){return false;}return true;}void Start(){_quit false;while(1){Dispatcher();PrintConnection();}_quit true;}void Dispatcher(){int n _epoller_ptr-EpollerWait(revs, num);for(int i 0; i n; i){uint32_t events revs[i].events;int sockfd revs[i].data.fd;if((events EPOLLIN) IsConnectionSafe(sockfd)){if(_connections[sockfd]-_recv_cb){_connections[sockfd]-_recv_cb(_connections[sockfd]);}}if((events EPOLLOUT) IsConnectionSafe(sockfd)){if(_connections[sockfd]-_send_cb){_connections[sockfd]-_send_cb(_connections[sockfd]);}}}}void PrintConnection(){std::cout _connections fd list: ;for (auto connection : _connections){std::cout connection.second-Getsock() , ;std::cout inbuffer: connection.second-Inbuffer().c_str();}std::cout std::endl;}~TcpServer(){}private:std::shared_ptrPoller _epoller_ptr;std::shared_ptrSock _listensock_ptr; // 监听socket 可以把他移除到外部std::unordered_mapint, std::shared_ptrConnection _connections;struct epoll_event revs[num];uint16_t _port;bool _quit;// 让上层处理信息func_t _OnMessage;
};