海口市做网站的公司,网络热词2022,赣州网上房地产备案网,wordpress 描述插件在上一节利用管道实现了一个简单的聊天室#xff0c;但这个聊天室有一个很明显的问题就是#xff0c;当A处于读阻塞情况下是不能向B发送消息的#xff0c;只有收到B的消息才能发送。如何实现同时既能接受B的消息#xff0c;又能向其发送消息#xff1f;很遗憾#xff0c;…在上一节利用管道实现了一个简单的聊天室但这个聊天室有一个很明显的问题就是当A处于读阻塞情况下是不能向B发送消息的只有收到B的消息才能发送。如何实现同时既能接受B的消息又能向其发送消息很遗憾依靠基本的编程思维似乎无法解决这个问题。因为当A处于读阻塞状态时程序是不可能往下执行的。那么现在的聊天软件又是如何实现同时收发消息的这个时候我们就需要把问题交给OS帮我们来解决操作系统的内核通过控制底层操作帮助我们实现了一些逻辑上的”并行“也就是我们今天所说的 IO多路复用什么是IO多路复用IO多路复用I/O Multiplexing是一种同时监控多个文件描述符socket、管道、文件等的IO状态的机制。当其中任意一个或多个文件描述符就绪可读、可写或异常时内核会通知应用程序从而避免阻塞等待单个IO操作。在传统阻塞IO模型中每个IO操作如read/write会阻塞线程直到完成。若需处理多个连接如Web服务器必须为每个连接创建一个线程/进程导致资源浪费线程上下文切换、内存占用。
IO多路复用通过单线程监控多个IO事件实现高并发、低资源消耗。当聊天室使用了IO多路复用就可以同时监控读和写对应的文件描述符任何一个文件描述符就绪就会立即响应然后继续轮询等待从而实现了逻辑上的“并行”。核心机制与系统调用IO多路复用包含两种系统调用select与epoll。他们之间的实现方式是完全不同的select底层实现维护一个位图fd集合每次调用需遍历所有fd检查就绪状态轮询和通知由OS完成。位图机制通过一个固定大小的位图fd_set来管理文件描述符fd。每个 fd 占用一个位最大支持的 fd 数量通常为 1024。支持跨平台POSIX标准。线性扫描每次调用时内核会遍历所有 fd检查它们是否就绪。时间复杂度为 O(n)其中 n 是最大 fd 数。fd数量增加时性能下降。每次调用都需要重新传递 fd 集合调用时需要将用户态的 fd 集合拷贝到内核态内核处理后再次拷贝回用户态效率较低。使用流程需要使用的系统调用
#include sys/select.h
#include sys/time.h
//readset、writeset、exceptionset都是fd_set集合
//集合的相关操作如下
void FD_ZERO(fd_set *fdset); /* 将所有fd清零 */
void FD_SET(int fd, fd_set *fdset);/* 增加一个fd */
void FD_CLR(int fd, fd_set *fdset);/* 删除一个fd */
int FD_ISSET(int fd, fd_set *fdset);/* 判断一个fd是否有设置 */
int select(int maxfd, fd_set *readset,fd_set *writeset, fd_set *exceptionset,
struct timeval * timeout);这里先简要地介绍一下 select 使用流程首先需要先为监听集合申请内存使用 FD_ZERO 初始化监听集合将所有需要监听的文件描述符使用 FD_SET 加入监听集合调用 select 系统调用使进程陷入阻塞状态从阻塞当中被唤醒以后使用 FD_ISSET 遍历所有监听的文件描述符找到真正就绪的文件描述符对就绪的文件描述符执行IO操作。存在的问题每调用一次select 就需要3个事件类型的fd_set需从用户空间拷贝到内核空间去返回时select也会把保留了活跃事件的fd_set返回(从内核拷贝到用户空间)。当fd_set数据大的时候这个过程消耗是很大的。select需要逐个遍历fd_set集合 然后去检查对应fd的可读写状态如果fd_set 数据量多那么遍历fd_set 就是一个比较耗时的过程。fd_set是个集合类型的数据结构有长度限制,32位系统长度1024,64位系统长度2048这个就限制了select最多能同时监控1024个连接。系统调用FR_ZERO清空一个文件描述符集合。
void FD_ZERO(fd_set *fdset);fd_set readfds;
FD_ZERO(readfds); // 清空集合fdset指向 fd_set 类型的指针表示要清空的文件描述符集合。将 fd_set 中的所有位清零表示集合中没有任何文件描述符被设置。FD_SET将一个文件描述符加入到集合中。
void FD_SET(int fd, fd_set *fdset);fd_set readfds;
FD_ZERO(readfds); // 清空集合
FD_SET(sockfd1, readfds); // 将 sockfd1 加入集合
FD_SET(sockfd2, readfds); // 将 sockfd2 加入集合fd要加入集合的文件描述符。fdset指向 fd_set 类型的指针表示要操作的文件描述符集合。将指定的文件描述符 fd 设置为 1表示该文件描述符被加入到集合中。select监控多个文件描述符的可读、可写和异常状态。
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);nfds需要监控的最大文件描述符值加 1即 maxfd 1。OS会从0nfds的范围内的轮询从而减少不必要的监听nfds11024的位图被忽略readfds指向 fd_set 类型的指针表示要监控的可读文件描述符集合。writefds指向 fd_set 类型的指针表示要监控的可写文件描述符集合。exceptfds指向 fd_set 类型的指针表示要监控的异常状态文件描述符集合。timeout指向 struct timeval 类型的指针表示超时时间。如果为 NULL表示阻塞等待如果为 {0, 0}表示非阻塞。返回值大于 0表示就绪的文件描述符数量。等于 0表示超时没有任何文件描述符就绪。小于 0表示出错。select 函数会阻塞当前线程直到集合中的某个文件描述符就绪可读、可写或异常或超时。如果某个文件描述符就绪select 会返回就绪的文件描述符数量并修改对应的集合。示例
fd_set readfds;
struct timeval tv;FD_ZERO(readfds);
FD_SET(sockfd1, readfds);
FD_SET(sockfd2, readfds);tv.tv_sec 5; // 设置超时时间为 5 秒
tv.tv_usec 0;int ret select(sockfd2 1, readfds, NULL, NULL, tv);
if (ret 0) {if (FD_ISSET(sockfd1, readfds)) {printf(sockfd1 is ready for reading\n);}if (FD_ISSET(sockfd2, readfds)) {printf(sockfd2 is ready for reading\n);}
} else if (ret 0) {printf(Timeout occurred\n);
} else {printf(Error occurred\n);
}FD_ISSET检查一个文件描述符是否在集合中。
int FD_ISSET(int fd, const fd_set *fdset);fd要检查的文件描述符。fdset指向 fd_set 类型的指针表示要检查的文件描述符集合。非零表示文件描述符 fd 在集合中。零表示文件描述符 fd 不在集合中。检查指定的文件描述符 fd 是否被设置为 1即是否在集合中。FD_CLRFD_CLR 的主要功能是从一个文件描述符集合中移除一个指定的文件描述符。当某个文件描述符不再需要被监控时例如关闭了某个 socket。在 select 调用后需要清理某些不再需要的文件描述符。
void FD_CLR(int fd, fd_set *fdset);fd要从集合中移除的文件描述符。fdset指向 fd_set 类型的指针表示要操作的文件描述符集合。实战使用select对于基于管道的简易聊天程序进行改进基于上节我们通过管道实现的简易聊天程序我们对其进行改进实现同时收发消息
//客户端Aint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);int fdw open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, open fdw error);printf(connected\n);char buf[1024];fd_set rdset;while (1){FD_ZERO(rdset);FD_SET(STDIN_FILENO, rdset);FD_SET(fdr, rdset);select(fdr1, rdset, NULL, NULL, NULL);if(FD_ISSET(STDIN_FILENO, rdset)){memset(buf, 0, sizeof(buf));int ret read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, rdset)){memset(buf, 0, sizeof(buf));int ret read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);if(ret 0){printf(B is disconnected\n);break;}printf(B:%s, buf);}}return 0;
}//客户端Bint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdw open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, open fdw error);int fdr open(argv[2], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);printf(connected\n);char buf[1024];fd_set rdset;while (1){FD_ZERO(rdset);FD_SET(STDIN_FILENO, rdset);FD_SET(fdr, rdset);select(fdr1, rdset, NULL, NULL, NULL);if(FD_ISSET(STDIN_FILENO, rdset)){memset(buf, 0, sizeof(buf));int ret read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, rdset)){memset(buf, 0, sizeof(buf));int ret read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);if(ret 0){printf(A is disconnected\n);break;}printf(A:%s, buf);}}return 0;
}输出结果
ubuntuubuntu:~/MyProject/Linux/IO$ ./selectA 1.pipe 2.pipe
waiting for connect
connected
hello!
B:who are you?
B:what are you doing?
I am A
I am eat dinner?
goodbye!
^Cubuntuubuntu:~/MyProject/Linux/IO$ ./selectB 1.pipe 2.pipe
waiting for connect
connected
A:hello!
who are you?
what are you doing?
A:I am A
A:I am eat dinner?
A:goodbye!
A is disconnected可以看到A和B可以同时收发消息无需等待收到消息之后再发送epollLinux特有在早期计算机网络并不发达所以并发网络请求并不会很高select模型也足够使用了但是随着网络的高速发展高并发的网络请求程序越来越多而select模式下 fd_set 长度限制就开始成为了致命的缺陷。下图显示了随着并发量的提升不同IO多路复用机制的响应速度。显然根据select的底层实现不难发现它有如下缺陷位图靠数组实现当改变长度需要重新编译每次从内核态读取就绪集合和重新将文件描述符放入集合会产生大量的内核态和用户态之间的冗余拷贝监听集合和就绪集合的耦合度高就绪集合的处理性能低吸取了select的教训epoll模式就不再使用数组的方式来保存自己所监控的fd信息了,epoll 可以在内核态空间当中维持两个数据结构监听事件集合和就绪事件队列。监听事件集合用来存储所有需要关注的设备即文件描述符和对应操作比如读、写、挂起和异常等等当监听的设备有事件产生时比如网卡上接收到了数据并传输到了缓冲区当中时硬件会采用中断等方式通知操作系统操作系统会将就绪事件拷贝到就绪事件队列中并且找到阻塞在 epoll_wait 的线程让其就绪。监听事件集合通常是一个红黑树就绪事件队列是一个线性表。底层实现红黑树 就绪链表使用红黑树管理所有注册的 fd当 fd 就绪时将其加入就绪链表。时间复杂度为 O(1)。边缘触发ET和水平触发LTET仅通知一次需一次性处理完所有数据减少事件触发次数高效但需非阻塞IO。LT默认模式fd就绪后若未处理完下次epoll_wait仍会通知。事件驱动内核维护一个事件表只返回已经就绪的 fd无需每次遍历所有 fd。无需重复传递 fd 集合通过 epoll_ctl 动态管理 fd无需在每次调用时重新传递 fd 集合。优势时间复杂度O(1)仅返回就绪的fd无需遍历。无fd数量限制理论上仅受系统内存限制。高效通过epoll_ctl注册/修改事件避免每次调用时重复传递fd集合。有了这些优势之后 epoll 逐渐取代了 select 的市场地位尤其是在管理巨大量连接的高
并发场景中 epoll 的性能要远超 select 。使用流程需要使用的系统调用
#includesys/epoll.h
int epoll_create(int size); //创建 epoll 实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //注册文件描述符
//等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);//用于描述 epoll 就绪的事件及其关联数据。
struct epoll_event {uint32_t events; //表示文件描述符上发生的事件类型。EPOLLIN 表示读 EPOLLOUT 表示写epoll_data_t data; //存储与事件相关的数据具体类型由用户决定。
};//用于存储就绪事件中与事件相关的不同类型的数据。
typedef union epoll_data {void*ptr;int fd; //存储就绪事件对应的文件描述符uint32_t u32;uint64_t u64;
} epoll_data_t;
events 是一个 32 位的无符号整数用于表示文件描述符上发生的事件类型。它可以是一个或多个事件标志的组合通过位或操作。常见的事件类型包括
EPOLLIN表示文件描述符可读。
EPOLLOUT表示文件描述符可写。
EPOLLRDHUP表示对端关闭连接仅适用于 TCP 套接字。
EPOLLPRI表示有紧急数据可读。
EPOLLERR表示发生错误。
EPOLLHUP表示挂起文件描述符关闭。
EPOLLET表示边缘触发模式Edge-Triggered。
EPOLLONESHOT表示一次性事件事件处理完成后需要重新注册。创建 epoll 实例使用 epoll_create 或 epoll_create1 创建一个 epoll 文件描述符epfd。这个文件描述符用于后续的 epoll 操作。注册文件描述符使用 epoll_ctl 将需要监控的文件描述符如 socket注册到 epoll 实例中并指定感兴趣的事件如可读、可写。等待事件使用 epoll_wait 等待 epoll 实例中的事件。epoll_wait 会阻塞当前线程直到有文件描述符就绪或超时。处理事件当 epoll_wait 返回时它会返回就绪的文件描述符数量nfds并填充 events 数组。程序可以遍历 events 数组处理每个就绪的文件描述符。清理资源当不再需要 epoll 实例时可以关闭 epoll 文件描述符释放相关资源。 系统调用epoll_create 和 epoll_create1使用 epoll_create 或 epoll_create1 创建一个 epoll 文件描述符epfd。这个文件描述符用于后续的 epoll 操作。
int epoll_create(int size);
int epoll_create1(int flags);size建议的初始文件描述符数量一般选择1即可。flags可以设置一些标志如 EPOLL_CLOEXEC设置文件描述符为关闭执行。成功时返回一个有效的 epoll 文件描述符非负整数。失败时返回 -1并设置 errno 以指示错误原因。 epoll_ctl使用 epoll_ctl 将需要监控的文件描述符如 socket注册到 epoll 实例中并指定感兴趣的事件如可读、可写。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epfdepoll 实例的文件描述符。op操作类型可以是 EPOLL_CTL_ADD添加、EPOLL_CTL_MOD修改或 EPOLL_CTL_DEL删除。fd要操作的文件描述符。event指向 epoll_event 结构的指针包含要监控的事件和附加数据。成功时返回 0。失败时返回 -1具体错误码可以通过 errno 获取。epoll_wait使用 epoll_wait 等待 epoll 实例中的事件。epoll_wait 会阻塞当前线程直到有文件描述符就绪或超时。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);epfdepoll 实例的文件描述符。events指向 epoll_event 数组的指针用于存储就绪的事件。maxeventsevents 数组的最大容量等同于插入就绪的文件描述符数量。timeout超时时间单位为毫秒-1 表示阻塞等待0 表示非阻塞。返回值 0表示有就绪的文件描述符返回值为就绪的文件描述符数量。返回值 0表示超时没有任何文件描述符就绪。返回值 0表示发生错误具体错误码可以通过 errno 获取。示例
int main(int argc, char const *argv[])
{int epfd epoll_create1(0);ERROR_CHECK(epfd, -1, epoll_create)int sockfd socket(AF_INET, SOCK_STREAM, 0);ERROR_CHECK(sockfd, -1, socket);struct epoll_event ev;ev.events EPOLLIN; // 监听可读事件ev.data.fd sockfd; // 存储文件描述符int ret epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, ev);ERROR_CHECK(ret, -1, epoll_ctl: add);struct epoll_event events[10];int nfds epoll_wait(epfd, events, 10, -1);ERROR_CHECK(nfds, -1, epoll_wait);char buf[1024];for (int i 0; i nfds; i) {if (events[i].events EPOLLIN) {int fd events[i].data.fd; // 获取文件描述符// 处理可读事件read(fd, buf, sizeof(buf));}}return 0;
} 实战使用epoll对于基于管道的简易聊天程序进行改进
//客户A
//客户端A
#include54func.h
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);int fdw open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, open fdw error);printf(connected\n);int epfd epoll_create(1);ERROR_CHECK(epfd, -1, epoll_create);struct epoll_event ev, evs[2];ev.events EPOLLIN; ev.data.fd STDIN_FILENO;int ret epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdr);ev.data.fd fdr; ret epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdw);char buf[1024];int readyNum 0;while (1){readyNum epoll_wait(epfd, evs, 2, -1);for(int i 0; i readyNum; i){if(evs[i].data.fd STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}else if(evs[i].data.fd fdr){memset(buf, 0, sizeof(buf));int sret read(fdr, buf, sizeof(buf));if(sret 0){printf(B is disconnected\n);return 0;}printf(B:%s,buf);}}}return 0;
}
//客户B
//客户端B
#include54func.h
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdw open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, open fdw error);int fdr open(argv[2], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);printf(connected\n);int epfd epoll_create(1);ERROR_CHECK(epfd, -1, epoll_create);struct epoll_event ev, evs[2];ev.events EPOLLIN; ev.data.fd STDIN_FILENO;int ret epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdr);ev.data.fd fdr; ret epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdw);char buf[1024];int readyNum 0;while (1){readyNum epoll_wait(epfd, evs, 2, -1);for(int i 0; i readyNum; i){if(evs[i].data.fd STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}else if(evs[i].data.fd fdr){memset(buf, 0, sizeof(buf));int sret read(fdr, buf, sizeof(buf));if(sret 0){printf(A is disconnected\n);return 0;}printf(A:%s,buf);}}}return 0;
}输出结果
//客户A
ubuntuubuntu:~/MyProject/Linux/IO$ ./epollA 1.pipe 2.pipe
waiting for connect
connected
hello I am A
who are you?
B:I am B
Am I alone?
B:You are not alone.
Someboday tell me, Why it feel more real when I dream than truth?
B:There is some fiction in your truth, and some truth in your fiction.
B is disconnected
//客户B
ubuntuubuntu:~/MyProject/Linux/IO$ ./epollB 1.pipe 2.pipe
waiting for connect
connected
A:hello I am A
A:who are you?
I am B
A:Am I alone?
You are not alone.
A:Someboday tell me, Why it feel more real when I dream than truth?
There is some fiction in your truth, and some truth in your fiction.
^Cepoll的边缘触发epoll_wait 的就绪触发有两种方式一种是默认的水平触发方式(Level-triggered)另一种是边缘触发模式(Edge-triggered)。以读事件为例子水平触发模式下只要缓冲区当中存在数据就可以使 epoll_wait 就绪在边缘触发的情况下如果缓冲区中存在数据但是数据一直没有增多那么 epoll_wait 就不会就绪只有缓冲区的数据增多的时候即下图中绿色的上升沿部分时才能使 epoll_wait 就绪。使用水平触发的话线程能够以更短的响应时间来处理事件但是这可能会导致饥饿问题如果存在某个事件传输的数据量过大那么线的epoll_wait就会多次就绪直到处理完所有数据为止而一些其他的任务所占用的资源就会相对变少而一直无法得到响应。使用边缘触发可以避免这个问题。为了确保读操作可以将所有数据读完可以考虑使用循环配合非阻塞的形式来处理。
在线程池架构中主线程通常会将实际的IO交给子线程即工作线程完成采用边缘触发可以有效地降低主线程的响应频率提高整体的性能。除此以外如果一次请求对应一次响应是用户追求的通信模式那么边缘触发正好符合。 设置文件描述符为非阻塞模式在边缘触发模式下文件描述符必须设置为非阻塞模式O_NONBLOCK否则可能会导致程序阻塞在 read 或 write 操作上。我们需要使用fcntl设置文件的状态
int flags fcntl(sockfd, F_GETFL, 0);
ERROR_CHECK(flags, -1, fcntl:get);int ret fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
ERROR_CHECK(ret, -1, fcntl: set);设置边缘触发模式在使用 epoll_ctl 注册文件描述符时通过在 events 字段中添加 EPOLLET 标志来启用边缘触发模式。
int epfd epoll_create(1);
ERROR_CHECK(epfd, -1, epoll_create);
struct epoll_event ev;
ev.events EPOLLIN | EPOLLET; // 启用边缘触发模式监听可读事件
ev.data.fd sockfd;
int ret epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, ev)
ERROR_CHECK(ret, -1, epoll_ctl: add)
处理边缘触发事件在边缘触发模式下必须确保在每次通知后处理完所有数据。否则如果数据没有被完全读取或写入可能会错过后续的数据。
struct epoll_event events[10];
int nfds epoll_wait(epfd, events, 10, -1);
ERROR_CHECK(nfds, -1, epoll_wait);for (int i 0; i nfds; i) {if (events[i].events EPOLLIN) {char buf[1024];// 使用非阻塞读取确保读取所有数据while (1) {memset(buf, 0, sizeof(buf));ssize_t ret read(sockfd, buf, sizeof(buf));ERROR_CHECK(ret, -1, read);if(ret -1 || ret 0){printf(finish\n);break;}printf(%s\n,buf);}}
}处理所有数据在边缘触发模式下必须确保在每次通知后处理完所有数据。如果数据没有被完全读取或写入可能会错过后续的数据。 一次性触发模式EPOLLONESHOT如果需要在处理完事件后自动禁用该文件描述符的事件通知可以结合 EPOLLONESHOT 标志使用。
ev.events EPOLLIN | EPOLLET | EPOLLONESHOT; // 启用边缘触发模式和一次性触发模式
ev.data.fd sockfd;int ret epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, ev);
ERROR_CHECK(ret, -1, epoll_ctl);使用水平触发和边缘触发的效果区别我们先看一下对文件描述符使用阻塞和非阻塞的效果设置边缘触发必须将文件描述符设置为非阻塞写端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdw open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, open);char buf[256];while (1){read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}close(fdw);return 0;
} 读端为阻塞的情况
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdr open(argv[1], O_RDONLY);char buf[20];while (1){memset(buf, 0, sizeof(buf));ssize_t sret read(fdr, buf, sizeof(buf));printf(%ld %s\n, sret, buf);if(sret 0) break;sleep(1);}close(fdr);return 0;
}
输出结果
//写端
ubuntuubuntu:~/MyProject/Linux/IO$ ./testETW 1.pipe
hello
what are you doing?
^C
//读端
ubuntuubuntu:~/MyProject/Linux/IO$ ./testET 1.pipe
6 hello
20 what are you doing?
0 可以看到当缓冲区没有数据时读端管道就会阻塞等待。只有当写端关闭时读端才会变为非阻塞状态当写端没有数据时读端收到的是0进而退出。 读端为非阻塞的情况
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open);int flags fcntl(fdr, F_GETFL, 0);ERROR_CHECK(flags, -1, fcntl:get);int ret fcntl(fdr, F_SETFL, flags | O_NONBLOCK); //设置为非阻塞ERROR_CHECK(ret, -1, fcntl:set);char buf[20];while (1){memset(buf, 0, sizeof(buf));ssize_t sret read(fdr, buf, sizeof(buf));printf(%ld %s\n, sret, buf);if(sret 0) break;sleep(1);}close(fdr);return 0;
}输出结果
//写端
ubuntuubuntu:~/MyProject/Linux/IO$ ./testETW 1.pipe
hello
who?
^C
//读端
ubuntuubuntu:~/MyProject/Linux/IO$ ./testET 1.pipe
-1
-1
6 hello-1
-1
-1
-1
6 who?0 把读端设置为非阻塞状态时不会因为没有数据而等待。而是不断去查询注意这里与阻塞状态下是有区别的当端没有收到数据时如果写端未断开读端会非阻塞的一直收到 sret -1 当写端断开时读端才会收到 sret 0。非阻塞模式允许程序在 I/O 操作无法完成时立即返回从而可以快速处理其他任务提高程序的响应速度。为什么在使用边缘触发时必须设置文件描述符为非阻塞模式这个问题我们先放一放先看水平触发和边缘触发的区别。现在我们继续观察使用水平触发和边缘触发读取数据的区别为了使结果便于辨别我们把接收缓冲区调小使用水平触发
//读端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);printf(waiting for connect\n);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);printf(connected\n);int epfd epoll_create(1);ERROR_CHECK(epfd, -1, epoll_create);struct epoll_event ev, evs[2];ev.data.fd fdr; ev.events EPOLLIN ;int ret epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdr);char buf[5];int readyNum 0;while (1){readyNum epoll_wait(epfd, evs, 2, -1);printf(epoll wait is ready\n);for(int i 0; i readyNum; i){if(evs[i].data.fd fdr){memset(buf, 0, sizeof(buf));int sret read(fdr, buf, sizeof(buf));if(sret -1){printf(B is disconnected\n);return 0;}printf(B:%s\n,buf);}}}return 0;
}//写端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdw open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, open fdw error);printf(connected\n);int epfd epoll_create(1);ERROR_CHECK(epfd, -1, epoll_create);struct epoll_event ev, evs[2];ev.events EPOLLIN; ev.data.fd STDIN_FILENO;int ret epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdw);char buf[5];int readyNum 0;while (1){readyNum epoll_wait(epfd, evs, 2, -1);for(int i 0; i readyNum; i){if(evs[i].data.fd STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}}}return 0;
}输出结果
//客户端A
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
epoll wait is ready
B:, I a
epoll wait is ready
B:m B,
epoll wait is ready
B:what
epoll wait is ready
B:is yo
epoll wait is ready
B:ur na
epoll wait is ready
B:me?
//客户端B
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETB 1.pipe 2.pipe
waiting for connect
connected
hello, I am B, what is your name?可以观察到在使用水平触发时我们并未一次性读取全部数据而是部分读取只要读端有数据 epoll 就会多次就绪直到把数据全部取出。现在我们使用边缘触发看看效果
//读端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);printf(waiting for connect\n);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);int flags fcntl(fdr, F_GETFL, 0);ERROR_CHECK(flags, -1, fcntl:get);int ret fcntl(fdr, F_SETFL, flags | O_NONBLOCK);ERROR_CHECK(ret, -1, fcntl:set);printf(connected\n);int epfd epoll_create(1);ERROR_CHECK(epfd, -1, epoll_create);struct epoll_event ev, evs[2];ev.data.fd fdr; ev.events EPOLLIN | EPOLLET;ret epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, ev);ERROR_CHECK(ret, -1, epoll_ctl: fdr);char buf[5];int readyNum 0;while (1){readyNum epoll_wait(epfd, evs, 2, -1);printf(epoll wait is ready\n);for(int i 0; i readyNum; i){if(evs[i].data.fd fdr){memset(buf, 0, sizeof(buf));int sret read(fdr, buf, sizeof(buf));if(sret 0){printf(B is disconnected\n);return 0;}printf(B:%s\n,buf);}}}return 0;
}
输出结果
//读端
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
epoll wait is ready
B:, I a
//写端
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETB1 1.pipe
waiting for connect
connected
hello, I am B, what is your name?
can you hear me? 可以看到使用边缘触发时无论读端是否有数据epoll 只会在每次收到数据时就绪一次。即使端没有一次性读取全部数据也要等待下一次收到数据时才能再读取数据。那么如何在缓冲区一次无法接收全部数据时进行多次读取呢这个时候我们可以使用循环效果如下
while (1){readyNum epoll_wait(epfd, evs, 2, -1);printf(epoll wait is ready\n);for(int i 0; i readyNum; i){if(evs[i].data.fd fdr){while(1){memset(buf, 0, sizeof(buf));int sret read(fdr, buf, sizeof(buf));if(sret 0 || sret -1){printf(finish\n);break;}printf(B:%s\n,buf);}}}}输出结果
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
B:, I a
B:m B,
B:what
B:is yo
B:ur na
B:me?finish这样我们就可以在边缘触发下一次性读取全部数据。以下是我在使用边缘触发读取数据的几点疑问为什么使用边缘触发时要把文件描述符设置为非阻塞的我们把文件描述符设置为阻塞模式看看效果
int fdr open(argv[1], O_RDONLY);
ERROR_CHECK(fdr, -1, open fdr error);
int flags fcntl(fdr, F_GETFL, 0);
ERROR_CHECK(flags, -1, fcntl:get);
//int ret fcntl(fdr, F_SETFL, flags | O_NONBLOCK);
//ERROR_CHECK(ret, -1, fcntl:set);
printf(connected\n); 输出结果
//读端
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello, I am B, what
B:is your name?B:hello, I am B, what
B:is your name?
//写端
ubuntuubuntu:~/MyProject/Linux/IO$ ./ETB1 1.pipe
waiting for connect
connected
hello, I am B, what is your name?
hello, I am B, what is your name?可以看到在发送两次消息时epoll只就绪了一次这明显是有问题的 原因如下当 read() 读取的数据量少于缓冲区大小时程序无法区分是数据已读完需等待新事件还是数据未读完需继续读。若此时继续调用阻塞的 read()它会一直等待新数据到来导致线程阻塞。这导致其他文件描述符的事件得不到处理饥饿且当前描述符的后续事件可能丢失因为状态未再次变化。ET 模式要求程序在收到事件后必须一次性处理完所有数据直到返回 EAGAIN。非阻塞模式的 read()/write() 在数据不足时会立即返回 EAGAIN 或 EWOULDBLOCK程序可据此安全停止读取。阻塞模式下若最后一次 read() 时内核缓冲区数据恰好读完调用会阻塞线程直到新数据到来。非阻塞模式确保 read() 总是立即返回避免线程意外阻塞。结论ET 必须非阻塞LT 可容忍阻塞但仍推荐非阻塞。什么时候时候可以用ET什么时候可以用LT从之前的输出结果我们可以看到二者一个很明显的区别就是使用ET时可以极大的减少通知次数减少 epoll_wait() 的返回次数仅在状态变化时触发降低系统调用开销。在有成千上万连接的高并发情况下减少通知次数可以有效缓解服务器处理请求的压力。把后续处理任务交给线程自行处理能够更好的响应其他的连接而不是一直把精力耗费在单个连接上。由此我们便很容易发现二者之间的区别和优势边缘触发ET的优势更高的性能潜力 减少 epoll_wait() 的返回次数仅在状态变化时触发降低系统调用开销。 适合高并发场景如 10k 连接能显著减少 CPU 占用。避免重复事件风暴 对高频事件如套接字持续可写更友好不会因状态未变化而重复通知。更精细的控制强制要求程序一次性处理所有数据避免逻辑分散。水平触发LT的优势编程简单可靠 允许分批处理数据例如一次 read() 部分数据未处理完的事件会持续触发。 不易遗漏事件适合快速开发。行为可预测 与传统 select/poll 行为一致迁移成本低。 对异常情况如未处理完数据更宽容。资源友好 适合低频或突发流量场景如 HTTP 短连接不会因单次未处理完而卡死。其次epoll出现的时间较晚它生而就是为高并发而生的。最初只支持边缘触发算是一个历史遗留问题所以在使用epoll时使用边缘触发更常见。何时更适合使用边缘触发ET高性能服务器 需要处理 10k 并发连接如游戏服务器、交易所系统。 例如WebSocket 长连接服务ET 能减少可写事件的重复通知。需避免事件风暴的场景 监听大量持续可写的套接字如日志广播服务LT 会频繁通知而 ET 仅在缓冲区从满变为非满时通知一次。精细控制数据吞吐 需要最大化单次 I/O 效率的场景如文件传输服务配合非阻塞 I/O 一次性读写完整数据块。延迟敏感型应用 金融交易系统等低延迟场景ET 减少内核到用户态的事件传递次数。何时更适合水平触发LT开发效率优先的应用 原型开发、内部工具等LT 的简单性可降低调试成本。低频 I/O 场景 命令行工具、低频数据采集服务如传感器上报。需要兼容旧代码 从 select/poll 迁移到 epoll 时LT 行为一致兼容性更好。对吞吐要求不极端 普通 Web 服务器如 Nginx 默认使用 ET但 Apache 可选 LT。超时处理超时机制是IO多路复用中的一个重要功能它允许程序在等待IO事件时设置一个时间限制防止程序无限期地阻塞。实际应用中程序可能需要在等待IO事件的同时执行其他任务或者在超时后采取某种默认行为。在网络编程中客户端可能需要在一定时间内等待服务器响应超时后重试或断开连接。在多任务环境中程序可能需要在等待IO事件的同时处理其他任务。一个典型的例子就是游戏中的挂机党当游戏服务器中存在大量的挂机玩家会严重占用资源也会影响其他正常玩家的游戏体验。使用超时机制可以在规定时间内清除无响应的玩家使资源平衡到其他正常玩家中。在设置IO多路复用的超时机制时需要传入一个时间结构体用于设置超时时间可以精确到微秒级别
struct timeval {time_t tv_sec; // 秒数suseconds_t tv_usec; // 微秒数1秒 1,000,000 微秒
};selcet示例
#include54func.hint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);int fdw open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, open fdw error);printf(connected\n);char buf[1024];fd_set rdset;while (1){FD_ZERO(rdset);FD_SET(STDIN_FILENO, rdset);FD_SET(fdr, rdset);struct timeval timeout;timeout.tv_sec 3;timeout.tv_usec 0;int ret select(fdr1, rdset, NULL, NULL, timeout);if(ret 0){printf(timeout! disconnect\n);break;}if(FD_ISSET(STDIN_FILENO, rdset)){memset(buf, 0, sizeof(buf));int ret read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, rdset)){memset(buf, 0, sizeof(buf));int ret read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);if(ret 0){printf(B is disconnected\n);break;}printf(B:%s, buf);}}return 0;
}ubuntuubuntu:~/MyProject/Linux/IO$ ./timeoutA 1.pipe
waiting for connect
connected
timeout! disconnect对于有多个用户同时进行连接时仅向select里面放一个timeout因为每收到一个消息计时器就会重新计时无法做到超时踢出的效果。这个时候我们可以改变一下思路可以设置一个本地每秒钟都会响应的计时器并存储上一次活动的时间。每当计时器响应时就检查当前时间与上一次的差值是否超过规定时间超过就会自动下线。代码实现如下
#include54func.hint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf(waiting for connect\n);int fdr open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, open fdr error);int fdw open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, open fdw error);printf(connected\n);char buf[1024];fd_set rdset;time_t curtime time(NULL);time_t lastactive time(NULL);while (1){FD_ZERO(rdset);FD_SET(STDIN_FILENO, rdset);FD_SET(fdr, rdset);struct timeval timeout;timeout.tv_sec 1;timeout.tv_usec 0;int ret select(fdr1, rdset, NULL, NULL, timeout);curtime time(NULL);if(FD_ISSET(STDIN_FILENO, rdset)){memset(buf, 0, sizeof(buf));int ret read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);write(fdw, buf, strlen(buf));lastactive time(NULL);}if(FD_ISSET(fdr, rdset)){memset(buf, 0, sizeof(buf));int ret read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, STDIN ERROR);if(ret 0){printf(B is disconnected\n);break;}printf(B:%s, buf);}if(curtime - lastactive 3){printf(timeout! disconnect\n);break;}}return 0;
}